drf celery

setttings

# celery_tasks/main.py

# ##################### 需先导入工程配置文件  #######################
#  作用: 比如 获取 Django项目的 redis 配置?!
import os
if not os.getenv('DJANGO_SETTINGS_MODULE'):
    # 设置 添加 Django项目 的 setting 路径 到 os 环境变量
    os.environ['DJANGO_SETTINGS_MODULE'] = 'manage_platform.settings'  # 要对应 自己的 Django 项目名

# ####################### 创建celery实例 ##########################
# main 其实 就是 给celery设置一个名字, 这个名字唯一 就可以
# 推荐使用 文件路径
from celery import Celery
app = Celery(main='celery_tasks')

# ########### 加载 config.py 配置文件(设置broker任务队列) ############
# 配置见下文
# app.config_from_object('celery_tasks.config')
app.config_from_object('django.conf:settings', namespace='CELERY')  # 使用CELERY_ 作为前缀,在settings中写配置

# ################# 实现 celery实例对象 自动检测任务 #################
# 参数: 列表
# 列表中 需要填写任务的包路径
app.autodiscover_tasks(["celery_tasks.solar_log_analy"])  # 发现任务文件每个app下的task.py

 

 views:

class AnalyProcessSolarVideo(views.APIView):
    """
    @description  :
    ---------
    @param  :
    -------
    @Returns  :
    -------
    """
    permission_classes = (MyPermission,)		# 重点

    coreapi_fields_get = (
            DocParam(name="model_id", location='query', description='模'),
            DocParam(name="token", location='header', description='token'),
        )
    coreapi_fields_post = (
            DocParam(name="zone_id", location='body', description=''),
            DocParam(name="task_id", location='body', description='id'),
        )
    coreapi_fields_put = (
        DocParam(name="token", location='header', description='token'),
        )
    coreapi_fields_delete = (
        DocParam(name="token", location='header', description='token'),
        )

    def post(self, request):
        """
        @description  :发起任务
        ---------
        @param  :
        -------
        @Returns  :
        -------
        """
        context = {'code': codes.normal_code, 'msg': message.normal_code}
        res = tasks.send_sms_code.delay("1385", "Hello Celery!")
        print("result:", res)
        return JsonResponse(context)

url:

urlpatterns = [
    path('api/v1/video_process_analy/', views.AnalyProcessSolarVideo.as_view()),  # celery 测试
]

 tasks.fun.delay之后,你的redis中会出现celery的keys,keys*可以查看所有的待执行任务id,执行一个就少一个

 查看任务执行结果

 


版权声明:本文为xiaofeixia666888原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。