celery日志和loguru结合

因为loguru的logger.exception可以打印出异常的堆栈,对于celery的任务调试非常友好,所以尝试将loguru和celery结合使用

# celery_app.py
import logging

from celery import Celery
from celery.signals import setup_logging
from loguru import logger

app: Celery = Celery("filewatcher", include=['tasks'])


class InterceptHandler(logging.Handler):
    def emit(self, record: logging.LogRecord):
        # Retrieve context where the logging call occurred, this happens to be in the 6th frame upward
        logger_opt = logger.opt(depth=6, exception=record.exc_info)
        mapper = {
            20: "INFO",
            10: "DEBUG",
            30: "WARNING",
            40: "ERROR",
            50: "CRITICAL"
        }
        logger_opt.log(mapper.get(record.levelno), record.getMessage())


handler = InterceptHandler()


@setup_logging.connect
def setup_logging(*args, **kwargs):
    logging.basicConfig(handlers=[InterceptHandler()], level="INFO")

任务里面获取到

# tasks.py
from celery.utils.log import get_task_logger
# celery_app是我定义celery的文件
from celery_app import app
logger = get_task_logger(__name__)


@app.task()
def handle_event(src_path: str) -> str:
    pass


版权声明:本文为zm429438709原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。