python -- Apscheduler + 定时任务

python定时任务的实现方法

使用apscheduler,
参考:
https://www.jianshu.com/p/b77d934cc252
https://www.jianshu.com/p/ad2c42245906

from apscheduler.schedulers.blocking import BlockingScheduler
    # BlockingScheduler:在进程中运行单个任务,
    # 调度器是唯一运行的东西

    # 1.创建scheduler调度
    scheduler = BlockingScheduler()
    # 采用阻塞的方式
    
    # 2.添加调度任务【关键】
    scheduler.add_job(jobs,trigger)
    
    # 3.运行调度
    scheduler.start()

apscheduler基础组件:调度器(scheduler)、触发器(tigger)、作业存储(job store)、执行器(executor)

  1. 调度器:任务调度,添加、修改、删除作业
  • BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程,适用于单个任务。
  • BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。
  • AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
  • GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使用。
  • TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
  • TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
  • QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。
  1. 【关键设置】触发器:调度任务被触发,不触发触发没有操作
  • data 触发器,作业任务只执行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建。

    参数:
    run_date (datetime 或 str):作业的运行日期或时间
    timezone (datetime.tzinfo 或 str):指定时区

# 1.run-data为str,在2019-12-09 00 运行jbos
scheduler.add_job(jobs,'date',run_data='2019-12-09 00:00:00',args=['text'])

# 2.run_data为datatime,在 2019-12-09 00:00:00 时刻运行一次 jobs 方法
scheduler.add_job(jobs, 'date', run_date=datetime(2019, 12, 9, 0, 0, 0), args=['text'])

# 3.run_data 为data, 在2019-12-9 运行一次jobs
scheduler.add_job(jobs, 'date', run_date=date(2019, 12, 9), args=['text'])

# 开始运行
scheduler.start()

  • interval 触发器

    参数:
    weeks (int):间隔几周

    days (int): 间隔几天

    hours (int):间隔几小时

    minutes (int):间隔几分钟

    seconds (int):间隔多少秒

    start_date (datetime 或 str):开始日期

    end_date (datetime 或 str):结束日期

    timezone (datetime.tzinfo 或str):时区

# 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
scheduler.start()
  • 【关键】cron 触发器:非常灵活

    参数:
    year (int 或 str):年,4位数字

    month (int 或 str):月 (范围1-12)

    day (int 或 str):日 (范围1-31

    week (int 或 str):周 (范围1-53)

    day_of_week (int 或 str): 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)

    hour (int 或 str):时 (范围0-23)

    minute (int 或 str):分 (范围0-59)

    second (int 或 str):秒 (范围0-59)

    start_date (datetime 或 str): 最早开始日期(包含)

    end_date (datetime 或 str): 最晚结束时间(包含)

    timezone (datetime.tzinfo 或str): 指定时区

    参数支持算数表达式:
    *:表示所有值(any)
    */a :从最小开始,触发每个a值(any)
    a-b: 触发a-b之间的数值,a必须小于b(any)
    a-b/c: 触发a-b之间以及c的数值(any)
    xth y: 在一个月内的第y个工作的,第x时间点
    last x:在一个月内的最有一个x工作日(day)
    last: 在一个月的最有一天(day)
    x,y,z: 在任意匹配的表达式上触发(any)

  1. 作业存储:
  • 添加job:add_job(), 另一种为scheduler_job()修饰器来修饰函数

    # scheduler_job() 使用实例
    import datetime
    from apscheduler.schedulers.background import BackgroundScheduler
    
    # 添加修饰器
    @scheduler.scheduled_job(job_func, 'interval', minutes=2)
    def job_func(text):
        print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
    
    scheduler = BackgroundScheduler()
    scheduler.start()
    
  • 移除job:remove_job() 和 job_remove()
    remove_job(),是根据job的id来移除,所以需要创建job是设置id

    job_remove(),直接对job 进行remove操作

    # 1.
    scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
    scheduler.remove_job(job_one)
    
    # 2.
    job = add_job(job_func, 'interval', minutes=2, id='job_one')
    job.remvoe()
    
  • 获取job列表

    scheduler.get_jobs() 获取当前调度器中所有jobs的列表
    
    
  • 修改job: Job.modify() 或者 modify_job() 来修改job,job 的id 是无法修改的

    scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
    scheduler.start()
    # 将触发时间间隔修改成 5分钟
    scheduler.modify_job('job_one', minutes=5)
    
    job = scheduler.add_job(job_func, 'interval', minutes=2)
    # 将触发时间间隔修改成 5分钟
    job.modify(minutes=5)
    
    
  • 关闭job:scheduler.shutdown() 默认在所有调度执行完之后,关闭程序,wait=False,无需等待直接关闭。scheduler.shutdown(wait=false)

  1. APScheduler提供了四种存储方式:
  • MemoryJobStore
  • sqlalchemy
  • mongodb
  • redis
  1. 执行器:负责作业的运行,作业运行完会通知调度器。

代码一:简洁版

    import time
    from pymongo import MongoClient
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    
    def job():
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    if __name__ == '__main__':
        # mongodb存储job
        scheduler = BlockingScheduler()
        client = MongoClient(host='127.0.0.1', port=27017)
        store = MongoDBJobStore(collection='job', database='test', client=client)
        scheduler.add_jobstore(store)
        scheduler.add_job(job, 'interval', second=5)
        scheduler.start()

代码二

from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
 
 
def my_job():
    print 'hello world'
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
 
jobstores = {
    'mongo': MongoDBJobStore(collection='job', database='test', client=client),
    'default': MemoryJobStore()
}
executors = {
    'default': ThreadPoolExecutor(10),
    'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, 'interval', seconds=5)
 
try:
    scheduler.start()
except SystemExit:
    client.close() 

版权声明:本文为qq_33624802原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。