loguru 的 `logger.exception` 可以打印出完整的异常堆栈,对调试 celery 任务非常友好,所以这里尝试将 loguru 和 celery 结合使用:通过一个自定义的 `logging.Handler`,把标准库 logging 的日志转发给 loguru。
# celery_app.py
import inspect
import logging

from celery import Celery
from celery.signals import setup_logging
from loguru import logger
# Celery application object named "filewatcher". The "tasks" module is passed
# via `include` so that its task definitions are loaded together with the app.
app: Celery = Celery(
    "filewatcher",
    include=["tasks"],
)
class InterceptHandler(logging.Handler):
    """Forward standard-library ``logging`` records to loguru.

    Install an instance of this handler on a ``logging`` logger (typically
    the root logger) so that every record emitted through ``logging`` is
    re-emitted through the loguru ``logger``, including exception info.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """Re-emit *record* through loguru.

        Args:
            record: The record produced by the standard ``logging`` machinery.
        """
        # Use loguru's level of the same name when it exists; otherwise fall
        # back to the numeric severity so custom or unnamed levels are still
        # logged instead of failing on an unknown level.
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk up the stack from this method until we leave the ``logging``
        # package (and frozen importlib frames), so loguru attributes the
        # message to the code that made the logging call. A fixed depth only
        # holds for one specific call path and Python version.
        frame, depth = inspect.currentframe(), 0
        while frame is not None:
            filename = frame.f_code.co_filename
            is_logging = filename == logging.__file__
            is_frozen = "importlib" in filename and "_bootstrap" in filename
            if depth > 0 and not (is_logging or is_frozen):
                break
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )
# Single shared handler instance; the receiver below installs this object on
# the root logger instead of building a second, separate handler.
handler = InterceptHandler()


# NOTE: the function deliberately keeps the name `setup_logging`. The decorator
# expression is evaluated first (while the name still refers to the imported
# Celery signal); afterwards the module-level name is rebound to this function.
@setup_logging.connect
def setup_logging(*args, **kwargs):
    """Route standard ``logging`` output through ``InterceptHandler``.

    Registered as a receiver of Celery's ``setup_logging`` signal. Per the
    Celery documentation, connecting a receiver to that signal makes Celery
    skip its own logger configuration, leaving it to this function.

    Args:
        *args: Positional arguments passed by the signal dispatcher (unused).
        **kwargs: Keyword arguments passed by the signal dispatcher (unused).
    """
    # basicConfig does nothing if the root logger already has handlers.
    logging.basicConfig(handlers=[handler], level="INFO")
在任务模块里,通过 `get_task_logger` 获取 logger 即可:
# tasks.py
from celery.utils.log import get_task_logger
# celery_app是我定义celery的文件
from celery_app import app
# Module-level logger for the tasks in this file, obtained from Celery's
# task-logger helper and named after this module.
logger = get_task_logger(__name__)


@app.task()
def handle_event(src_path: str) -> str:
    """Placeholder Celery task; the body is intentionally empty.

    NOTE(review): the signature is annotated ``-> str`` but nothing is
    returned yet, so a call currently yields ``None``.

    Args:
        src_path: Presumably the path of the file the event refers to;
            unused by the current stub.
    """
版权声明:本文为zm429438709原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。