如何让apscheduler 定时任务再系统断电重启后恢复之前的进度

如何让apscheduler 定时任务再系统断电重启后恢复之前的进度

  • task_core.py
# coding utf-8
from apscheduler.schedulers.background import BackgroundScheduler
from db_pool import DATABASE_URL
#创建调度器:BlockingScheduler
scheduler = BackgroundScheduler({
        'apscheduler.jobstores.default': {
            'type': 'sqlalchemy',
            'url': DATABASE_URL
        },
        'apscheduler.executors.default': {
            'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
            'max_workers': '20'
        },
        'apscheduler.executors.processpool': {
            'type': 'processpool',
            'max_workers': '5'
        },
        'apscheduler.job_defaults.coalesce': 'false',   # job 合并
        'apscheduler.job_defaults.max_instances': '1',  # 并发执行实例数量
        'apscheduler.timezone': 'Asia/Shanghai',
    })


def dojob():
    scheduler.start()

    scheduler.add_job(
        # 在每天23时56分执行客流数据归档
        func="app.task.task_timer:task1", # 目标任务函数
        trigger='cron', 
        hour=23,
        minute=56,
        id='task1',
        replace_existing=True
    )

    scheduler.add_job(
        # 在每天整点进行一次记录
        func="app.task.task_timer:task2",   # 目标任务函数
        trigger='cron', 
        hour='0-23', 
        id='task2',
        replace_existing=True
    )

    scheduler.add_job(
        # 每天1时执行一次
        func="app.task.task_timer:task3",
        trigger='cron',
        hour='1',
        id='task3',
        replace_existing=True
    )

    scheduler.add_job(
        func="app.task.task_timer:task4", # 目标任务函数
        trigger='cron', 
        hour=2,
        minute=41,
        id='task4',
        replace_existing=True
    )
  • 连接数据库 mysql
  • db_pool.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# engine = create_engine('sqlite:///data/apschedulertest',
#                    connect_args={'check_same_thread':False},
#                   poolclass=StaticPool)
MYSQL_USER = 'root'
MYSQL_PASS = 'root'
MYSQL_HOST =  "127.0.0.1"
MYSQL_PORT = '3308'
MYSQL_DB = 'apschedulertest'

DATABASE_URL = 'mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8'%(
	MYSQL_USER, MYSQL_PASS, MYSQL_HOST, MYSQL_PORT, MYSQL_DB)

engine = create_engine(
    DATABASE_URL,
    max_overflow=500,  # 超过连接池大小外最多创建的连接
    pool_size=50,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=36000,  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    # echo=True,  # orm 查询debug
)
SessionLocal = sessionmaker(bind=engine)

Base = declarative_base()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


版权声明:本文为qq_37721213原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。