Using celery in Django to implement scheduled tasks and asynchronous tasks

First step installation

pip install celery

Select message agent

? Message brokers used in production environments include RabbitMQ and Redis. RabbitMQ is officially recommended

Celery serialization
?Transmitting data between clients and consumers requires serialization and deserialization. The serialization scheme expended by Celery is as follows:

5. Installation, configuration and simple examples
Summary of Celery configuration parameters

Code example:

Note that the upper part is the configuration of asynchronous tasks. If you want to configure scheduled tasks, add the commented out part below.

Scheduled tasks do not have to be written in Django’s view methods. You can use Celery or other task scheduling tools to execute tasks regularly without relying on view methods.

# Broker configuration, using Redis as message middleware
BROKER_URL = 'redis://:6379/0'

# BACKEND configuration, here use redis to specify the Redis connection address for storing task results
CELERY_RESULT_BACKEND = 'redis://6379/0'

# Format settings for messages such as celery content, default json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# Task result expiration time, seconds
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True


# Time zone configuration
CELERY_TIMEZONE='Asia/Shanghai'

#Specify the imported task module, you can specify multiple
CELERY_IMPORTS = (
   # 'bbs.tasks',
    'echartsapp.tasks',
)
CELERY_TASK_TIME_LIMIT = 5



# Define the configuration of scheduled tasks
# CELERY_BEAT_SCHEDULE = {
# 'sync-redis-to-mysql': {
# # 'task': 'bbs.tasks.sync_redis_to_mysql', # Replace with your task function path
# # 'schedule': crontab(minute=0, hour=0), # Trigger the task at midnight every day
# 'task': 'bbs.tasks.scheduled_task',
# # Execute every 10 seconds
# 'schedule': timedelta(seconds=10),
# },
# }

Configure celery.py in the root project path

import os
import django
from celery import celery
from django.conf import settings

# Set system environment variables and install django. They must be set, otherwise an error will be reported when starting celery.
#djangotest is the current project name
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoTest.settings')
#Instantiate a celery class
django.setup()

celery_app = Celery('djangoTest')
#Specify the location of the configuration file
celery_app.config_from_object('django.conf:settings', namespace='CELERY')
#Automatically load tasks.py from the application directory in the settings configuration INSTALLED_APPS
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Configure init.py in the root project path. This is generally not required.

from __future__ import absolute_import, unicode_literals
from .celery import celery_app

__all__ = ['celery_app']

Method to call the task:

1. delay
task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)

2. apply_async
delay is actually an alias of apply_async, and can also be called using the following method, but apply_async supports more parameters:

task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})

Supported parameters:

countdown: wait for a period of time before executing.

add.apply_async((2,3), countdown=5)

eta: Defines the start time of the task.

add.apply_async((2,3), eta=now + tiedelta(second=10))

expires: Set the timeout.

add.apply_async((2,3), expires=60)

retry: If the scheduled task fails, whether to retry.

add.apply_async((2,3), retry=False)

retry_policy: Retry policy.

? max_retries: Maximum number of retries, default is 3 times.

? interval_start: The number of seconds between retries and waiting. The default is 0, which means retrying directly without waiting.

? interval_step: The number of seconds to increase the retry interval for each retry, which can be a number or a floating point number. The default is 0.2

? interval_max: The maximum number of seconds for the retry interval, that is, after the number of seconds increased by interval_step, it will no longer increase. It can be a number or a floating point number. The default is 0.2.

Custom publishers, switches, routing keys, queues, priorities, sequencing schemes and compression methods:

task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange')</ pre>
<h4>Writing tasks</h4>
<p>Generally, a new task file is created under the app, and periodic tasks are written in it, defined using the @app.task decorator.<br> task.py: under your app</p>
<pre>@shared_task
def sync_redis_to_mysql(key,pk):
    # Get the count value in Redis
    #celery - A djangoTest beat

    redis_count = cache.get(key) or 0
    dbv = key.split('-')[0]
    # Get the value in the database
    # view_count = serializer.data.get('view_count')
    db_count = TTopic.objects.values_list(dbv, flat=True).get(article_id=pk)

    # Add the Redis count value to the database
    TTopic.objects.filter(pk).update(dbv=db_count + redis_count)

    #Clear the count value in Redis
    cache.delete(key)

Asynchronously calling tasks
Celery has three methods for calling asynchronous tasks:

app.send_task is not very commonly used

# tasks.py
from celery import celery
app = Celery()
def add(x,y):
    return x + y

app.send_task('tasks.add',args=[10,20])
 # The parameters are basically the same as those of the apply_async function # However, send_task will not check whether the tasks.add function exists when sending. Even if it is empty, it will be sent successfully, so celery execution may not find the function and report an error;

Task.delay

def inset(request):
    test.delay(args=[3,5])
    return HttpResponse('ok')
 Task.delay(parameter 1, parameter 1, key value, kwargs2=value_2)

Task.apply_async is similar to delay, but supports more parameters

 def inset(request):
    result = test.apply_async(args=[4,5])
    return HttpResponse('ok')
 task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})

Test: When you access the link through the browser, you will not feel the 2s delay at all. The page can be opened in seconds. At the same time, you will find that the terminal output shows that the task was successfully executed.

Using the apply_async method to call the test task can also print the task id and task status

def inset(request):
    result = test.apply_async(args=[3,5])
    print(result.id)
    print(result.status)
    return HttpResponse('ok')

Perform tasks

When using Celery to perform asynchronous tasks, you typically need to run the Celery service (worker process) before running your project. This is because the Celery worker process runs independently and listens to the queue to process asynchronous tasks at any time.

Here are the general steps: write the task,

  1. First run the Celery service:

    celery -A your_project_name worker --loglevel=info

    This will start a Celery worker process that will keep running waiting for asynchronous tasks to be performed.

  2. Then run your project:

    python manage.py runserver

    Or start a Django project based on how your project is configured. After your project is running, you can trigger the execution of asynchronous tasks through view methods or other means.

  3. If you have scheduled tasks, you can also start the Celery Beat service to schedule these tasks:

    celery -A your_project_name beat --loglevel=info

    celery -A your_project_name beat --loglevel=info

    This will start the Celery Beat service, which will execute scheduled tasks according to a predetermined schedule.

Note that Celery and your project run independently and communicate with each other through message queues. When your project triggers an asynchronous task, it puts the task into a message queue, and a Celery worker process gets the task from the queue and executes it. This way you ensure that asynchronous tasks do not block or affect your main project.

More Chinese documents can be viewed
Preface – Celery Chinese Manual

The knowledge points of the article match the official knowledge files, and you can further learn relevant knowledge. Python entry skill treeWeb application development Django376716 people are learning the system