First step installation
pip install celery
Select message agent
? Message brokers used in production environments include RabbitMQ and Redis. RabbitMQ is officially recommended
Celery serialization
?Transmitting data between clients and consumers requires serialization and deserialization. The serialization scheme expended by Celery is as follows:
5. Installation, configuration and simple examples
Summary of Celery configuration parameters
Code example:
Note that the upper part is the configuration of asynchronous tasks. If you want to configure scheduled tasks, add the commented out part below.
Scheduled tasks do not have to be written in Django’s view methods. You can use Celery or other task scheduling tools to execute tasks regularly without relying on view methods.
# Broker configuration, using Redis as message middleware BROKER_URL = 'redis://:6379/0' # BACKEND configuration, here use redis to specify the Redis connection address for storing task results CELERY_RESULT_BACKEND = 'redis://6379/0' # Format settings for messages such as celery content, default json CELERY_ACCEPT_CONTENT = ['application/json', ] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' # Task result expiration time, seconds CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True # Time zone configuration CELERY_TIMEZONE='Asia/Shanghai' #Specify the imported task module, you can specify multiple CELERY_IMPORTS = ( # 'bbs.tasks', 'echartsapp.tasks', ) CELERY_TASK_TIME_LIMIT = 5 # Define the configuration of scheduled tasks # CELERY_BEAT_SCHEDULE = { # 'sync-redis-to-mysql': { # # 'task': 'bbs.tasks.sync_redis_to_mysql', # Replace with your task function path # # 'schedule': crontab(minute=0, hour=0), # Trigger the task at midnight every day # 'task': 'bbs.tasks.scheduled_task', # # Execute every 10 seconds # 'schedule': timedelta(seconds=10), # }, # }
Configure celery.py in the root project path
import os import django from celery import celery from django.conf import settings # Set system environment variables and install django. They must be set, otherwise an error will be reported when starting celery. #djangotest is the current project name os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoTest.settings') #Instantiate a celery class django.setup() celery_app = Celery('djangoTest') #Specify the location of the configuration file celery_app.config_from_object('django.conf:settings', namespace='CELERY') #Automatically load tasks.py from the application directory in the settings configuration INSTALLED_APPS celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Configure init.py in the root project path. This is generally not required.
from __future__ import absolute_import, unicode_literals from .celery import celery_app __all__ = ['celery_app']
Method to call the task:
1. delay task.delay(args1, args2, kwargs=value_1, kwargs2=value_2) 2. apply_async delay is actually an alias of apply_async, and can also be called using the following method, but apply_async supports more parameters: task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value}) Supported parameters: countdown: wait for a period of time before executing. add.apply_async((2,3), countdown=5) eta: Defines the start time of the task. add.apply_async((2,3), eta=now + tiedelta(second=10)) expires: Set the timeout. add.apply_async((2,3), expires=60) retry: If the scheduled task fails, whether to retry. add.apply_async((2,3), retry=False) retry_policy: Retry policy. ? max_retries: Maximum number of retries, default is 3 times. ? interval_start: The number of seconds between retries and waiting. The default is 0, which means retrying directly without waiting. ? interval_step: The number of seconds to increase the retry interval for each retry, which can be a number or a floating point number. The default is 0.2 ? interval_max: The maximum number of seconds for the retry interval, that is, after the number of seconds increased by interval_step, it will no longer increase. It can be a number or a floating point number. The default is 0.2. Custom publishers, switches, routing keys, queues, priorities, sequencing schemes and compression methods: task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange')</ pre> <h4>Writing tasks</h4> <p>Generally, a new task file is created under the app, and periodic tasks are written in it, defined using the @app.task decorator.<br> task.py: under your app</p> <pre>@shared_task def sync_redis_to_mysql(key,pk): # Get the count value in Redis #celery - A djangoTest beat redis_count = cache.get(key) or 0 dbv = key.split('-')[0] # Get the value in the database # view_count = serializer.data.get('view_count') db_count = TTopic.objects.values_list(dbv, flat=True).get(article_id=pk) # Add the Redis count value to the database TTopic.objects.filter(pk).update(dbv=db_count + redis_count) #Clear the count value in Redis cache.delete(key)
Asynchronously calling tasks
Celery has three methods for calling asynchronous tasks:
app.send_task is not very commonly used
# tasks.py from celery import celery app = Celery() def add(x,y): return x + y app.send_task('tasks.add',args=[10,20]) # The parameters are basically the same as those of the apply_async function # However, send_task will not check whether the tasks.add function exists when sending. Even if it is empty, it will be sent successfully, so celery execution may not find the function and report an error;
Task.delay
def inset(request): test.delay(args=[3,5]) return HttpResponse('ok') Task.delay(parameter 1, parameter 1, key value, kwargs2=value_2)
Task.apply_async is similar to delay, but supports more parameters
def inset(request): result = test.apply_async(args=[4,5]) return HttpResponse('ok') task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
Test: When you access the link through the browser, you will not feel the 2s delay at all. The page can be opened in seconds. At the same time, you will find that the terminal output shows that the task was successfully executed.
Using the apply_async method to call the test task can also print the task id and task status
def inset(request): result = test.apply_async(args=[3,5]) print(result.id) print(result.status) return HttpResponse('ok')
Perform tasks
When using Celery to perform asynchronous tasks, you typically need to run the Celery service (worker process) before running your project. This is because the Celery worker process runs independently and listens to the queue to process asynchronous tasks at any time.
Here are the general steps: write the task,
-
First run the Celery service:
celery -A your_project_name worker --loglevel=info
This will start a Celery worker process that will keep running waiting for asynchronous tasks to be performed.
-
Then run your project:
python manage.py runserver
Or start a Django project based on how your project is configured. After your project is running, you can trigger the execution of asynchronous tasks through view methods or other means.
-
If you have scheduled tasks, you can also start the Celery Beat service to schedule these tasks:
celery -A your_project_name beat --loglevel=info
celery -A your_project_name beat --loglevel=info
This will start the Celery Beat service, which will execute scheduled tasks according to a predetermined schedule.
Note that Celery and your project run independently and communicate with each other through message queues. When your project triggers an asynchronous task, it puts the task into a message queue, and a Celery worker process gets the task from the queue and executes it. This way you ensure that asynchronous tasks do not block or affect your main project.
More Chinese documents can be viewed
Preface – Celery Chinese Manual
The knowledge points of the article match the official knowledge files, and you can further learn relevant knowledge. Python entry skill treeWeb application development Django376716 people are learning the system