django-apscheduler scheduled tasks stop abnormally

Background

In a Django project, django-apscheduler is used to implement scheduled tasks. The scheduler class is BackgroundScheduler, which runs the scheduled tasks in a background thread. All tasks are persisted to the database.

While the project was running, a database failure caused the scheduler thread to terminate abnormally. Even after the database recovered, the tasks never ran again. I tried many times to reproduce the problem without success: with the scheduled task running, I manually cut the database connection and the task failed to execute, but once I restored the connection the task recovered as well. This left me confused for a while.

The error log is shown below. The analysis shows that update_job hit a database connection error that nothing caught, so the exception propagated up layer by layer and eventually stopped the thread. The scheduled task certainly failed because of the failed database connection. So why can't it be reproduced?

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/python3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/blocking.py", line 32, in _main_loop
    wait_seconds = self._process_jobs()
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 1009, in _process_jobs
    jobstore.update_job(job)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/util.py", line 105, in func_wrapper
    result = func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/jobstores.py", line 249, in update_job
    with transaction.atomic():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/transaction.py", line 189, in __enter__
    if not connection.get_autocommit():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 389, in get_autocommit
    self.ensure_connection()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
     return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/utils.py", line 90, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 200, in connect
    self.connection = self.get_new_connection(conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 187, in get_new_connection
    connection = Database.connect(**conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
django.db.utils.OperationalError: connection to server at "xxxx.postgresql.svc.cluster.local" (xx.xx.xx.xx), port xxxx failed: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

Analyzing the cause in the source code

Start with the implementation of BackgroundScheduler, which creates a child thread in its start method.

class BackgroundScheduler(BlockingScheduler):

    _thread = None

    def start(self, *args, **kwargs):
        if self._event is None or self._event.is_set():
            self._event = Event()

        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        self._thread.daemon = self._daemon
        self._thread.start()

    def shutdown(self, *args, **kwargs):
        super(BackgroundScheduler, self).shutdown(*args, **kwargs)
        self._thread.join()
        del self._thread

The thread's target, _main_loop, is implemented in BlockingScheduler. It loops until the scheduler is stopped and calls the _process_jobs method on each iteration.

class BlockingScheduler(BaseScheduler):
    
    ...

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()
    
    ...

Now look at _process_jobs, which is implemented in BaseScheduler. The main flow is as follows: find all the jobs that are due, iterate over them, submit each one to its executor, and update the job's state. The earlier error log corresponds to the update_job call here: it throws an exception that nothing catches. The exception propagates up layer by layer, update_job -> _process_jobs -> _main_loop, and the thread terminates abnormally.

def _process_jobs(self):
    for jobstore_alias, jobstore in six.iteritems(self._jobstores):
        try:
            due_jobs = jobstore.get_due_jobs(now)
        except Exception as e:
            ...
            continue

        ...
                
        for job in due_jobs:
      
            ...
            
            try:
                executor.submit_job(job, run_times)
            except BaseException:
                ...

            ...
            jobstore.update_job(job)

So why can't it be reproduced? Because when the database connection is lost, the program is usually not in the middle of update_job. The get_due_jobs call earlier in the loop is wrapped in exception handling: if the connection error is raised there, it is caught, the rest of the iteration is skipped, and the scheduler waits for the next run. If that fails too, it waits again, so the exception never reaches the top of the thread and the thread keeps running.

But if, at some moment, get_due_jobs reaches the database successfully and the exception is then raised in update_job, the whole thread stops and the scheduled tasks are never executed again.

So how can this problem be solved?
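The difference between the two failure points can be simulated without Django or APScheduler. The sketch below is only an illustration under assumptions: FlakyStore, process_jobs, main_loop and run are hypothetical names, and ConnectionError stands in for the database error. It mirrors the shape of the loop described above, with a guarded get_due_jobs and an unguarded update_job.

```python
import threading


class FlakyStore:
    """Hypothetical stand-in for a job store whose database may be down."""

    def __init__(self, fail_on):
        self.fail_on = fail_on  # name of the method that should fail, or None

    def get_due_jobs(self):
        if self.fail_on == "get_due_jobs":
            raise ConnectionError("database unavailable")
        return ["job-1"]

    def update_job(self, job):
        if self.fail_on == "update_job":
            raise ConnectionError("database unavailable")


def process_jobs(store):
    # Guarded, like the get_due_jobs call in _process_jobs.
    try:
        due_jobs = store.get_due_jobs()
    except Exception:
        return  # skip this round and try again on the next wake-up
    for job in due_jobs:
        store.update_job(job)  # unguarded: an exception escapes to the caller


def main_loop(store, rounds, completed):
    for _ in range(rounds):
        process_jobs(store)
        completed.append(1)  # only reached if the round did not raise


def run(fail_on, rounds=3):
    """Run the loop in a thread and return how many rounds completed."""
    completed = []
    thread = threading.Thread(
        target=main_loop, args=(FlakyStore(fail_on), rounds, completed)
    )
    thread.start()
    thread.join()
    return len(completed)
```

With a failure in get_due_jobs the loop survives all of its rounds, while a failure in update_job ends the thread during the first round (Python prints the traceback to stderr, much like the log above).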

Build demo

First, we build a demo to simulate and reproduce the problem.

  1. Create the Django project
django-admin startproject apschedule_demo

cd apschedule_demo

python manage.py startapp demo

python manage.py makemigrations

python manage.py migrate
  2. Configure the database in settings.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "apschedule_demo",
        "HOST": "xxxx",
        "PORT": 5432,
        "USER": "xxx",
        "PASSWORD": "xxx"
    }
}

  3. Build a demo based on the official django-apscheduler documentation

Add the apps to INSTALLED_APPS in settings.py

INSTALLED_APPS = (
    #...
    "django_apscheduler",
    "demo",  # needed so Django discovers the management command below
)

Create the directory demo/management/commands and create the file runapscheduler.py in it with the following content:

import logging
import time

from django.conf import settings

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore

logger = logging.getLogger(__name__)


def my_job():
  # Your job processing logic here...
  print("job..")


class Command(BaseCommand):
  help = "Runs APScheduler."

  def handle(self, *args, **options):
    scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
    scheduler.add_jobstore(DjangoJobStore(), "default")

    scheduler.add_job(
      my_job,
      trigger=CronTrigger(second="*/3"),  # Every 3 seconds
      id="my_job",  # The `id` assigned to each job MUST be unique
      max_instances=1,
      replace_existing=True,
    )
    logger.info("Added job 'my_job'.")

    try:
      logger.info("Starting scheduler...")
      scheduler.start()

      # BackgroundScheduler.start() returns immediately, so block here
      # to keep the main thread (and the process) alive.
      while True:
        time.sleep(10)
    except KeyboardInterrupt:
      logger.info("Stopping scheduler...")
      scheduler.shutdown()
      logger.info("Scheduler shut down successfully!")

Run the scheduled task with python manage.py runapscheduler. The script creates a task that runs every 3 seconds.

  4. Reproduce the problem

Set a breakpoint on jobstore.update_job(job) and run the command in debug mode. When the program stops at the breakpoint, shut down the database, then let the program continue. update_job raises an exception and the thread stops running. At this point the problem has been reproduced.

Thread restart

My first idea was to check whether the thread had died and, if so, simply restart it.

while True:
    if not scheduler._thread.is_alive():
        scheduler._thread.start()

    time.sleep(10)

This did not work. An exception was thrown instead:

RuntimeError: threads can only be started once

According to the official Python documentation, the start method can be called at most once per thread object.
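This restriction is easy to confirm with the standard library alone. The following is a minimal sketch; start_twice and restart_with_new_thread are hypothetical helper names used only for illustration.

```python
import threading


def start_twice():
    """Start a finished thread a second time and return the error message."""
    thread = threading.Thread(target=lambda: None)
    thread.start()
    thread.join()  # the thread has finished, but the object cannot be reused
    try:
        thread.start()
    except RuntimeError as exc:
        return str(exc)
    return None


def restart_with_new_thread(target):
    """Running the same target again requires a brand-new Thread object."""
    thread = threading.Thread(target=target)
    thread.start()
    return thread
```

Restarting the scheduler loop this way would mean creating a new Thread around the scheduler's private _main_loop, which relies on internals of the library, so it is not pursued here.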

Listener

APScheduler provides a listener mechanism: callbacks registered in advance are invoked when a scheduled task succeeds or fails. However, the source code shows that it cannot capture an exception that kills the scheduler thread itself.

The following simplified code shows how listeners work:

  1. A callback is registered from outside through the add_listener method
  2. Each event that occurs in _process_jobs, the main flow of the scheduler thread, is appended to events
  3. The events are traversed and each one is matched against the mask of every registered callback. If it matches, the callback is called.

class BaseScheduler:
    def __init__(...):
        self._listeners = []

    def add_listener(self, callback, mask=EVENT_ALL):
        self._listeners.append((callback, mask))

    def _process_jobs(self):

        events = []
        
        ...

        events.append(event)
 
        ...


        for event in events:
            self._dispatch_event(event)


    def _dispatch_event(self, event):
        for cb, mask in self._listeners:
            if event.code & mask:
                try:
                    cb(event)
                except BaseException:
                    self._logger.exception('Error notifying listener')

If the thread itself dies, no callback gets a chance to run.
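To make this concrete, here is a small self-contained model of the mask-based dispatch. It is a sketch under assumptions, not APScheduler's code: MiniScheduler, Event and the crash_before_dispatch flag are hypothetical, and the event constants use made-up values rather than the library's real ones.

```python
# Hypothetical bit-flag event codes (not APScheduler's actual values).
EVENT_JOB_EXECUTED = 1 << 0
EVENT_JOB_ERROR = 1 << 1


class Event:
    def __init__(self, code):
        self.code = code


class MiniScheduler:
    def __init__(self):
        self._listeners = []

    def add_listener(self, callback, mask):
        self._listeners.append((callback, mask))

    def _dispatch_event(self, event):
        for callback, mask in self._listeners:
            if event.code & mask:  # call only listeners whose mask matches
                callback(event)

    def process_jobs(self, crash_before_dispatch=False):
        events = [Event(EVENT_JOB_EXECUTED)]
        if crash_before_dispatch:
            # Models update_job raising: the dispatch loop below never runs.
            raise ConnectionError("update_job failed")
        for event in events:
            self._dispatch_event(event)
```

A listener registered with EVENT_JOB_EXECUTED | EVENT_JOB_ERROR sees the event from a normal round, but when the round raises before the dispatch loop, nothing is delivered, which is the situation described above.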

Catching exceptions inside the thread

If an exception from update_job stops the thread, I can catch that exception, carry on, and let the next scheduled run try again. Doing this directly would mean modifying the library's source code, which is best avoided whenever possible. So instead I subclassed BackgroundScheduler and overrode the _process_jobs method.

The overriding _process_jobs method catches exceptions from the parent class's _process_jobs() and keeps retrying, so even if update_job throws an exception, the scheduler keeps trying to recover until it succeeds.

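class DemoBackgroundScheduler(BackgroundScheduler):
    def _process_jobs(self):
        # Retry until the parent implementation succeeds,
        # pausing 5 seconds between attempts.
        while True:
            try:
                return super()._process_jobs()
            except Exception:
                # Exception rather than BaseException, so SystemExit and
                # KeyboardInterrupt are not swallowed by the retry loop.
                logger.exception("_process_jobs failed, retrying in 5 seconds")
                time.sleep(5)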

class Command(BaseCommand):
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        scheduler = DemoBackgroundScheduler(timezone=settings.TIME_ZONE)
        ...

Now try to reproduce the problem again. After the database is disconnected, the scheduler keeps retrying and the thread does not stop. Once the database is back, the job runs successfully and no exception is thrown.
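One possible refinement of the fixed 5-second sleep is a capped exponential backoff that can be interrupted on shutdown. The sketch below is only an assumption-laden illustration, not part of APScheduler: RetryingMixin, FakeScheduler and the _stop_requested event are hypothetical names, and FakeScheduler merely imitates a parent whose _process_jobs fails a few times before succeeding.

```python
import threading


class RetryingMixin:
    """Retry the parent's _process_jobs with capped exponential backoff."""

    retry_initial = 1.0  # seconds before the first retry
    retry_max = 30.0     # upper bound for the delay between retries

    def _process_jobs(self):
        delay = self.retry_initial
        while True:
            try:
                return super()._process_jobs()
            except Exception:
                # Event.wait returns True if a stop was requested while
                # waiting; in that case give up and re-raise the error.
                if self._stop_requested.wait(delay):
                    raise
                delay = min(delay * 2, self.retry_max)


class FakeScheduler:
    """Hypothetical parent class that fails a fixed number of times."""

    def __init__(self, failures):
        self._failures = failures
        self._stop_requested = threading.Event()
        self.calls = 0

    def _process_jobs(self):
        self.calls += 1
        if self.calls <= self._failures:
            raise ConnectionError("database unavailable")
        return 3.0  # seconds until the next wake-up


class RetryingFakeScheduler(RetryingMixin, FakeScheduler):
    retry_initial = 0.01  # short delays so the example finishes quickly
    retry_max = 0.05
```

Calling RetryingFakeScheduler(failures=3)._process_jobs() makes four attempts and then returns the parent's result. To use the same idea with a real BackgroundScheduler subclass, the subclass would have to create and set its own stop event, which is left out here.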

Related links

  • APScheduler official documentation
