目录
1-2-1 消息中间件(message broker) - 将任务提交至任务队列
1-2-2 任务执行单元(worker)- 负责将任务从任务队列中取出后的执行
1-2-3 任务执行结果存储(task result store)
零、学习资料
一、简单认识
Celery - 一个处理大量消息的分布式系统,专注于实时处理任务队列,并支持任务调度。
1-1 特性总结
- 简单
- Celery 易于使用和维护,并且它 不需要配置文件 。
Celery 有一个活跃、友好的社区来让你寻求帮助,包括一个 邮件列表 和一个 IRC频道。- 高可用
- 倘若连接丢失或失败,职程和客户端会自动重试,并且一些中间人通过 主/主 或 主/从 方式复制来提高可用性。
- 快速
- 单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级(使用 RabbitMQ、py-librabbitmq 和优化过的设置)。
- 灵活
- Celery 几乎所有部分都可以扩展或单独使用。可以自制连接池、 序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、 中间人传输或更多。
1-2 架构总结
三大组成部分:消息中间件(message broker)、任务执行单元(worker)、任务执行结果存储(task result store)
1-2-1 消息中间件(message broker) - 将任务提交至任务队列
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
celery通过使用Broker在客户端和职程之间调度,这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。
1-2-2 任务执行单元(worker)- 负责将任务从任务队列中取出后的执行
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
!!注意:同一时刻,可以启用多个worker进行执行任务,只要启动worker,只要broker提交了任务,自动执行。
1-2-3 任务执行结果存储(task result store)
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
1-3 功能总结
1-4 框架集成
常用框架 框架内celery依赖包 Django django-celery Pyramid pyramid_celery Pylons celery-pylons Flask 不需要 web2py web2py-celery Tornado tornado-celery 1-5 版本支持情况
Celery version 4.0 runs on
Python ❨2.7, 3.4, 3.5❩
PyPy ❨5.4, 5.5❩
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform. - 官方表示不支持windows系统,即在win下使用broker需要做额外操作。
二、基于Windows系统的简单操作
2-0 安装
2-1 简单应用实现
from celery import Celery # 创建Celery对象 app = Celery('hello', broker='amqp://guest@localhost//') @app.task def hello(): return 'hello world'
三、Celery实现异步任务 - 单任务结构
3-0 常见的windows下的启动错误及解决方式
- 注意 - 启动方式 - celery worker -A 任务文件名 -l info
- 通常woker的启动命令:celery worker -A test -l info
- windows下的启动命令:celery worker -A test -l info -P eventlet
3-1 实现异步任务提交和执行
import celery # 服务器使用密码 # backend = 'redis://:123456@127.0.0.1:6379/1' # broker = 'redis://:123456@127.0.0.1:6379/2' # 服务器未使用密码 - redis为例 broker = 'redis://127.0.0.1:6379/1' # 将redis的db1作为消息中间人 backend = 'redis://127.0.0.1:6379/2' # 将redis的db2作为数据存储器 # 第一个参数默认为cel对象别名 cel = celery.Celery('test', backend=backend, broker=broker) # 创建相关函数 @cel.task def add(x, y): return x + y # 通常下列操作单独提出为另一文件 if __name__ == '__main__': # 执行函数,将函数添加到任务队列,一旦添加,自动获取执行。result返回对象而非函数执行结果 result = add.delay(1, 2) print(result.id)
3-2 实现文件代替celery worker命令
# 导入新建对象 from test import cel_app if __name__ == '__main__': '''方法一''' # worker = worker.worker(cel_app) # options = { # 'loglevel': 'INFO', # 'pool_cls': 'eventlet' # } # worker.run(**options) '''方法二''' # celery worker -A extensions -l info -P eventlet cel_app.worker_main(argv=['worker', '-l=info', '-P=eventlet'])
3-3 使用相关方法、属性,查看任务执行结果
from celery.result import AsyncResult from test import cel # 指定查询对象,id为task_id 93d0890f-488a-4ed5-bafc-b86df81d23a2,app指定对象,需要导入 async = AsyncResult(id="93d0890f-488a-4ed5-bafc-b86df81d23a2", app=cel) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
四、异步任务提交及执行 - 多任务结构
pro_cel ├── celery_task# celery相关文件夹 │ ├── celery.py # celery连接和配置相关文件,必须叫这个名字 │ └── tasks1.py # 所有任务函数1 │ └── tasks2.py # 所有任务函数2 ├── check_result.py # 检查结果,查询任务执行状态 └── send_task.py # 触发任务 - 将任务函数添加到队列中
'''celery.py''' from celery import Celery cel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类 include=['celery_task.task1', 'celery_task.task2' ]) # 时区 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False '''tasks1.py''' import time from celery_task.celery import cel @cel.task def test_celery(res): time.sleep(5) return "test_celery任务结果:%s"%res '''tasks2.py''' import time from celery_task.celery import cel @cel.task def test_celery2(res): time.sleep(5) return "test_celery2任务结果:%s"%res ''' send_task.py !注意开启worker! 执行send_task.py,添加任务进执行队列 开启work:celery worker -A celery_task -l info -P eventlet 检查任务执行结果,执行check_result.py ''' from celery_task.task1 import test_celery from celery_task.task2 import test_celery2 # 立即告知celery去执行test_celery任务,并传入一个参数 result = test_celery.delay('第一个的执行') print(result.id) result = test_celery2.delay('第二个的执行') print(result.id)
五、实现定时任务 - apply_async的使用
注意!!!需要使用 beat + worker
启动一个beat:celery beat -A 文件名或者包名 -l info 启动work执行:celery worker -A 文件名或者包名 -l info -P eventlet
5-1 单任务下的实现方式
5-1-1 方式一、指定时间点执行任务
from celery_app_task import add from datetime import datetime # 指定年,月,日,时,分,秒 v1 = datetime(2019, 2, 14, 18, 19, 56) print(v1) # 转换成utc格式 v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) result = add.apply_async(args=[1, 3], eta=v2) print(result.id)
5-1-2 方式二、指定当前时间的多少时间后执行任务
from celery_app_task import add from datetime import datetime ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta # 计算十秒后的时间 time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay # 使用apply_async并设定时间 result = add.apply_async(args=[4, 3], eta=task_time) print(result.id)
5-2 多任务结构下的实现方式
''' celery.py文件的修改 ''' from datetime import timedelta from celery import Celery from celery.schedules import crontab cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[ 'celery_task.task1', 'celery_task.task2', ]) cel.conf.timezone = 'Asia/Shanghai' cel.conf.enable_utc = False cel.conf.beat_schedule = { # 名字随意命名 - add-every-10-seconds 'add-every-10-seconds': { # 执行task1下的test_celery函数 'task': 'celery_task.task1.test_celery', # 每隔2秒执行一次 # 'schedule': 2.0, # 'schedule': crontab(minute="*/2"), 'schedule': timedelta(seconds=2), # 传递参数给执行函数 'args': ('test',) }, # 'add-every-12-seconds': { # 'task': 'celery_task.task1.test_celery', # 每年4月11号,8点42分执行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'args': (16, 16) # }, }
六、基于Django使用Celery
注意:依赖django-celery模块
6-0 settings.py文件内的注册
INSTALLED_APPS = [ ... 'djcelery', 'app01' ] ... # 导入配置文件 from djagocele import celeryconfig # 指定服务器信息 BROKER_BACKEND='redis' BROKER_URL='redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
6-1 django项目内的celery配置文件
''' 项目目录下新建文件 celeryconfig.py ''' import djcelery djcelery.setup_loader() CELERY_IMPORTS=( 'app01.tasks', ) #有些情况可以防止死锁 CELERYD_FORCE_EXECV=True # 设置并发worker数量 CELERYD_CONCURRENCY=4 #允许重试 CELERY_ACKS_LATE=True # 每个worker最多执行100个任务被销毁,可以防止内存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超时时间 CELERYD_TASK_TIME_LIMIT=12*30
6-2 django内部实现函数
''' app01下的tasks.py,即celery的逻辑任务函数 ''' from celery import task @task def add(a,b): with open('a.text', 'a', encoding='utf-8') as f: f.write('a') print(a+b) ''' 视图函数views.py ''' from django.shortcuts import render,HttpResponse from app01.tasks import add from datetime import datetime def test(request): # result=add.delay(2,3) # 加入定时执行的实现 ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=5) task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time) print(result.id) return HttpResponse('ok')