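Reference: https://www.cnblogs.com/wdliu/p/9530219.html
Background: I needed to query data from 10 tables and planned to use Celery. I followed the blog post above but ran into a problem, so I wrote these notes for next time.
Environment: Python 3.6.4, Django 2.0.4, celery 5.0.3, redis 3.5.2
System: Windows 10, redis-server 3.2
1. Configuration: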
taskproj
├── app01
│ ├── __init__.py
│ ├── apps.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tasks.py
│ └── views.py
├── manage.py
├── taskproj
│ ├── __init__.py
│ ├── settings.py
│ ├── celery.py
│ ├── urls.py
│ └── wsgi.py
└── templates
# taskproj/taskproj/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # point Celery at the Django settings
app = Celery('taskproj', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')
app.autodiscover_tasks()  # discover the tasks.py file in each app

# taskproj/taskproj/__init__.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
import pymysql

__all__ = ['celery_app']
pymysql.install_as_MySQLdb()  # let Django use PyMySQL as its MySQLdb driver

# taskproj/app01/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import pymysql


def mysqldata(ip, zone, card, i):
    """Query table info0<i> on host ip, optionally filtered by zone and/or card."""
    # Use placeholders so the driver escapes the filter values
    filters, params = [], []
    if zone:
        filters.append("zone = %s")
        params.append(zone)
    if card:
        filters.append("card = %s")
        params.append(card)
    where = ("where " + " and ".join(filters)) if filters else ""
    # i comes from the task wrappers below, not from the request
    sql = f"select num, sex, name from info0{i} {where};"
    conn = pymysql.connect(host=ip, user="root", password="root", charset="utf8", port=3306, database="hltest")
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params or None)
            results = cur.fetchall()
    finally:
        conn.close()
    data_list = [x for x in results]
    return data_list


# One task per table; the table index is fixed inside each wrapper
@shared_task
def mysqldata0(ip, zone, card):
    return mysqldata(ip, zone, card, 0)


@shared_task
def mysqldata1(ip, zone, card):
    return mysqldata(ip, zone, card, 1)


@shared_task
def mysqldata2(ip, zone, card):
    return mysqldata(ip, zone, card, 2)


@shared_task
def mysqldata3(ip, zone, card):
    return mysqldata(ip, zone, card, 3)


@shared_task
def mysqldata4(ip, zone, card):
    return mysqldata(ip, zone, card, 4)
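What the view gets back from `.get()` depends on the result serializer. Assuming Celery's default JSON serializer is in effect (nothing in this setup changes it), the tuples returned by `fetchall()` arrive as lists. The sketch below only mimics that round trip with the standard `json` module; the sample rows are made up for illustration.

```python
import json

# Made-up rows in the shape fetchall() returns: a sequence of tuples.
rows = [(1, "m", "alice"), (2, "f", "bob")]

# A JSON round trip turns every tuple into a list.
round_tripped = json.loads(json.dumps(rows))
print(round_tripped)

# Lists from several tasks can then be concatenated with +, as the view does.
combined = round_tripped + round_tripped
print(len(combined))
```

This is why the view can add the per-table results together and pass them straight to `json.dumps`.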
# taskproj/app01/views.py
from django.http import HttpResponse
from app01.tasks import mysqldata0, mysqldata1, mysqldata2, mysqldata3, mysqldata4
import json


def queryinfo(request):
    res = json.loads(request.body.decode())
    ip = res.get("ip") or "127.0.0.1"  # default to the local MySQL server
    zone = res.get("zone")
    card = res.get("card")
    tasks = (mysqldata0, mysqldata1, mysqldata2, mysqldata3, mysqldata4)
    # Send every task before waiting on any result, so the queries can overlap
    pending = [task.delay(ip, zone, card) for task in tasks]
    data_list = []
    for result in pending:
        data_list += result.get()
    return HttpResponse(json.dumps(data_list))

URL routing is not covered here; configure it yourself.
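Calling `.delay(...).get()` back to back waits for each task to finish before the next one is sent, so the queries run one after another. Sending all tasks first and collecting the results afterwards lets them overlap, provided the worker runs with more than one process or thread. The sketch below illustrates the difference with a thread pool from the standard library as a stand-in for Celery; `fake_query` is a hypothetical placeholder that sleeps instead of querying MySQL.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_query(table_index, delay=0.05):
    """Hypothetical stand-in for one table query; sleeps instead of hitting MySQL."""
    time.sleep(delay)
    return [[table_index, "row"]]


def one_at_a_time(pool, n):
    # Analogous to task.delay(...).get() per line: wait before sending the next.
    rows = []
    for i in range(n):
        rows += pool.submit(fake_query, i).result()
    return rows


def send_all_then_collect(pool, n):
    # Analogous to calling delay() for every task first, then get() on each result.
    futures = [pool.submit(fake_query, i) for i in range(n)]
    rows = []
    for future in futures:
        rows += future.result()
    return rows


with ThreadPoolExecutor(max_workers=5) as pool:
    start = time.perf_counter()
    sequential_rows = one_at_a_time(pool, 5)
    sequential_time = time.perf_counter() - start

    start = time.perf_counter()
    overlapped_rows = send_all_then_collect(pool, 5)
    overlapped_time = time.perf_counter() - start

print(sequential_rows == overlapped_rows)
print(f"{sequential_time:.2f}s one at a time, {overlapped_time:.2f}s overlapped")
```

Both functions return the same rows in the same order; only the waiting pattern differs.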
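2. Startup:
python manage.py runserver    # start Django
Double-click redis.exe (download it yourself) to start Redis
From the project's outer taskproj directory, start the Celery worker:
celery -A taskproj worker -l debug (run this command from the same directory as manage.py; taskproj is the name of the project's main package, i.e. the directory that contains settings.py)
Calling the view function triggers the tasks.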
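Before starting the worker, it can help to confirm that Redis is actually listening. A minimal sketch, assuming Redis runs on 127.0.0.1:6379 as in the broker URL used in these notes; `port_is_open` is a hypothetical helper, not part of Celery or redis.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Address taken from the broker URL redis://127.0.0.1:6379/0
    print(port_is_open("127.0.0.1", 6379))
```

This only checks that something accepts connections on the port; it does not verify that the listener is Redis.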
Error: no module named redis
Cause: the error appeared when the Redis configuration was placed in settings.py; the underlying reason is unknown.
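One thing worth ruling out is whether the `redis` Python package is importable by the same interpreter that runs the worker, since the environment listed above may not be the one the `celery` command uses. The check below is only a diagnostic sketch; `module_available` is a hypothetical helper, and it does not establish what caused the error here.

```python
import importlib.util
import sys


def module_available(name):
    """Return True if the top-level module `name` can be found by this interpreter."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    print(sys.executable)             # which Python is running
    print(module_available("redis"))  # False would point to a missing package
```

Running it with the interpreter that launches `celery` shows whether both see the same packages.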
Original code:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # point Celery at the Django settings
app = Celery('taskproj')
app.config_from_object('django.conf:settings', namespace='CELERY')  # read settings prefixed with CELERY_ from settings.py
app.autodiscover_tasks()  # discover the tasks.py file in each app

After the change:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # point Celery at the Django settings
app = Celery('taskproj', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')
app.autodiscover_tasks()  # discover the tasks.py file in each app

That solved it. I still find it baffling and will test further when I have time.
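For comparison, the `config_from_object('django.conf:settings', namespace='CELERY')` variant expects the connection URLs in settings.py under names that start with `CELERY_`. These notes do not show what settings.py contained when the error occurred, so the excerpt below is only an assumed baseline to test against, reusing the URLs from the working version.

```python
# taskproj/taskproj/settings.py (excerpt; assumed, not taken from the original project)
# With namespace='CELERY', Celery reads settings whose names start with CELERY_.
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
```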
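Copyright notice: this is an original article by forevercui, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.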