Celery - 基本学习、异步任务的提交和执行、定时任务

目录

零、学习资料

一、简单认识

1-1 特性总结

1-2 架构总结

1-2-1 消息中间件(message broker) - 将任务提交至任务队列

1-2-2 任务执行单元(worker)- 负责将任务从任务队列中取出后的执行

1-2-3 任务执行结果存储(task result store)

1-3 功能总结

1-4 框架集成

1-5 版本支持情况

二、基于Windows系统的简单操作

2-0 安装

2-1 简单应用实现

三、Celery实现异步任务 - 单任务结构

3-0 常见的windows下的启动错误及解决方式

3-1 实现异步任务提交和执行

3-2 实现文件代替celery worker命令

3-3 使用相关方法、属性,查看任务执行结果

四、异步任务提交及执行 - 多任务结构

五、实现定时任务 - apply_async的使用

5-1 单任务下的实现方式

5-1-1 方式一、指定时间点执行任务

5-1-2 方式二、指定当前时间的多少时间后执行任务

5-2 多任务结构下的实现方式

六、基于Django使用Celery

6-0 settings.py文件内的注册

6-1 django项目内的celery配置文件

6-2 django内部实现函数


零、学习资料

官方文档 - 全英文

官方文档 - 部分中译

一、简单认识

Celery - 一个处理大量消息的分布式系统,专注于实时处理任务队列,并支持任务调度。

1-1 特性总结

  • 简单
    • Celery 易于使用和维护,并且它 不需要配置文件 。
      Celery 有一个活跃、友好的社区来让你寻求帮助,包括一个 邮件列表 和一个 IRC频道
  • 高可用
    • 倘若连接丢失或失败,职程和客户端会自动重试,并且一些中间人通过 主/主 或 主/从 方式复制来提高可用性。
  • 快速
    • 单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级(使用 RabbitMQ、py-librabbitmq 和优化过的设置)。
  • 灵活
    • Celery 几乎所有部分都可以扩展或单独使用。可以自制连接池、 序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、 中间人传输或更多。

1-2 架构总结

三大组成部分:消息中间件(message broker)、任务执行单元(worker)、任务执行结果存储(task result store)

1-2-1 消息中间件(message broker) - 将任务提交至任务队列

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

celery通过使用Broker在客户端和职程之间调度,这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。

1-2-2 任务执行单元(worker)- 负责将任务从任务队列中取出后的执行

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

!!注意:同一时刻,可以启用多个worker进行执行任务,只要启动worker,只要broker提交了任务,自动执行。

1-2-3 任务执行结果存储(task result store)

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

1-3 功能总结

1-4 框架集成

常用框架框架内celery依赖包
Djangodjango-celery
Pyramidpyramid_celery
Pylonscelery-pylons
Flask不需要
web2pyweb2py-celery
Tornadotornado-celery

1-5 版本支持情况

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform. - 官方表示不支持windows系统,即在win下使用broker需要做额外操作。

二、基于Windows系统的简单操作

2-0 安装

2-1 简单应用实现

from celery import Celery

# 创建Celery对象
app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'

三、Celery实现异步任务 - 单任务结构

3-0 常见的windows下的启动错误及解决方式

  • 注意 - 启动方式 - celery worker -A 任务文件名 -l info
    • 通常woker的启动命令:celery worker -A test -l info
    • windows下的启动命令:celery worker -A test -l info -P eventlet

3-1 实现异步任务提交和执行

import celery

# 服务器使用密码
# backend = 'redis://:123456@127.0.0.1:6379/1'
# broker = 'redis://:123456@127.0.0.1:6379/2'
# 服务器未使用密码 - redis为例
broker = 'redis://127.0.0.1:6379/1'  # 将redis的db1作为消息中间人
backend = 'redis://127.0.0.1:6379/2'  # 将redis的db2作为数据存储器

# 第一个参数默认为cel对象别名
cel = celery.Celery('test', backend=backend, broker=broker)

# 创建相关函数
@cel.task
def add(x, y):
    return x + y


# 通常下列操作单独提出为另一文件
if __name__ == '__main__':
    # 执行函数,将函数添加到任务队列,一旦添加,自动获取执行。result返回对象而非函数执行结果
    result = add.delay(1, 2)
    print(result.id)

3-2 实现文件代替celery worker命令

# 导入新建对象
from test import cel_app

if __name__ == '__main__':
    '''方法一'''
    # worker = worker.worker(cel_app)
    # options = {
    #     'loglevel': 'INFO',
    #     'pool_cls': 'eventlet'
    # }
    # worker.run(**options)

    '''方法二'''
    # celery worker -A extensions -l info -P eventlet
    cel_app.worker_main(argv=['worker', '-l=info', '-P=eventlet'])

3-3 使用相关方法、属性,查看任务执行结果

from celery.result import AsyncResult
from test import cel

# 指定查询对象,id为task_id 93d0890f-488a-4ed5-bafc-b86df81d23a2,app指定对象,需要导入
async = AsyncResult(id="93d0890f-488a-4ed5-bafc-b86df81d23a2", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')

四、异步任务提交及执行 - 多任务结构

pro_cel
    ├── celery_task# celery相关文件夹
    │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
    │   └── tasks1.py    #  所有任务函数1
    │	└── tasks2.py    #  所有任务函数2
    ├── check_result.py # 检查结果,查询任务执行状态
    └── send_task.py    # 触发任务 - 将任务函数添加到队列中
'''celery.py'''
from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
             include=['celery_task.task1',
                      'celery_task.task2'
                      ])

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

'''tasks1.py'''
import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任务结果:%s"%res

'''tasks2.py'''
import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任务结果:%s"%res

'''
send_task.py
!注意开启worker!

执行send_task.py,添加任务进执行队列
开启work:celery worker -A celery_task -l info  -P  eventlet
检查任务执行结果,执行check_result.py
'''
from celery_task.task1 import test_celery
from celery_task.task2 import test_celery2

# 立即告知celery去执行test_celery任务,并传入一个参数
result = test_celery.delay('第一个的执行')
print(result.id)
result = test_celery2.delay('第二个的执行')
print(result.id)

 

五、实现定时任务 - apply_async的使用

注意!!!需要使用 beat + worker

启动一个beat:celery beat -A 文件名或者包名 -l info
启动work执行:celery worker -A 文件名或者包名 -l info -P eventlet

5-1 单任务下的实现方式

5-1-1 方式一、指定时间点执行任务

from celery_app_task import add
from datetime import datetime

# 指定年,月,日,时,分,秒
v1 = datetime(2019, 2, 14, 18, 19, 56)
print(v1)

# 转换成utc格式
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)

result = add.apply_async(args=[1, 3], eta=v2)
print(result.id)

5-1-2 方式二、指定当前时间的多少时间后执行任务

from celery_app_task import add
from datetime import datetime

ctime = datetime.now()

# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())

from datetime import timedelta
# 计算十秒后的时间
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

5-2 多任务结构下的实现方式

'''
celery.py文件的修改
'''
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_task.task1',
    'celery_task.task2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名 - add-every-10-seconds
    'add-every-10-seconds': {
        # 执行task1下的test_celery函数
        'task': 'celery_task.task1.test_celery',
        # 每隔2秒执行一次
        # 'schedule': 2.0,
        # 'schedule': crontab(minute="*/2"),
        'schedule': timedelta(seconds=2),
        # 传递参数给执行函数
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.task1.test_celery',
    #     每年4月11号,8点42分执行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

 

六、基于Django使用Celery

注意:依赖django-celery模块

6-0 settings.py文件内的注册

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...
# 导入配置文件
from djagocele import celeryconfig
# 指定服务器信息
BROKER_BACKEND='redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

6-1 django项目内的celery配置文件

'''
项目目录下新建文件 celeryconfig.py
'''
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

6-2 django内部实现函数

'''
app01下的tasks.py,即celery的逻辑任务函数
'''
from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

'''
视图函数views.py
'''
from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime

def test(request):
    # result=add.delay(2,3)

    # 加入定时执行的实现
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)

    print(result.id)
    return HttpResponse('ok')

 

 


版权声明:本文为qq_33961117原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。