celery 停止任务_Celery异步消息队列的使用

1、准备工作

1.1 流程图

192cc7f8c3a3f49bfc94cf516b9d249a.png

2、环境安装

2.1、在Ubuntu中需要安装redis

安装redis$sudo apt-get update$sudo apt-get install redis-server启动redis$redis-server连接redis$redis-cli $redis-cli -h ip -a 6379 安装Python操作redis的包pip install redis 重启redissudo service reids restart 安装redis 

redis默认绑定的ip为127.0.0.1其他电脑无法访问Ubuntu的redis

f69d48def01846053b1f749793d9a4a7.png

重启redis服务 service redis restart

查看绑定端口

ab5cf1c1b80b47e2f7a80b867742ae45.png

在wind上telnet ip 6379 成功说明成功

2.2、安装celery

pip install celery

2、开始使用celery

2、1基本应用

在/home/zbwu103/celery 文件中创建一个tasks.py的任务文件

#task.py from celery import Celery app = Celery('tasks',             broker='redis://192.168.1.111',             backend='redis://192.168.1.111'             #redis://密码@ip) @app.taskdef add(x,y):    print("running...",x,y)    return x+y        View Code

在home/zbwu103/celery的目录启动监听任务

#打印日志的模式运行celery -A tasks worker --loglevel=info

在开一个终端,到/home/zbwu103/celery用Python进入命令行运行

from tasks import add t = add.delay(4,5)#t.result.ready() 查看任务是否完成,完成返回True,未完成返回False#t.get()  返回完成之后的结果#t.task_id  返回任务的唯一ID号,可以通过ID查询到任务

上面任务都是在终端上运行,如果终端关闭tasks也会终止。

所以需要任务在后台运行

c05f3089275ac45cedacd78ad5b6c1a8.png
celery multi stop w1 停止 w1

2.2 、在项目中如何使用celery

85f5bd0b29be10a93218636723b61163.png
356596c44aca67480f8dc7124df0152d.png
from __future__ import absolute_import, unicode_literalsfrom celery import Celeryapp = Celery('my_proj',             broker='redis://192.168.1.111',             backend='redis://192.168.1.111',             include=['myp_roj.tasks'])app.conf.update(    result_expires=3600,)if __name__ == '__main__':    app.start()celery
from __future__ import absolute_import, unicode_literalsimport subprocessfrom .celery import app@app.taskdef add(x,y):    return x+y@app.taskdef run_cmd(cmd):    obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)    return obj.stdout.read().decode('utf-8')tasks.py

在my_proj同级目录启动

865c87dab72c8e281d8fc4b2313a5454.png

查看任务启动情况

ps -ef |grep celery

2.3 、celery 定时任务

celery使用beat来执行celert beat 来实现定时任务

worker定时任务

from celery import Celeryfrom celery.schedules import crontabapp = Celery('task',             broker='redis://192.168.1.111',             backend='redis://192.168.1.111')@app.on_after_configure.connectdef setup_periodic_tasks(sender, **kwargs):    # Calls test('hello') every 10 seconds.    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')    # Calls test('world') every 30 seconds    sender.add_periodic_task(30.0, test.s('world'), expires=10)    # Executes every Monday morning at 7:30 a.m.    sender.add_periodic_task(        crontab(hour=21, minute=26, day_of_week='Sum'),        test.s('Happy Mondays!'),    )@app.taskdef test(arg):    print('runing test.....')    print(arg)periodic_task.py

启动定时任务

celery -A periodic_task worker

另外开一个任务调度区不断的检测你的任务计划

celery -A periodic_task beat

2.4、celery和django配置一起使用

在setting同级的目录中新建一个celery.py的文件配置celery基本的配置

194faf7b938ab587a4c0f302e91b2a3d.png
from __future__ import absolute_import, unicode_literalsimport osfrom celery import Celery# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryTest.settings')app = Celery('CeleryTest')# Using a string here means the worker don't have to serialize# the configuration object to child processes.# - namespace='CELERY' means all celery-related configuration keys#   should have a `CELERY_` prefix.app.config_from_object('django.conf:settings', namespace='CELERY')# Load task modules from all registered Django app configs.app.autodiscover_tasks()@app.task(bind=True)def debug_task(self):    print('Request: {0!r}'.format(self.request))celery.py

在setting.py同级的目录配置__init__.py

from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when# Django starts so that shared_task will use this app.from .celery import app as celery_app __all__ = ['celery_app']__init__.py

在APP的目录里面新建一个tasks.py的任务来填写任务

#app01/tasks.py# Create your tasks herefrom __future__ import absolute_import, unicode_literalsfrom celery import shared_taskimport time@shared_taskdef add(x, y):    print("running task add,我是windows ")    time.sleep(1)    return x + y@shared_taskdef mul(x, y):    return x * y@shared_taskdef xsum(numbers):    return sum(numbers)tasks.py

从views中调用任务

/app01/view.pyfrom django.shortcuts import render,HttpResponsefrom  app01 import tasksfrom celery.result import AsyncResultdef index(request):    res = tasks.add.delay(5,999)    print("res:",res)    print(res.status)    # import pdb    # pdb.set_trace()    return HttpResponse(res.task_id)def task_res(request):    #通过ID获取结果    result = AsyncResult(id="be4933c0-ed9b-4a04-ade8-79f4c57cfc74")    #return HttpResponse(result.get())    return HttpResponse(result.status)views.py

版权声明:本文为weixin_42168341原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。