DRF框架下使用django_celery_beat做一个前端可设置的定时任务

安装组合参考

pip install celery==5.0.5
pip install redis==3.5.3
pip install django-celery-beat==2.2.0
pip install django-celery-results==2.0.1

django_celery_beat模型类

在这里插入图片描述
Schedule 定时任务, Periods 周期性

SolarSchedule Solar是太阳,整体的作用就是输入经度纬度来控制任务的起停(不用)
IntervalSchedule 按时间间隔定时任务,(例:每间隔1H执行一次)
ClockedSchedule 指定某个时刻执行, (例:2018年8月8号 8:00这个时刻执行)
CrontabSchedule 时间控制器定时任务 (例:每年的12月星期一的8:30)
PeriodicTasks 周期性的任务
PeriodicTask 周期性的任务 (这里使用的是这个)

主要django_celery_beat模型类,拥有的字段

IntervalSchedule 时间间隔定时任务

every 间隔数
period 间隔单位
在这里插入图片描述

ClockedSchedule 特定时刻定时器

clocked_time 指定时间
在这里插入图片描述

CrontabSchedule 时间控制器定时任务

month_of_year
day_of_month
day_of_week
hour
minute
timezone 时区
在这里插入图片描述

PeriodicTask 周期性的任务

name:任务名
task:指定的任务
interval:时间间隔
crontab:时间控制器
clocked:指定时刻控制器
expires:有效日期
one_off:启用状态(如果为True,调度将只运行该任务一次)
start_time:开始时间
enabled:启用
last_run_at:最后运行时间
total_run_count:运行总次数
date_changed:最后的更改时间
description:描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

项目设计

需求:一个项目只有一个任务计划(taskplan),一个任务计划下对应一个定时任务(schedule)

在这里插入图片描述
思路:新建项目时,新建任务计划,和定时任务。项目和任务计划一对一,任务计划和定时任务也是一对一。
celery和django_celery_beat都按之前的配置:[https://blog.csdn.net/weixin_43665351/article/details/120157942]

tasksplan模块
models.py

from django_celery_beat.models import PeriodicTask
from django.db import models

from projects.models import Projects


class TasksPlan(models.Model):
    name = models.CharField(verbose_name='计划名称', max_length=200, help_text='计划名称')
    schedule = models.OneToOneField('django_celery_beat.PeriodicTask', on_delete=models.CASCADE, related_name="schedule", help_text="项目控制器")
    project = models.OneToOneField(Projects, on_delete=models.CASCADE, related_name='Projects', help_text='所属项目')

    class Meta:
        db_table = 't_taskplan'
        verbose_name = "任务计划"
        verbose_name_plural = "任务计划"

    def __str__(self):
        return self.name

serializer.py

from django_celery_beat.models import PeriodicTask
from rest_framework import serializers

from projects.models import Projects
from tasksplan.models import TasksPlan


class PeriodicTaskSerializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = ('id', 'name', 'enabled', 'clocked_id', 'crontab_id', 'last_run_at')


class TasksPlanSerializer(serializers.ModelSerializer):
    periodic_task = PeriodicTaskSerializer()
    project_id = serializers.PrimaryKeyRelatedField(queryset=Projects.objects.all(), help_text="项目ID")
    schedule_id = serializers.PrimaryKeyRelatedField(queryset=PeriodicTask.objects.all(), help_text="定时任务ID")

    class Meta:
        model = TasksPlan
        fields = ('id', 'name', 'project_id', 'schedule_id', 'periodic_task')

views.py

from rest_framework.viewsets import ModelViewSet

from .models import TasksPlan
from .serializer import TasksPlanSerializer


class TasksPlanViewSet(ModelViewSet):
    queryset = TasksPlan.objects.all()
    serializer_class = TasksPlanSerializer

关键:projects模块中serializer.py的create方法创建

class ProjectsModelSerializer(serializers.ModelSerializer):
    class Meta:
        model = Projects
        # fields = ('interfaces_set', 'name')
        # 指定字段不进行序列化输出与输入
        exclude = ('update_time', 'is_delete')
        # extra_kwargs指定修改字段属性
        extra_kwargs = {
            'create_time': {
                'read_only': True
            }
        }

    def create(self, validated_data):
        project_obj = super().create(validated_data)

        # 默认创建DebugTalks
        DebugTalks.objects.create(project_id=project_obj.id)

        # 创建间隔定时器
        interval = IntervalSchedule.objects.create(every=0, period='days')

        # 创建控制定时器
        # crontab = CrontabSchedule.objects.create(month_of_year='*', day_of_month='*', day_of_week='*', hour='*',
        #                                          minute='*')

        # 创建定时任务
        schedule_obj01 = PeriodicTask.objects.create(name='interval' + validated_data['name'], interval=interval,
                                                     enabled=0, task='testplan.tasks.celery_test')
		# 注意⚠️:这里定时器(interval,crontab等)不可以同时创建,interval,crontab作为外键传的不是对应的id,而是对应的实例,如果要完全做到,截图的功能,需要做对应的判断
        # schedule_obj02 = PeriodicTask.objects.create(name='crontab' + validated_data['name'], crontab=crontab,
        # enabled=0, task='testplan.tasks.celery_test')

        # 创建测试计划
        TasksPlan.objects.create(name='interval' + validated_data['name'], project_id=project_obj.id,
                                 schedule_id=schedule_obj01.id)

        # TasksPlan.objects.create(name='crontab' + validated_data['name'], project_id=project_obj.id, schedule_id=schedule_obj02.id)
        return project_obj

版权声明:本文为weixin_43665351原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。