Deploying multi-process Django + apscheduler with uwsgi, and troubleshooting

  • About the author: Hello everyone, I am Zeeland, an open source builder and high-quality creator in the full-stack field.
  • CSDN homepage: Zeeland
  • My blog: Zeeland
  • Github homepage: Undertone0809 (Zeeland)
  • Introduction: The mixture of software dev + Iot + ml + anything

Introduction

The apscheduler jobs I previously wrote in Django were always registered with decorators. Because the scheduled tasks of the business were fixed, this caused no problems. Recently, however, scheduled tasks need to be added, removed, and configured dynamically, so the decorator approach no longer works and jobs have to be added with scheduler.add_job().

At first I thought it would be a simple function replacement, but many different problems came up after the change, so I am recording them here.

This article will introduce how to use uwsgi to deploy a django + apscheduler system. In order to focus on the key points, this article will not go into detail on the configuration process of uwsgi.ini and other files. It will only introduce how to write and build an apscheduler system in django and deploy it on the uwsgi server.

Background introduction

With that brief introduction out of the way, here is how I originally set up apscheduler. A jobs/core_job.py file sits in the project root directory and contains the core task scheduling. The code is roughly as follows:

from typing import List, Dict, Any

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore, register_job

from app import models
from commons.log_util import get_logger, enable_log
from services.db import db_service
from services.huasi import huasi_storage

logger = get_logger()
enable_log()

# Create background scheduler
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")


@register_job(
    scheduler,
    "interval",
    seconds=60 * 120 * 1,
    id="store A device data",
    replace_existing=True,
)
def store_pingxiang_device_job():
    ...
    logger.info("[store A device job] finish")


@register_job(
    scheduler,
    "interval",
    seconds=60 * 60 * 1,
    id="store B device data",
    replace_existing=True,
)
def store_baiyun_device_job():
    ...
    logger.info("[store B device job] finish")


def start_scheduler():
    """Start the scheduler so that the registered jobs begin running"""
    scheduler.start()
    logger.info("[django scheduler] start scheduler")


def resume_scheduler():
    logger.info("[django scheduler] resume schedule")
    scheduler.resume()


def stop_scheduler():
    logger.info("[django scheduler] pause schedule")
    scheduler.pause()

Initially, I started the scheduler from ready() in the submodule's apps.py, like this:

from django.apps import AppConfig


class AppConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "app"
    verbose_name = "xxx management system"

    def ready(self):
        from jobs.core_job import start_scheduler

        start_scheduler()

Later I discovered that this was a very unwise choice. In uwsgi, if processes is not 1 (that is, multi-process mode is enabled), ready() is executed multiple times. If your jobs are fixed, several processes then try to add the same jobs to the database when django apscheduler initializes, and you get an error like this: pymysql.err.IntegrityError: (1062, "Duplicate entry 'store A device job' for key 'django_apscheduler_djangojob.PRIMARY'")

The error is an integrity error (IntegrityError) on the table django_apscheduler_djangojob. The message "Duplicate entry 'store A device job' for key 'django_apscheduler_djangojob.PRIMARY'" means that a row with the primary key 'store A device job' already exists, so a second insert of the same job id is rejected.

To solve this problem, check whether your code inserts the same job more than once, or manually remove the conflicting rows from the table. You can do the cleanup with SQL commands or through the Django ORM, depending on your application's needs and code.

Because every process adds the same jobs to the database, the inserts collide, which is why starting the scheduler from ready() in the submodule's apps.py is not a wise choice.

In fact, if you enable multi-process mode in uwsgi.ini, starting the scheduler from settings.py in the project directory causes the same problem. The variables of the processes are independent, so each process runs the same start-up code, tries to write the same rows, and fails with the duplicate-key error.
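The failure can be reproduced in miniature without uwsgi or MySQL. The sketch below is only an illustration under simplifying assumptions: it uses an in-memory SQLite database, a hypothetical `jobs` table standing in for django_apscheduler_djangojob, and a plain loop standing in for two worker processes that each register the same job id on start-up.

```python
import sqlite3


def register_job(conn, job_id):
    """Insert a job row without checking whether it already exists,
    which is what each worker effectively does on start-up."""
    conn.execute("INSERT INTO jobs (id) VALUES (?)", (job_id,))
    conn.commit()


# Hypothetical stand-in for the django_apscheduler_djangojob table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY)")

errors = []
for worker in ("worker-1", "worker-2"):
    try:
        register_job(conn, "store A device data")
    except sqlite3.IntegrityError as exc:
        # The second insert of the same primary key is rejected.
        errors.append((worker, str(exc)))

print(errors)
```

The first worker's insert succeeds and the second one raises an integrity error, which is the same class of failure as the MySQL 1062 error above.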

Solve the problem of multiple processes starting apscheduler multiple times

Option 1

To run the apscheduler program only once in multiple workers, you can use the concept of distributed locks. In this scenario, only one worker can acquire the lock and execute the apscheduler program. Other workers will detect that the lock has been acquired and wait or skip execution as needed.

Here is a possible solution:

  1. Use a shared reliable storage system such as a database or a distributed cache (such as Redis) to store lock status.

  2. In your Django application, use a decorator or middleware to implement locking logic. The function of this decorator/middleware is to check the status of the lock before the apscheduler program is executed, and decide whether to execute the program based on the situation.

Here is a simple implementation example using Redis as the storage system:

import redis
from functools import wraps

def apscheduler_lock(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        lock_key = "apscheduler_lock"
        lock_value = "locked"
        # Connect to Redis
        redis_client = redis.Redis(host='localhost', port=6379)
        # Try to acquire the lock: nx=True sets the key only if it does not exist,
        # ex=60 makes it expire after 60 seconds so a crashed worker cannot hold it forever
        acquired_lock = redis_client.set(lock_key, lock_value, nx=True, ex=60)
        if not acquired_lock:
            # No lock is obtained, skip execution of apscheduler program
            return None # You can return other information as needed
        try:
            # The lock is successfully obtained, execute the apscheduler program
            return func(*args, **kwargs)
        finally:
            # Release the lock after execution, even if the task raised an exception
            redis_client.delete(lock_key)

    return wrapper

Apply this decorator on your apscheduler task function, for example:

from apscheduler.schedulers.background import BackgroundScheduler

@apscheduler_lock
def my_apscheduler_task():
    # Execute your apscheduler task
    pass

# Create a scheduled task
scheduler = BackgroundScheduler()
scheduler.add_job(my_apscheduler_task, 'interval', minutes=10)
scheduler.start()

In this way, no matter how many workers you start (including multiple uwsgi processes), only the worker that holds the lock executes the apscheduler program, and any worker that fires while the lock is held skips execution.
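One caveat with the decorator above: it deletes the lock as soon as the task finishes, so a worker whose timer fires slightly later can still acquire the lock and run the same task again. A possible variant, sketched here under assumptions, never deletes the key and lets it expire on its own, with a TTL shorter than the job interval but longer than the spread between the workers' start times. The names `run_once_per_window` and `InMemoryClient` are hypothetical; `InMemoryClient` is only a stand-in so the sketch runs without a Redis server, and in a real deployment you would pass a `redis.Redis` client, whose `set()` accepts the same `nx` and `ex` arguments.

```python
from functools import wraps


def run_once_per_window(client, lock_key, ttl_seconds):
    """Run the wrapped task only if this caller is the first to set `lock_key`.

    The key is never deleted here; it is expected to expire after `ttl_seconds`.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if client.set(lock_key, "locked", nx=True, ex=ttl_seconds):
                return func(*args, **kwargs)
            return None  # another worker already claimed this window
        return wrapper
    return decorator


class InMemoryClient:
    """Demonstration-only stand-in for a Redis client (expiry is not simulated)."""

    def __init__(self):
        self._keys = {}

    def set(self, name, value, nx=False, ex=None):
        if nx and name in self._keys:
            return None
        self._keys[name] = value
        return True


client = InMemoryClient()
runs = []


@run_once_per_window(client, "apscheduler_lock", ttl_seconds=300)
def my_apscheduler_task(worker):
    runs.append(worker)
    return worker


# Two workers fire the same trigger a moment apart; only the first one runs.
first = my_apscheduler_task("worker-1")
second = my_apscheduler_task("worker-2")
```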

Please adapt this example to your actual situation and ensure that your shared storage system’s configuration and connection code are correct.
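If all workers run on a single host and you would rather guard the scheduler start-up itself than each task, an OS file lock can play the same role as Redis. The following is a minimal sketch under that assumption, using `fcntl.flock` (Unix only). The lock path and the function names are hypothetical and are not part of the original project.

```python
import fcntl
import os
import tempfile

# Hypothetical lock file; any path writable by all workers would do.
LOCK_PATH = os.path.join(tempfile.gettempdir(), "apscheduler.lock")

# Keep a module-level reference so the lock lives as long as the process.
_lock_file = None


def try_acquire_scheduler_lock(path=LOCK_PATH):
    """Return True if this process now holds the exclusive lock, else False."""
    global _lock_file
    handle = open(path, "w")
    try:
        # LOCK_NB makes the call fail immediately instead of blocking.
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        handle.close()
        return False
    _lock_file = handle
    return True


def start_scheduler_once(start, path=LOCK_PATH):
    """Call `start()` only in the process that wins the lock."""
    if try_acquire_scheduler_lock(path):
        start()
        return True
    return False
```

With this in place, ready() could call start_scheduler_once(start_scheduler) instead of calling start_scheduler() directly. The lock is released automatically when the owning process exits, so a restarted worker can take over.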

Option 2

If you find distributed locks troublesome, you can instead follow the approach provided by the author of django-apscheduler: start apscheduler from the command line through manage.py. This method is more flexible. After the multi-process Django application has started, run the command separately to start apscheduler in its own process, which avoids the problem of repeatedly adding jobs described above.

Add a custom Django management command to your project that schedules the APScheduler jobs and starts the scheduler:

# runapscheduler.py
import logging

from django.conf import settings

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import util

logger = logging.getLogger(__name__)


def my_job():
  # Your job processing logic here...
  pass


# The `close_old_connections` decorator ensures that database connections, that have become
# unusable or are obsolete, are closed before and after your job has run. You should use it
# to wrap any jobs that you schedule that access the Django database in any way.
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):
  """
  This job deletes APScheduler job execution entries older than `max_age` from the database.
  It helps to prevent the database from filling up with old historical records that are no
  longer useful.
  
  :param max_age: The maximum length of time to retain historical job execution records.
                  Defaults to 7 days.
  """
  DjangoJobExecution.objects.delete_old_job_executions(max_age)


class Command(BaseCommand):
  help = "Runs APScheduler."

  def handle(self, *args, **options):
    scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
    scheduler.add_jobstore(DjangoJobStore(), "default")

    scheduler.add_job(
      my_job,
      trigger=CronTrigger(second="*/10"), # Every 10 seconds
      id="my_job", # The `id` assigned to each job MUST be unique
      max_instances=1,
      replace_existing=True,
    )
    logger.info("Added job 'my_job'.")

    scheduler.add_job(
      delete_old_job_executions,
      trigger=CronTrigger(
        day_of_week="mon", hour="00", minute="00"
      ), # Midnight on Monday, before start of the next work week.
      id="delete_old_job_executions",
      max_instances=1,
      replace_existing=True,
    )
    logger.info(
      "Added weekly job: 'delete_old_job_executions'."
    )

    try:
      logger.info("Starting scheduler...")
      scheduler.start()
    except KeyboardInterrupt:
      logger.info("Stopping scheduler...")
      scheduler.shutdown()
      logger.info("Scheduler shut down successfully!")

The management command defined above should be invoked via ./manage.py runapscheduler whenever the webserver serving your Django application is started. The details of how and where this should be done is implementation specific, and depends on which webserver you are using and how you are deploying your application to production. For most people this should involve configuring a supervisor process of sorts.

Register any APScheduler jobs as you would normally. Note that if you haven't set DjangoJobStore as the 'default' job store, then you will need to include jobstore='djangojobstore' in your scheduler.add_job() calls.

Summary

This article summarizes the problems that may arise when using uwsgi to deploy multi-process Django apscheduler, and provides two solutions. If you have better ideas, please discuss them in the comment area.