Django + Celery 实现异步查询数据库

借鉴:https://www.cnblogs.com/wdliu/p/9530219.html

背景:需要查询10张表中的数据,计划使用Celery,借鉴上述同学的博客,结果出现问题,所以自己写一个笔记,供下次使用

环境:python 3.6.4 、Django 2.0.4、 celery 5.0.3、redis 3.5.2

系统:win10 、 redis-server 3.2

一、配置:

taskproj
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── taskproj
│   ├── __init__.py
│   ├── settings.py

│   ├── celery.py
│   ├── urls.py
│   └── wsgi.py
└── templates

#taskproj/taskproj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # 设置django环境

app = Celery('taskproj', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')

app.autodiscover_tasks()  # 发现任务文件每个app下的task.py

#taskproj/taskproj/__init__.py:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
import pymysql


__all__ = ['celery_app']
pymysql.install_as_MySQLdb()

#taskproj/app01/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import pymysql

def mysqldata(ip, zone, card, i):
    conn = pymysql.connect(host=ip, user="root", password="root",charset="utf8", port=3306, database="hltest")
    cur = conn.cursor()
    if zone:
        if card:
            zone = f"where zone = {zone}"
            card = f"and card = {card}"
        else:
            zone = f"where zone = {zone}"
            card = ""
    else:
        if card:
            card = f"where card = {card}"
            zone = ''
        else:
            card = ""
            zone = ""
        
    sql = f"select num, sex, name from info0{i} {zone} {card} ;"
    cur.execute(sql)
    results = cur.fetchall()
    data_list = [x for x in results]
    return data_list

@shared_task
def mysqldata0(ip, zone, card, 0):
    data_list = mysqldata(ip,zone,card,i)
    return data_list


@shared_task
def mysqldata1(ip, zone, card, 1):
    data_list = mysqldata(ip,zone,card,i)
    return data_list


@shared_task
def mysqldata2(ip, zone, card, 2):
    data_list = mysqldata(ip,zone,card,i)
    return data_list


@shared_task
def mysqldata3(ip, zone, card, 3):
    data_list = mysqldata(ip,zone,card,i)
    return data_list


@shared_task
def mysqldata4(ip, zone, card, 4):
    data_list = mysqldata(ip,zone,card,i)
    return data_list
    

#taskproj/app01/views.py

from django.http import HttpResponse
from app01.tasks import *
import json


def queryinfo(request):
    res = json.loads(request.body.decode())
    ip = res.get("ip")
    zone = res.get("zone")
    card = res.get("card")
    if not ip:
        ip = "127.0.0.1"
    data_list0 = mysqldata0.delay(ip, zone, card).get()
    data_list1 = mysqldata1.delay(ip, zone, card).get()
    data_list2 = mysqldata2.delay(ip, zone, card).get()
    data_list3 = mysqldata3.delay(ip, zone, card).get()
    data_list4 = mysqldata4.delay(ip, zone, card).get()
    data_list =  data_list0 +  data_list1 +  data_list2 +  data_list3 + data_list4
    return HttpResponce(json.dumps(data_list))

路由的配置不在赘述,自行配置:

二、启动:

python manage.py runserver    启动django

双击redis.exe(请自行下载)     启动redis

进入项目的taskproj目录启动worker:  启动celery

celery -A taskproj worker -l debug (执行该命令与manage.py同级下执行,taskproj是项目下的主目录名称,即:settings.py存在的父目录名称)

视图函数会触发任务启动。

报错:no module named redis

原因:redis 设置放在settings出现的问题,原因未知

原代码:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # 设置django环境

app = Celery('taskproj')

app.config_from_object('django.conf:settings', namespace='CELERY') #  使用CELERY_ 作为前缀,在settings中写配置

app.autodiscover_tasks()  # 发现任务文件每个app下的task.py

修改后:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # 设置django环境

app = Celery('taskproj', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')

app.autodiscover_tasks()  # 发现任务文件每个app下的task.py

这样就解决了,感觉莫名其妙,有时间再去测试吧!


版权声明:本文为forevercui原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。