- celery application scenarios
- Use of celery
- 1.1 Setting up the environment
- 1.2 Quick use
- Task timeout limit
- 1.3 Applying celery in django
- 1.4 Celery scheduled execution
- 1.5 Periodic scheduled tasks
- 1.6 Task binding, logging, and retrying
- 1.7 Enable task monitoring
- celery interview summary
celery application scenarios

Celery is a Python module for processing tasks asynchronously.

Scenario 1:
For time-consuming tasks, Celery adds the task to the broker (a queue) and immediately returns a task ID to the user. A worker fetches the task from the broker and processes it. When it finishes, the result is stored in the backend. When the user wants the result, they provide the task ID and we look it up in the backend for them.

Scenario 2:
Scheduled tasks (for example, a timed release or a timed auction).
-
Use of celery
Celery is a simple, flexible, and reliable distributed system written in Python for processing large numbers of tasks. It supports both real-time processing and task scheduling.
It supports multiple brokers and workers. This provides high availability and lets the work be spread across machines.
Time-consuming tasks are put into the broker queue, and a task ID is returned right away. The task ID can later be used to fetch the result from the backend. Workers take tasks from the broker, execute them, and write the results to the backend. What passes through the broker is the task's function name and its arguments.
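To make the broker/worker/backend flow concrete, here is a toy, in-process sketch of the same pattern using only the standard library. It is an analogy, not how Celery is implemented:

- a `queue.Queue` stands in for the broker;
- a thread stands in for the worker;
- a dict stands in for the result backend.

The names `TASKS`, `submit`, `worker_loop`, and `get_result` are hypothetical.

```python
import queue
import threading
import uuid

# Hypothetical task registry: task name -> function. As with Celery, the
# "broker" carries the task name and arguments, not the function object.
TASKS = {
    "add": lambda x, y: x + y,
    "sub": lambda x, y: x - y,
}

broker = queue.Queue()  # stands in for Redis/RabbitMQ
backend = {}            # stands in for the result backend: task_id -> result
done = {}               # task_id -> Event that is set once the result is stored


def submit(name, *args):
    """Put a task message on the broker and return its ID immediately."""
    task_id = str(uuid.uuid4())
    done[task_id] = threading.Event()
    broker.put({"id": task_id, "task": name, "args": args})
    return task_id


def worker_loop():
    """Fetch messages from the broker, run the task, store the result."""
    while True:
        message = broker.get()
        result = TASKS[message["task"]](*message["args"])
        backend[message["id"]] = result
        done[message["id"]].set()


def get_result(task_id, timeout=5):
    """Wait until the result is in the backend, then return it."""
    if not done[task_id].wait(timeout):
        raise TimeoutError(task_id)
    return backend[task_id]


# Daemon thread: it is stopped automatically when the program exits
threading.Thread(target=worker_loop, daemon=True).start()

task_id = submit("add", 4, 4)  # returns at once, similar to x1.delay(4, 4)
print(task_id, get_result(task_id))
```

In real Celery, `delay()` plays the role of `submit`, a separate worker process plays the role of `worker_loop`, and `AsyncResult(...).get()` plays the role of `get_result`.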
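1.1 Setting up the environment
```shell
pip3 install celery==4.4
```
Install a broker (Redis or RabbitMQ) together with the matching Python client. For Redis:
```shell
pip3 install redis
```
For RabbitMQ, Celery's default AMQP client (py-amqp) is installed together with Celery.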
1.2 Quick use

s1.py
```python
from celery import Celery

app = Celery('tasks',
             broker='redis://192.168.10.48:6379',
             backend='redis://192.168.10.48:6379')


@app.task  # Marks this function as a Celery task
def x1(x, y):
    return x + y


@app.task
def x2(x, y):
    return x - y
```

s2.py
```python
from s1 import x1

# Immediately tell Celery to create and execute the x1 task with two arguments
result = x1.delay(4, 4)
print(result.id)  # Task ID
```

s3.py gets the task result
```python
from celery.result import AsyncResult

from s1 import app

result_object = AsyncResult(id="task ID", app=app)  # Fill in the ID printed by s2.py
print(result_object.status)  # Task status
data = result_object.get()   # Get the task result
print(data)
```
Task timeout limit
Set a time limit on task execution so that a task stuck in an abnormal running state cannot block the other tasks in the queue. If a task has not finished when the limit expires, the worker process running it is killed and replaced with a new one.
```python
@app.task(time_limit=1800)  # Hard time limit for this task, in seconds
def x1(x, y):
    return x + y
```
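A hard time limit works by killing the process that runs the task. The sketch below is a rough standard-library analogy, not Celery's implementation. It relies on `subprocess.run(..., timeout=...)`, which kills and reaps a child process that runs too long. The helper name `run_with_limit` is hypothetical.

```python
import subprocess
import sys


def run_with_limit(code, limit_seconds):
    """Run a Python snippet in a child process and kill it if it runs too long.

    Returns "ok" if the child finished within the limit, "killed" otherwise.
    """
    try:
        subprocess.run([sys.executable, "-c", code], timeout=limit_seconds)
        return "ok"
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and waited for the child here
        return "killed"


print(run_with_limit("import time; time.sleep(0.1)", limit_seconds=5))   # finishes in time
print(run_with_limit("import time; time.sleep(10)", limit_seconds=0.5))  # exceeds the limit
```

Celery also accepts a `soft_time_limit`. When it expires, `SoftTimeLimitExceeded` is raised inside the task, which gives the task a chance to clean up before the hard `time_limit` kills it.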
Run the program:

- Start Redis.
- Start the worker:
```shell
# First cd into the directory that contains s1.py
celery worker -A s1 -l info
# -A s1: the module that defines the Celery app
# -l info: print logs at the info level
```
On Windows, the worker reports the following error when it runs a task:
```
Traceback (most recent call last):
  File "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\billiard\pool.py", line 362, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\celery\app\trace.py", line 546, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
```
To fix it, install eventlet and start the worker with the eventlet pool:
```shell
pip install eventlet
celery worker -A s1 -l info -P eventlet
```
- Create a task and put it into the broker:
```shell
python s2.py
```
- View the task status:
```shell
# Fill in the task ID in s3.py first
python s3.py
```
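Cancel a task
```python
from celery.app.control import Control

from s1 import app

task_id = "task ID"  # The ID returned by x1.delay(...)
celery_control = Control(app=app)
# terminate=True also kills the worker process if the task is already running
celery_control.revoke(task_id, terminate=True)
```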
1.3 Applying Celery in Django
There is a django-celery package (pip3 install django-celery), but it is not used here. We still use the celery module directly and organize the code the way Celery's Django integration expects.
Step 1: [Project/Project/settings.py] Add the configuration
```python
CELERY_BROKER_URL = 'redis://192.168.16.85:6379'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://192.168.16.85:6379'
CELERY_TASK_SERIALIZER = 'json'
# CELERY_RESULT_EXPIRES = 60 * 60 * 24  # How long task results are kept, in seconds
```
Summary of Celery configuration parameters (with the `CELERY_` namespace used below)

| Configuration item | Description |
|---|---|
| CELERY_TASK_DEFAULT_QUEUE | Default queue |
| CELERY_BROKER_URL | Broker address |
| CELERY_RESULT_BACKEND | Result storage address |
| CELERY_TASK_SERIALIZER | Serialization format of task messages |
| CELERY_RESULT_SERIALIZER | Serialization format of task results |
| CELERY_RESULT_EXPIRES | Expiration time of stored task results |
| CELERY_ACCEPT_CONTENT | Content types (serializers) that tasks accept |
Step 2: [Project/Project/celery.py] Create celery.py in the directory that has the same name as the project
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program
# (the settings file Celery should read).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demos.settings')

# The name can be anything. The broker is not passed here because it is
# already configured in settings.py.
app = Celery('demos')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all Celery-related configuration keys
# in settings.py must have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs
# (reads the tasks.py file of each registered app).
app.autodiscover_tasks()
```
Step 3: [Project/app name/tasks.py]
```python
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y
```
Step 4: [Project/Project/__init__.py]
```python
from .celery import app as celery_app

__all__ = ('celery_app',)
```
Start the worker
```shell
# First cd into the project directory
celery worker -A demos -l info -P eventlet
```
Write view functions that call Celery to create tasks.

urls.py
```python
# url() exists up to Django 3.x; use re_path() on Django 4.0+
from django.conf.urls import url

from api.views import task

urlpatterns = [
    url(r'^create/task/$', task.create_task),
    url(r'^get/result/$', task.get_result),
]
```

View functions
```python
from celery.result import AsyncResult
from django.shortcuts import HttpResponse

from api.tasks import add
from demos import celery_app


def create_task(request):
    print('The request is coming')
    result = add.delay(2, 2)  # Queue the add task; returns immediately with the task ID
    print('Execution completed')
    return HttpResponse(result.id)


def get_result(request):
    nid = request.GET.get('nid')
    result_object = AsyncResult(id=nid, app=celery_app)
    # print(result_object.status)
    data = result_object.get()  # Get the data (blocks until the task has finished)
    return HttpResponse(data)
```
Start the Django program
python manage.py....
1.4 Celery scheduled execution
```python
import datetime

from celery.result import AsyncResult
from django.shortcuts import HttpResponse

from app01 import tasks
from celerytest import celery_app


def time_task(request):
    """
    Timed execution
    :param request:
    :return:
    """
    # Get local time
    ctime = datetime.datetime.now()
    # Convert to UTC time
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
    s60 = datetime.timedelta(seconds=60)  # Execute after 60 s
    ctime_x = utc_ctime + s60  # Execution time
    # Use apply_async and set the time
    result = tasks.mul.apply_async(args=[2, 5], eta=ctime_x)
    return HttpResponse(result.id)


def time_result(request):
    nid = request.GET.get('nid')
    result_object = AsyncResult(id=nid, app=celery_app)
    # print(result_object.status)  # Get status
    # data = result_object.get()  # Get data
    # result_object.forget()  # Remove the data from the backend
    # result_object.revoke(terminate=True)  # Cancel the task; terminate=True forces cancellation

    # Decide what to return based on the task status
    if result_object.successful():  # Success
        data = result_object.get()
        result_object.forget()
    elif result_object.failed():  # Failure
        data = 'Execution failed!'
    else:
        data = 'Executing!'
    return HttpResponse(data)
```
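The view above builds the ETA by converting local time to a naive UTC datetime with `utcfromtimestamp`, which is deprecated since Python 3.12. A minimal alternative, assuming a timezone-aware UTC datetime is acceptable for `eta`, is to take the current UTC time directly. The helper name `make_eta` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def make_eta(delay_seconds, now=None):
    """Return a timezone-aware UTC datetime `delay_seconds` from now."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=delay_seconds)


fixed_now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(make_eta(60, now=fixed_now))  # 2024-01-01 12:01:00+00:00

# In the view (sketch): tasks.mul.apply_async(args=[2, 5], eta=make_eta(60))
```

For a simple relative delay, `countdown=60` expresses the same thing without building a datetime.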
Supported parameters:

- countdown: wait the given number of seconds before executing the task.
  `tasks.add.apply_async((2, 3), countdown=5)`
- eta: the exact time at which the task should start.
  `tasks.add.apply_async((2, 3), eta=now + timedelta(seconds=10))`
- expires: an expiration time (seconds or a datetime). A task that has not been executed by then is discarded instead of run.
  `tasks.add.apply_async((2, 3), expires=60)`
- retry: whether to retry sending the task message if the connection to the broker is lost (default True).
  `tasks.add.apply_async((2, 3), retry=False)`
- retry_policy: options that control those send retries:
  - max_retries: maximum number of retries before giving up. The default is 3.
  - interval_start: number of seconds to wait before the first retry. The default is 0, which means retrying immediately without waiting.
  - interval_step: number of seconds added to the wait on each subsequent retry (an integer or a float). The default is 0.2.
  - interval_max: maximum number of seconds to wait between retries (an integer or a float). Once interval_step has grown the wait to this value, it stops increasing. The default is 0.2.
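To see how the retry_policy values interact, the sketch below computes the wait before each retry under a simplified model:

- the interval starts at interval_start;
- it grows by interval_step after each retry;
- it is capped at interval_max.

This is only an approximation for illustration; Kombu's actual retry loop may apply the cap slightly differently. The helper name `retry_waits` is hypothetical.

```python
def retry_waits(max_retries=3, interval_start=0, interval_step=0.2, interval_max=0.2):
    """Seconds to wait before each retry (simplified linear-growth-with-cap model)."""
    return [min(interval_start + n * interval_step, interval_max) for n in range(max_retries)]


print(retry_waits())  # [0.0, 0.2, 0.2]
print(retry_waits(5, interval_start=1, interval_step=1, interval_max=3))  # [1, 2, 3, 3, 3]

# The same values as they would be passed to apply_async:
policy = {"max_retries": 3, "interval_start": 0, "interval_step": 0.2, "interval_max": 0.2}
# tasks.add.apply_async((2, 3), retry=True, retry_policy=policy)
```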
1.5 Periodic scheduled tasks
Periodic tasks can be run:
- with Celery on its own
- in combination with Django
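Periodic tasks in Celery are run by the `celery beat` scheduler, which reads the `beat_schedule` setting. With the Django integration above (`namespace='CELERY'`), that setting is spelled `CELERY_BEAT_SCHEDULE` in settings.py. Below is a minimal sketch, assuming the `add` task from the tasks.py example lives in an app named `app01`. Both the app name and the schedule entry name are hypothetical.

```python
# settings.py (sketch): run app01.tasks.add every 30 seconds with args (16, 16)
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    "add-every-30-seconds": {              # any unique entry name
        "task": "app01.tasks.add",         # full task name: module path + function name
        "schedule": timedelta(seconds=30), # a plain number of seconds also works
        "args": (16, 16),
    },
}
```

The scheduler runs as its own process next to the worker, for example `celery -A demos beat -l info`. Beat only sends the task messages on schedule; a running worker still has to execute them.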
The difference between the task and shared_task decorators:

task is called through a specific Celery instance that you created. For example:
```python
from celery import Celery

app1 = Celery('tasks', broker='redis://192.168.16.48:6379')
app2 = Celery('tasks', broker='redis://192.168.16.48:6379')


@app1.task
def x1(x, y):
    return x - y


@app2.task
def x2(x, y):
    return x * y
```
It is mostly used in a single file, where nothing has to be loaded into memory in advance. When there are several Celery instances, each task function is explicitly decorated with one of them.

shared_task is mostly used when Celery code is spread across multiple files. Usually only one Celery instance is created, in celery.py, as in the Django integration. When the project starts, that instance is loaded into memory. @shared_task then automatically attaches the tasks written in each app's tasks.py to it, which makes the task modules highly reusable. For example:
```python
# web/tasks.py
from celery import shared_task


@shared_task
def x1(x, y):
    return x - y
```
```python
from web import tasks

tasks.x1.delay(1, 5)
```
Sometimes celery.py creates several Celery instances and different tasks should go through different instances. In that case you have to name the instance explicitly. A shared task is registered with every app, so you can look it up by its full name in that app's task registry:
```python
from web import tasks  # Importing the module registers its shared tasks

app1.tasks['web.tasks.x1'].delay(1, 5)
app2.tasks['web.tasks.x2'].delay(1, 5)  # Assumes web/tasks.py also defines x2
```
1.6 Task binding, logging, and retrying
```python
# Modify the tasks.py file
from celery.utils.log import get_task_logger

from demos.celery import app  # Assumes the Celery app defined in demos/celery.py

logger = get_task_logger(__name__)


@app.task(bind=True)
def div(self, x, y):
    logger.info(('Executing task id {0.id}, args: {0.args!r} '
                 'kwargs: {0.kwargs!r}').format(self.request))
    try:
        result = x / y
    except ZeroDivisionError as e:
        # On ZeroDivisionError, retry after 5 s, up to 3 times
        raise self.retry(exc=e, countdown=5, max_retries=3)
    return result
```
With bind=True, the function's signature changes: a self parameter is added. This effectively turns div into a bound method, and the task's context (for example self.request) can be accessed through self.
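`self.retry(exc=e, countdown=5, max_retries=3)` re-queues the task to run again after 5 seconds. Once the retries are used up, the exception passed as `exc` is raised. The plain-Python sketch below mimics that control flow with a loop: one initial attempt plus up to `max_retries` retries. It is only an illustration. Celery does not sleep inside the worker; it re-schedules the message. The function `call_with_retries` and its injectable `sleep` parameter are hypothetical.

```python
import time


def call_with_retries(func, *args, countdown=5, max_retries=3, sleep=time.sleep):
    """Call func(*args); on ZeroDivisionError wait `countdown` seconds and retry.

    One initial attempt plus at most `max_retries` retries; after that the
    last ZeroDivisionError is re-raised.
    """
    for attempt in range(max_retries + 1):
        try:
            return func(*args)
        except ZeroDivisionError:
            if attempt == max_retries:
                raise  # Retries exhausted: propagate the original error
            sleep(countdown)


# Example: a divisor that is 0 for the first two calls, then 2.
# sleep=waits.append records the waits instead of actually sleeping.
divisors = iter([0, 0, 2])
waits = []
print(call_with_retries(lambda x: x / next(divisors), 10, sleep=waits.append))  # 5.0
print(waits)  # [5, 5]
```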
1.7 Enable task monitoring
Flower is a real-time monitoring tool officially recommended by Celery for monitoring the running status of Tasks and Workers. Flower provides the following features:
- View Task list, history, parameters, start time, execution status, etc.
- Cancel and terminate tasks
- View Worker list and status
- Remotely start, shut down, and restart Worker processes
- Provide HTTP API to facilitate integration into operation and maintenance systems
Compared to viewing logs, Flower’s web interface is more user-friendly.
Supervisor configuration for running Flower:
```ini
[program:flower]
command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555
directory=/opt/PyProjects/app
autostart=true
autorestart=true
startretries=3
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3
```
celery interview summary
1. Celery is a simple, flexible, and reliable system written in Python that can handle large numbers of tasks. It can distribute tasks and run scheduled tasks, and is mostly used for time-consuming operations. For example, sending text messages or emails can be handed off to Celery.
2. A function decorated with @shared_task or @task is a Celery task. Calling it with delay() or apply_async() adds a message for it to the broker.
3. function_name.delay(arguments) submits the task for execution and returns an AsyncResult whose id is the task ID.
4. With the task ID, you can get the task's status and result from the backend. AsyncResult(id=task_id, app=celery_app).get() returns the task's result.
5. apply_async(args=[arguments], eta=...) schedules a task; eta is the time at which it should run (in UTC).
6. revoke() cancels a task.