Table of Contents

Introduction to Celery
Related environment
Related configuration
1. Create a new celery.py module in the proj/proj/ directory
2. Import this application in the proj/proj/__init__.py module
3. Define the task file tasks.py in each app module
4. settings.py configuration
Service start
Asynchronous call
FAQ
Introduction to Celery
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is a task queue focused on real-time processing that also supports task scheduling.
Celery can be used for queuing, asynchronous calls, decoupling, high concurrency, traffic peak shaving, scheduled tasks, and more.
A Celery system can be composed of multiple workers and brokers to achieve high availability and horizontal scalability.
Celery can be used on its own or integrated with Django.
How it works: Celery pushes pending tasks onto a message queue (the broker), and worker processes consume and execute them.
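The producer/consumer principle can be sketched with a toy queue in plain Python (this is only an illustration of the idea, not Celery's actual API):

```python
from collections import deque

# Toy sketch of the broker/worker principle (not Celery's real API):
# clients enqueue task messages, and a worker loop pops and executes them.
broker = deque()                         # stands in for Redis/RabbitMQ
registry = {"add": lambda x, y: x + y}   # stands in for registered tasks

def enqueue(name, *args):
    """Client side: put a task message on the queue and return immediately."""
    broker.append((name, args))

def worker_step():
    """Worker side: consume one message and run the matching task."""
    name, args = broker.popleft()
    return registry[name](*args)

enqueue("add", 2, 3)   # the caller does not wait for the result
print(worker_step())   # 5
```

In real Celery, the broker is a separate service, the worker is a separate process, and results are written to a result backend instead of being returned directly.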
Related environment
```
Python 3.8
django-celery-beat==2.5.0
django-celery-results==2.5.1
celery==5.2.7
redis==4.6.0
```

In this article, Redis is used as the message broker, and MySQL 5.7 is used to store the execution results.
Related configuration
The Django project layout is as follows:
```
- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py
```
1. Create a new celery.py module in the proj/proj/ directory
Define Celery instance:
```python
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
```
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') tells the celery command-line program which Django settings module to use. app.config_from_object('django.conf:settings', namespace='CELERY') loads every setting whose name starts with CELERY_ from the Django settings file.
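What namespace='CELERY' does can be illustrated with a small sketch (simplified, not Celery's actual implementation): the prefix is stripped and the remainder lowercased to recover Celery's own setting names.

```python
def strip_namespace(settings: dict, namespace: str = "CELERY") -> dict:
    """Simplified sketch of namespace handling: keep only keys with the
    namespace prefix, strip the prefix, and lowercase the rest."""
    prefix = namespace + "_"
    return {k[len(prefix):].lower(): v
            for k, v in settings.items() if k.startswith(prefix)}

django_settings = {
    "DEBUG": True,                      # Django's own setting: ignored
    "CELERY_TASK_TIME_LIMIT": 30 * 60,  # becomes task_time_limit
    "CELERY_TASK_SERIALIZER": "json",   # becomes task_serializer
}
print(strip_namespace(django_settings))
# {'task_time_limit': 1800, 'task_serializer': 'json'}
```

This is why the same option appears uppercase with a CELERY_ prefix in settings.py but lowercase in Celery's own documentation.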
2. Import this application in the proj/proj/__init__.py module

This ensures the application is loaded when Django starts, so that the @shared_task decorator will use it.

proj/proj/__init__.py:
```python
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
```
3. Define the task file tasks.py in each app module

Celery automatically discovers tasks defined in each app's tasks.py.
```python
# A simple task
from celery import shared_task

@shared_task
def add(x, y):
    return x + y
```
The directory structure is as follows:

```
- proj/
  - manage.py
  - proj1/
    - tasks.py
    - admin.py
    - models.py
    - views.py
    - tests.py
    - apps.py
  - proj/
    - __init__.py
    - settings.py
    - celery.py
    - urls.py
```
4. settings.py configuration
```python
CELERY_BROKER_URL = 'xxxx.xxx.xxx.xxx'  # Message broker URL (e.g. Redis or RabbitMQ)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_RESULT_BACKEND = 'django-db'  # Store task execution results in the Django database
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = {
    'global_keyprefix': '{}_celery_prefix_'.format(ENV_PROFILE)
}
CELERYD_CONCURRENCY = 5  # Number of concurrent worker processes/threads executing tasks
CELERYD_MAX_TASKS_PER_CHILD = 100  # Max tasks a pool worker executes before being replaced; unlimited by default
CELERYD_FORCE_EXECV = True
```
CELERY_TASK_TRACK_STARTED: when True, the task reports a "started" state when it begins executing. The default is False. This is useful for long-running tasks when you need to know what is currently running.
CELERY_TASK_TIME_LIMIT: the task hard time limit in seconds. When this limit is exceeded, the process handling the task is killed and replaced by a new one.
CELERY_ACCEPT_CONTENT: a whitelist of serialization formats the worker will accept. By default only json is enabled, but other content types such as pickle and yaml can be added.
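A practical consequence of the json serializer is that task arguments must be JSON-serializable; for example, a datetime object cannot be passed directly. This can be seen with plain Python:

```python
import json
from datetime import datetime

# JSON-serializable arguments are fine:
print(json.dumps({"x": 1, "y": 2}))

# But non-JSON types fail before the task is even sent:
try:
    json.dumps({"when": datetime(2023, 1, 1)})
except TypeError as exc:
    print(f"not serializable: {exc}")
```

With the json serializer, pass such values in a serializable form (e.g. an ISO 8601 string or a primary key) and reconstruct them inside the task.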
Notice:
1. Celery introduced lowercase configuration names in 4.0, but when loading Celery configuration from the Django settings module you must keep using uppercase names, with the CELERY_ prefix, so that Celery settings do not conflict with Django settings used by other applications.
2. When using MySQL 5.7 to store task results, set the database charset to utf8. With utf8mb4, the related tables cannot be created successfully (utf8mb4 uses up to 4 bytes per character, so long varchar index keys can exceed MySQL 5.7's default index length limit).
Service start
```shell
# Start a worker:
celery -A proj worker -l INFO

# Start the beat scheduler (for scheduled tasks):
celery -A proj beat
```
Asynchronous call
The views.py code is as follows:
```python
# Calling the task asynchronously from a view
from django.http import HttpResponse

from proj1.tasks import add

def add_view(request):
    x = int(request.POST.get('x', '0'))
    y = int(request.POST.get('y', '0'))
    add.delay(x, y)  # enqueue the task; returns immediately
    return HttpResponse('task submitted')
```
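For completeness, the view also needs to be wired into the URLconf. A minimal sketch (the route path and view name here are illustrative assumptions, not from the original project):

```python
# proj/proj/urls.py (sketch; 'add/' and the view name are hypothetical)
from django.urls import path
from proj1 import views

urlpatterns = [
    path('add/', views.add_view),
]
```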
Saving results is not enabled by default. In this article we configured the django-celery-results backend, so task states and results can be viewed in the Django admin.
FAQ
1. The results are not saved or the task is always in PENDING state.
All task ids default to PENDING, so an unknown id reports "PENDING" rather than an error. Celery does not update the state when a task is sent, and any task with no recorded history is assumed to be pending.
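The "unknown means PENDING" behavior can be illustrated with a toy result store (plain Python, not Celery's implementation): a backend with no record of a task id simply reports PENDING as the default.

```python
# Toy sketch: why an unknown task id shows as PENDING.
results = {}  # stands in for the result backend

def task_state(task_id):
    # No record of this id -> assume the task has not run yet.
    return results.get(task_id, "PENDING")

print(task_state("never-sent-id"))  # PENDING
results["abc123"] = "SUCCESS"       # a worker stored a result
print(task_state("abc123"))         # SUCCESS
```

So a typo in a task id, a missing result backend, or a worker that never stored the result all look identical to the client: permanently PENDING.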
- Make sure the task does not have ignore_result enabled. Enabling this option forces the worker to skip storing the task's state.
- Make sure no old workers are still running. It is easy to accidentally start multiple workers, so before starting a new one, make sure the previous worker has been shut down properly. An old worker that is not configured with the expected result backend may still be running and hijacking tasks. The --pidfile argument can be set to an absolute path to ensure this does not happen.
- Make sure the client is configured with the correct backend. If the client is configured to use a different backend than the worker, you will not receive results. Check that the backend is configured correctly:
```python
>>> result = task.delay()
>>> print(result.backend)
```