Background
Our Django project uses django-apscheduler to implement scheduled tasks. It relies on the BackgroundScheduler class, which runs the scheduled jobs in a background thread, and all jobs are persisted to the database.
While the project was running, a database outage caused the scheduled-task thread to terminate abnormally, and even after the database recovered, the jobs never ran again. I tried to reproduce the problem many times without success: with a scheduled task enabled, I manually cut the database connection and the job failed to execute, but once I restored the connection the job recovered on its own. This left me puzzled for a while.
The error log is shown below. Analysis shows that update_job hit a database connection error that nothing caught; the exception propagated upward layer by layer and eventually killed the thread. So it is certain that the scheduled task died because of the database connection failure. Then why couldn't it be reproduced?
```
Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/python3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/blocking.py", line 32, in _main_loop
    wait_seconds = self._process_jobs()
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 1009, in _process_jobs
    jobstore.update_job(job)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/util.py", line 105, in func_wrapper
    result = func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/jobstores.py", line 249, in update_job
    with transaction.atomic():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/transaction.py", line 189, in __enter__
    if not connection.get_autocommit():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 389, in get_autocommit
    self.ensure_connection()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/utils.py", line 90, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 200, in connect
    self.connection = self.get_new_connection(conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 187, in get_new_connection
    connection = Database.connect(**conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
django.db.utils.OperationalError: connection to server at "xxxx.postgresql.svc.cluster.local" (xx.xx.xx.xx), port xxxx failed: server closed the connection unexpectedly
        This probably means the server terminated abnormally before or while processing the request.
```
Source code analysis
Let's start with the implementation of BackgroundScheduler: its start method creates a child thread.
```python
class BackgroundScheduler(BlockingScheduler):
    _thread = None

    def start(self, *args, **kwargs):
        if self._event is None or self._event.is_set():
            self._event = Event()

        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        self._thread.daemon = self._daemon
        self._thread.start()

    def shutdown(self, *args, **kwargs):
        super(BackgroundScheduler, self).shutdown(*args, **kwargs)
        self._thread.join()
        del self._thread
```
The _main_loop method is implemented in BlockingScheduler: an infinite loop that repeatedly waits on an event and then calls the _process_jobs method.
```python
class BlockingScheduler(BaseScheduler):
    ...
    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()
    ...
```
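The wait/clear pattern above is plain `threading.Event` usage: the loop sleeps for up to `wait_seconds`, and setting the event (which is what the scheduler does when a job is added or changed) lets it re-check early. Below is a minimal stdlib sketch of the same loop, with the scheduler state replaced by a simple flag and a stand-in job processor; all names here are illustrative, not APScheduler's internals:

```python
import threading

class MiniLoop:
    """Minimal sketch of BlockingScheduler's wait/clear main loop."""

    def __init__(self):
        self._event = threading.Event()
        self._stopped = False
        self.ticks = 0

    def _process_jobs(self):
        # Stand-in for real job processing; returns seconds until the next run.
        self.ticks += 1
        if self.ticks >= 3:
            self._stopped = True
        return 0.01

    def main_loop(self):
        wait_seconds = 0.01
        while not self._stopped:
            # Sleep until the timeout expires or wakeup() sets the event.
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()

    def wakeup(self):
        # Called when a job is added or modified so the loop re-checks early.
        self._event.set()

loop = MiniLoop()
loop.main_loop()
print(loop.ticks)  # 3
```

The key property for this article: everything, including `_process_jobs`, runs inside that one loop, so an exception escaping `_process_jobs` escapes the loop too.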
Now look at _process_jobs, which is implemented in BaseScheduler. The main flow is: fetch all due jobs, then iterate over them, run each one, and update its status. The earlier error log comes from the update_job call here; its exception is not caught, so it propagates upward layer by layer (update_job -> _process_jobs -> _main_loop) until the thread terminates abnormally.
```python
def _process_jobs(self):
    for jobstore_alias, jobstore in six.iteritems(self._jobstores):
        try:
            due_jobs = jobstore.get_due_jobs(now)
        except Exception as e:
            ...
            continue
        ...
        for job in due_jobs:
            ...
            try:
                executor.submit_job(job, run_times)
            except BaseException:
                ...
            ...
            jobstore.update_job(job)
```
So why couldn't the problem be reproduced? Because when the database connection drops, the program is usually not executing update_job at that exact moment. Notice that the earlier get_due_jobs call is wrapped in exception handling: if the connection error is raised there, it is caught, the remaining steps are skipped, and the scheduler simply waits for the next run. If that run fails too, it waits again, so an exception at this point never reaches the top and never stops the thread.
But if, at just the wrong moment, get_due_jobs succeeds and the connection error is raised inside update_job instead, the whole thread stops and the scheduled task never runs again.
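This failure mode is easy to demonstrate with the stdlib alone: an unhandled exception in a thread's target function kills that thread permanently, and nothing restarts it. The function below is an illustrative stand-in, not APScheduler code; when run, it prints a traceback to stderr, much like the log at the top of this article:

```python
import threading

def fragile_main_loop():
    # Stands in for _main_loop: one unhandled error ends the thread for good.
    raise RuntimeError("connection to server failed")  # like update_job raising

t = threading.Thread(target=fragile_main_loop, name="APScheduler-demo")
t.daemon = True
t.start()
t.join(timeout=1)

# The thread is dead; no further jobs will ever run on it.
print(t.is_alive())  # False
```

The main thread sees nothing except, at most, the traceback that `threading` prints, which is exactly why the outage went unnoticed until jobs stopped running.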
So how do we solve this problem?
Building a demo
First, we build a demo that reproduces the problem.
- Create the Django project

```shell
django-admin startproject apschedule_demo
python manage.py startapp demo
python manage.py makemigrations
python manage.py migrate
```
- Configure the database settings in settings.py

```python
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "apschedule_demo",
        "HOST": "xxxx",
        "PORT": 5432,
        "USER": "xxx",
        "PASSWORD": "xxx",
    }
}
```
- Build the demo following the official django-apscheduler documentation

Register the app in settings.py:

```python
INSTALLED_APPS = (
    # ...
    "django_apscheduler",
)
```
Create the directory demo/management/commands and add a runapscheduler.py file under it with the following content:
```python
import logging
import time

from django.conf import settings
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore

logger = logging.getLogger(__name__)


def my_job():
    # Your job processing logic here...
    print("job..")


class Command(BaseCommand):
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
        scheduler.add_jobstore(DjangoJobStore(), "default")

        scheduler.add_job(
            my_job,
            trigger=CronTrigger(second="*/3"),  # Every 3 seconds
            id="my_job",  # The `id` assigned to each job MUST be unique
            max_instances=1,
            replace_existing=True,
        )
        logger.info("Added job 'my_job'.")

        try:
            logger.info("Starting scheduler...")
            scheduler.start()
            # BackgroundScheduler is non-blocking, so block here to keep
            # the main thread from ending.
            while True:
                time.sleep(10)
        except KeyboardInterrupt:
            logger.info("Stopping scheduler...")
            scheduler.shutdown()
            logger.info("Scheduler shut down successfully!")
```
Run the scheduled task with python manage.py runapscheduler. This script creates a job that runs every 3 seconds.
- Reproduce

Set a breakpoint on jobstore.update_job(job) and run under the debugger. When the program hits the breakpoint, shut down the database and resume execution: an error is reported, the exception is thrown, and the thread stops running. At this point, we have reproduced the problem.
Thread restart
My first thought was: I can check whether the thread has died, and if so, simply restart it.
```python
while True:
    if not scheduler._thread.is_alive():
        scheduler._thread.start()
    time.sleep(10)
```
But contrary to expectations, this raised an exception:

```
RuntimeError: threads can only be started once
```
The official documentation confirms that a thread's start method may only be called once.
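A few stdlib lines confirm this, and also show what a real restart would require: a brand-new Thread object around the same target, since a finished thread can never be started again (this is a sketch only; BackgroundScheduler does not expose a supported way to swap in a new thread):

```python
import threading

def worker():
    pass

t = threading.Thread(target=worker)
t.start()
t.join()

restart_error = None
try:
    t.start()  # calling start() a second time is forbidden
except RuntimeError as e:
    restart_error = str(e)

print(restart_error)  # threads can only be started once

# A genuine "restart" means constructing a fresh Thread object:
t2 = threading.Thread(target=worker)
t2.start()
t2.join()
```

Since the scheduler keeps its thread in a private attribute and manages its lifecycle itself, rebuilding the thread from outside is fragile, which is why I kept looking for another approach.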
Listener
APScheduler provides a listener mechanism: the success or failure of scheduled jobs can be reported through callbacks registered in advance. However, analyzing the source code shows it cannot catch the death of the scheduler thread itself.
The simplified flow of the listener mechanism is:

- Callbacks are registered externally through the add_listener method.
- Each event produced in the scheduler thread's main flow (_process_jobs) is appended to events.
- The events are traversed and matched against each registered callback's mask; on a match, the callback is invoked.
```python
class BaseScheduler:
    def __init__(...):
        self._listeners = []

    def add_listener(self, callback, mask=EVENT_ALL):
        self._listeners.append((callback, mask))

    def _process_jobs(self):
        events = []
        ...
        events.append(event)
        ...
        for event in events:
            self._dispatch_event(event)

    def _dispatch_event(self, event):
        for cb, mask in self._listeners:
            if event.code & mask:
                try:
                    cb(event)
                except BaseException:
                    self._logger.exception('Error notifying listener')
```
But the dispatch runs inside the scheduler thread itself, so if that thread dies, no callback is ever invoked.
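The mask matching in _dispatch_event is ordinary bitwise arithmetic, which a stdlib sketch makes concrete. The event codes below are illustrative values, not APScheduler's real constants, and the sketch also makes the limitation obvious: dispatch is just a function call made by the scheduler thread, so a dead thread dispatches nothing.

```python
EVENT_JOB_EXECUTED = 1 << 0  # illustrative values, not APScheduler's constants
EVENT_JOB_ERROR = 1 << 1
EVENT_ALL = EVENT_JOB_EXECUTED | EVENT_JOB_ERROR

class Event:
    def __init__(self, code):
        self.code = code

listeners = []

def add_listener(callback, mask=EVENT_ALL):
    listeners.append((callback, mask))

def dispatch(event):
    for cb, mask in listeners:
        if event.code & mask:  # bitwise match against the registered mask
            cb(event)

seen = []
add_listener(lambda e: seen.append(("errors-only", e.code)), EVENT_JOB_ERROR)
add_listener(lambda e: seen.append(("all", e.code)), EVENT_ALL)

dispatch(Event(EVENT_JOB_EXECUTED))  # only the EVENT_ALL listener fires
dispatch(Event(EVENT_JOB_ERROR))     # both listeners fire
print(seen)  # [('all', 1), ('errors-only', 2), ('all', 2)]
```

Listeners are still useful for alerting on individual job failures; they just cannot save a scheduler whose thread has already terminated.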
Catching exceptions inside the thread
If update_job throws an exception and stops the thread, I can catch that exception and continue: wait for the next scheduled run and try again. But doing this directly would mean modifying the source code, which is best avoided. So instead I subclassed BackgroundScheduler and overrode its _process_jobs method.
The overridden _process_jobs wraps the parent's _process_jobs() in a try/except and retries on failure, so even if update_job raises, the scheduler keeps trying to recover until it succeeds.
```python
class DemoBackgroundScheduler(BackgroundScheduler):
    def _process_jobs(self):
        while True:
            try:
                return super()._process_jobs()
            except BaseException:
                time.sleep(5)


class Command(BaseCommand):
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        scheduler = DemoBackgroundScheduler(timezone=settings.TIME_ZONE)
        ...
```
Trying to reproduce the problem again: after disconnecting the database, the scheduler keeps retrying instead of stopping the thread, and once the database comes back, the job runs successfully and no exception is thrown.
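The retry logic can also be exercised in isolation with a stand-in for update_job that fails a few times before succeeding. All names here are illustrative; this sketch catches Exception rather than BaseException so that things like KeyboardInterrupt still propagate, which is a safer default than the broad catch above:

```python
import time

def process_jobs_with_retry(func, retry_delay=0.01):
    """Keep retrying func until it succeeds, like the overridden _process_jobs."""
    while True:
        try:
            return func()
        except Exception:
            # The real scheduler would log the error here; we just back off.
            time.sleep(retry_delay)

attempts = {"n": 0}

def flaky_update_job():
    # Fails twice, simulating a database that is down, then recovers.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("database is down")
    return "job updated"

result = process_jobs_with_retry(flaky_update_job)
print(result, attempts["n"])  # job updated 3
```

In production you would likely also want a capped or exponential backoff instead of a fixed sleep, so a long outage does not hammer the database the instant it recovers.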
Related links
- APScheduler official documentation