Using Celery with Django to make asynchronous calls

Table of Contents

Introduction to Celery

Relevant environment

Related configuration

1. Create a new celery.py module in the proj/proj/ directory

Define Celery instance:

2. Import this application in the proj/proj/__init__.py module.

3. Define the task file tasks.py in the respective modules

4. settings.py configuration

Service start

Asynchronous call

FAQ

Introduction to Celery

Celery is a simple, flexible and reliable distributed system that can handle large amounts of messages. It is a task queue that focuses on real-time processing and also supports task scheduling.

Celery can be used for task queues, asynchronous calls, decoupling, high concurrency, traffic peak shaving, scheduled tasks, and more.

A Celery system can be composed of multiple workers and brokers to achieve high availability and horizontal scalability.

Celery can be used on its own or in combination with Django.

Principle: Celery puts pending tasks onto a message queue, and worker processes consume and execute them.
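As a minimal standalone sketch of this principle (the app name, broker URL, and task below are purely illustrative and not part of the Django project configured later): calling a task does not run it in the current process; it only puts a message on the queue, and a worker process consumes it.

# sketch.py - hypothetical illustration of the queue/worker principle
from celery import Celery

app = Celery('sketch', broker='redis://localhost:6379/0')


@app.task
def send_report(user_id):
    # This body runs inside a worker process, not in the caller.
    print(f'generating report for user {user_id}')


# The caller only enqueues a message and returns immediately.
send_report.delay(42)

The worker that consumes this queue is started separately, as shown in the Service Start section below.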

Relevant environment

Python 3.8
django-celery-beat==2.5.0
django-celery-results==2.5.1
celery==5.2.7
redis==4.6.0

In this article, Redis is used as the message queue (broker), and MySQL 5.7 is used to store the execution results.

Related configuration

The Django project layout is as follows:

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

1. Create a new celery.py module in the proj/proj/ directory

Define Celery instance:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') tells the Celery program which Django settings module to use.

app.config_from_object('django.conf:settings', namespace='CELERY') loads every setting that starts with CELERY_ from the Django settings file.
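In other words, the namespace option simply maps the uppercase, CELERY_-prefixed names in settings.py onto Celery's own lowercase settings, for example:

# Django settings.py name    ->  Celery setting
# CELERY_BROKER_URL          ->  broker_url
# CELERY_RESULT_BACKEND      ->  result_backend
# CELERY_TASK_TIME_LIMIT     ->  task_time_limit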

2. Import this application in the proj/proj/__init__.py module.

This makes sure the app is loaded when Django starts, so that the @shared_task decorator can use it:

proj/proj/__init__.py

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

3. Define the task file tasks.py in the respective modules

Celery will automatically discover these tasks from each app's tasks.py.

# A simple task

from celery import shared_task


@shared_task
def add(x, y):
    return x + y
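Assuming this tasks.py lives in an app called proj1 (as in the layout below), the task can be invoked either synchronously or asynchronously; a quick sketch of the common call styles:

from proj1.tasks import add

add(2, 3)                               # ordinary call, runs in the current process
add.delay(2, 3)                         # enqueue for a worker, returns an AsyncResult
add.apply_async((2, 3), countdown=10)   # enqueue with options, e.g. run 10 seconds later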

The directory structure is as follows:

- proj/
  - manage.py
  - proj1/
    - tasks.py
    - admin.py
    - models.py
    - views.py
    - tests.py
    - apps.py
  - proj/
    - __init__.py
    - settings.py
    - celery.py
    - urls.py

4. settings.py configuration

CELERY_BROKER_URL = 'redis://xxxx.xxx.xxx.xxx'
# Message queue address; the broker can be Redis, MQ, MySQL, etc.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_RESULT_BACKEND = 'django-db'
# Where task execution results are stored ('django-db' uses django-celery-results)
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = {
    'global_keyprefix': '{}_celery_prefix_'.format(ENV_PROFILE)
}
CELERYD_CONCURRENCY = 5
# The number of concurrent worker processes/threads executing tasks.
CELERYD_MAX_TASKS_PER_CHILD = 100
# The maximum number of tasks that a task pool worker process can execute before being replaced by a new process. There is no limit by default.
CELERYD_FORCE_EXECV = True

CELERY_TASK_TRACK_STARTED: When True, a task reports its status as "started" when it is executed by a worker. The default is False. This is useful for long-running tasks when there is a need to report which task is currently running.

CELERY_TASK_TIME_LIMIT: Task time limit in seconds. When this value is exceeded, the process handling the task will be killed and replaced by a new process.

CELERY_ACCEPT_CONTENT: A whitelist of content types/serializers the worker is allowed to receive. By default only json is enabled, but other content types can be added, including pickle and yaml.

Notice:

1. Celery introduced lowercase configuration from 4.0, but when loading Celery configuration from the Django settings module, you need to continue to use uppercase names. You also need to use the CELERY_ prefix so that Celery settings don’t conflict with Django settings used by other applications.

2. When using MySQL 5.7 to store task results, set the database character set to utf8; with utf8mb4, the related tables cannot be created successfully.
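Note that the django-db result backend and the DatabaseScheduler come from django-celery-results and django-celery-beat respectively, so those apps typically also need to be registered and migrated; a minimal settings.py sketch:

INSTALLED_APPS = [
    # ... existing Django and project apps ...
    'django_celery_results',   # provides the django-db result backend tables
    'django_celery_beat',      # provides the DatabaseScheduler tables
]

After adding them, run python manage.py migrate so that the result and schedule tables are created (this is where the utf8 note above applies).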

Service Start

celery -A proj worker -l INFO
# The scheduled task (beat) startup method is as follows:
celery -A proj beat
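With the DatabaseScheduler configured above, periodic tasks can be created from the Django admin or programmatically through the django-celery-beat models; a minimal sketch (the 10-second interval and task name are illustrative):

import json

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# Run proj1.tasks.add(2, 3) every 10 seconds (illustrative values)
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
PeriodicTask.objects.create(
    interval=schedule,
    name='add every 10 seconds',
    task='proj1.tasks.add',
    args=json.dumps([2, 3]),
)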

Asynchronous call

The views.py code is as follows:

# A simple view that calls the task asynchronously

from django.http import HttpResponse

from proj1.tasks import add


def add_view(request):
    x = int(request.POST.get('x', '0'))
    y = int(request.POST.get('y', '0'))
    result = add.delay(x, y)
    return HttpResponse(f'Task submitted: {result.id}')

Saving results is not enabled by default. In this article the django-celery-results backend is configured, so the task execution status can be seen in the Django admin.
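Besides the admin, the AsyncResult returned by delay() can be inspected directly in code; a small sketch (the timeout value is arbitrary):

from proj1.tasks import add

res = add.delay(4, 4)
print(res.id)                 # task id, also visible in the admin
print(res.status)             # PENDING / STARTED / SUCCESS / FAILURE
print(res.get(timeout=10))    # blocks until the result is ready -> 8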

FAQ

1. The results are not saved or the task is always in PENDING state.

All tasks start in the PENDING state, so PENDING effectively means "unknown". Celery does not update a task's state when the task is sent, and any task without recorded history is assumed to be pending.

  1. Make sure the task ignore_result is not enabled.

    Enabling this option will force workers to skip updating task states.

  2. Make sure there aren’t any old workers still running.

    It’s easy to accidentally start multiple workers, so before starting a new worker, make sure the previous one has been shut down properly.

    An old worker that is not configured with the expected result backend may still be running and hijacking tasks.

    The --pidfile argument can be set to an absolute path to make sure this does not happen.

  3. Make sure the client is configured with the correct backend.

    If for some reason the client is configured to use a different backend than the worker, you will not receive results. Make sure the backend is configured correctly:

    >>> result = task.delay()
    >>> print(result.backend)
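Similarly, if you only have a task id (for example from a log or the admin), its state can be looked up against the configured result backend; a short sketch using a hypothetical task_id string:

from celery.result import AsyncResult

from proj.celery import app

res = AsyncResult(task_id, app=app)   # task_id is a hypothetical id string
print(res.state)     # a task with no recorded history will show PENDING
print(res.result)    # the return value (or the exception) once the task has finished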