How do you make a Python program perform tasks at regular intervals or at fixed points in time?

Using APScheduler

Scheduled tasks are all but unavoidable in real projects. For example, in a recent project, after a user registers for an exam, the system needs to push a WeChat mini-program message to the customer as the exam date approaches. Looking through the options for FastAPI, there are plenty of methods and packages, but they are either too heavy (some require running a separate service and depend on Redis, which is inconvenient) or too crude: you can fake periodic work with time.sleep() inside a FastAPI background task, which is fine for simple jobs but quickly becomes unwieldy for anything more complex. So it is better to find a package dedicated to this job. After searching around, APScheduler turned out to be a good fit: the code is simple and it works well, so I am writing this up as a record.

Installation

pip install apscheduler

Main components

The concepts below may already be familiar; in any case, the code is easier to understand than the definitions.

Triggers contain the scheduling logic. Each job has its own trigger, which determines when that job should run next. Apart from their initial configuration, triggers are completely stateless. In plain terms, a trigger specifies when and how the task fires.

Type: Explanation
DateTrigger: runs once at a fixed point in time (e.g. at a given hour, minute, and second on a given date)
IntervalTrigger: runs repeatedly at a fixed interval (e.g. every 5 seconds)
CronTrigger: crontab-style conditions (more complex; e.g. every 5 seconds between 4 and 5 o'clock, Monday to Thursday)

The job store holds the scheduled jobs. The default job store simply keeps jobs in memory; the others persist them to a database. A job's data is serialized when saved to a persistent job store and deserialized when loaded back. Schedulers must not share the same job store.

Job stores are initialized on the scheduler, and they can also be added dynamically through the scheduler's add_jobstore method. Each job store is bound to an alias; when a job is added, the scheduler looks up the job store matching the specified alias and stores the job there.

Job stores rely mainly on the pickle library's dumps and loads (at the core, implemented through Python's __getstate__ and __setstate__ hooks): a job is serialized to storage whenever it changes and deserialized when it is loaded. The backend can be Redis, a relational database (many databases are supported through the SQLAlchemy library), MongoDB, and so on.
Job stores currently supported by APScheduler:

MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore

Executors handle the running of jobs, typically by submitting the job's callable to a thread or process pool. When a job completes, the executor notifies the scheduler.

In plain terms, the executor wraps the task when it is added. The executor type should match the scheduler in use: with AsyncIO as the scheduling library, choose AsyncIOExecutor; with Tornado, choose TornadoExecutor; with a process-based (blocking or background) scheduler, choose ThreadPoolExecutor or ProcessPoolExecutor.
Executors currently supported by APScheduler:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor

The scheduler is the component that ties everything together. You usually have only one scheduler per application, and application developers normally do not deal with job stores, executors, or triggers directly; the scheduler provides the interfaces for them. Configuring job stores and executors, as well as adding, modifying, and removing jobs, is all done through the scheduler.

The Scheduler is the core of APScheduler; all related components are defined through it. Once started, the scheduler wakes up according to the fire times produced by the triggers of its jobs, and it is also woken when job information changes.

Choose the scheduler to match your environment: with AsyncIO, use AsyncIOScheduler; with Tornado, use TornadoScheduler.
Schedulers currently supported by APScheduler:

AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler

Simple application

import time
from apscheduler.schedulers.blocking import BlockingScheduler  # blocking scheduler: start() runs in the foreground

def my_job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5)  # run my_job every 5 seconds
sched.start()

Complete code

# triggers
# job stores
# executors
# schedulers

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    # Multiple job stores can be configured
    # 'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore with its storage URL
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},  # at most 20 worker threads
    'processpool': ProcessPoolExecutor(max_workers=5)  # at most 5 worker processes
}
job_defaults = {
    'coalesce': False,  # when runs were missed (delay, outage, etc.), do not merge them into one run
    'max_instances': 3  # maximum number of concurrently running instances of the same job
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc as the time zone of the scheduler


import os
import time

def print_time(name):
    print(f'{name} - {time.ctime()}')

def add_job(job_id, func, args, seconds):
    """Add job"""
    print(f"Add interval execution task job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)

def add_coun_job(job_id, func, args, start_time):
    """Add job"""
    print(f"Add an execution task job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='date', timezone='Asia/Shanghai', run_date=start_time)
    # scheduler.add_job(func=print_time, trigger='date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args= ['text2'])

def remove_job(job_id):
    """Remove job"""
    scheduler.remove_job(job_id)
    print(f"Remove job - {job_id}")

def pause_job(job_id):
    """Stop job"""
    scheduler.pause_job(job_id)
    print(f"Stop job - {job_id}")

def resume_job(job_id):
    """Restore job"""
    scheduler.resume_job(job_id)
    print(f"Resume job - {job_id}")

def get_jobs():
    """Get all job information, including stopped ones"""
    res = scheduler.get_jobs()
    print(f"All jobs - {res}")

def print_jobs():
    print(f"Detailed job information")
    scheduler.print_jobs()

def start():
    """Start the scheduler"""
    scheduler.start()

def shutdown():
    """Close the scheduler"""
    scheduler.shutdown()

if __name__ == '__main__':
    # Note: reuse the scheduler configured above; creating a new BackgroundScheduler here
    # would discard the job store and executor configuration.
    # start()
    # print('Press Ctrl + {0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    # add_job('job_A', func=print_time, args=("A", ), seconds=1)
    # add_job('job_B', func=print_time, args=("B", ), seconds=2)
    # time.sleep(6)
    # pause_job('job_A') # Stop a
    #get_jobs() #Get all jobs
    # time.sleep(6)
    # print_jobs()
    # resume_job('job_A')
    # time.sleep(6)
    # remove_job('job_A')
    # time.sleep(6)
    from datetime import datetime
    import pytz
    start()
    
    date_temp = datetime(2022, 2, 19, 17, 30, 5)
    # scheduler.add_job(print_time, 'date', run_date=datetime.now(pytz.timezone('America/Manaus')), args=['text'])
    # scheduler.add_job(print_time, 'date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2 '])
    add_coun_job(job_id="job_C",func=print_time,args=('One-time execution task',),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone())
    time.sleep(130)
    try:
        shutdown()
    except RuntimeError:
        pass