Celery’s task distribution and scheduled tasks

Table of contents

  • Celery application scenarios
  • Use of celery
    • 1.1 Setting up the environment
    • 1.2 Quick use
      • Task timeout limit
    • 1.3 Applying celery in Django
    • 1.4 Celery scheduled execution
    • 1.5 Periodic scheduled tasks
    • 1.6 Task binding, logging, and retrying
    • 1.7 Enable task monitoring
  • Celery interview summary

Celery application scenarios

  • Celery is a Python module for handling tasks.

    • Scenario 1:

      For [time-consuming tasks], add the task to the broker (queue) through celery and immediately return a task ID to the user.
      After the task is added to the broker, a worker fetches it from the broker and processes it.
      When the task finishes, the result is stored in the backend.

      When the user wants to check the result, they provide the task ID and we look it up in the backend for them.
    • Scenario 2:

      Timed tasks (timed release/timed auction)

Use of celery

Celery is a simple, flexible, and reliable distributed system developed in Python for processing large numbers of tasks. It supports both real-time processing and task scheduling.

It supports multiple brokers and workers, which provides high availability and horizontal scaling.

Time-consuming tasks are thrown into the broker queue and a task ID is returned; with that task ID you can later fetch the result from the backend. The worker pulls tasks from the broker, executes them, and writes the results back to the backend.

What gets passed to the broker is the function name and its parameters.
1.1 Setting up the environment
pip3 install celery==4.4
Install a broker: Redis or RabbitMQ
pip3 install redis   # or pip3 install pika for RabbitMQ
1.2 Quick Use
  • s1.py

    from celery import Celery
    
    app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
    
    @app.task # Indicates that this function can be used as a celery task
    def x1(x, y):
        return x + y
    
    @app.task
    def x2(x, y):
        return x - y
  • s2.py

    from s1 import x1
    
    # Immediately tell celery to create and execute the x1 task and pass two parameters
    result = x1.delay(4, 4)
    print(result.id) #Task ID
  • s3.py gets task results

    from celery.result import AsyncResult
    from s1 import app
    
    result_object = AsyncResult(id="task ID", app=app)
    print(result_object.status) #Task status
    
    data = result_object.get() # Get task results
    print(data)
Task timeout limit

To prevent a task from hanging in an abnormal "in progress" state and blocking other tasks in the queue, set a timeout for task execution. If a task is not finished within the timeout, the worker process running it is killed and a new worker process is started in its place.

@app.task(time_limit=1800) # You can set the task timeout limit
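
A slightly fuller sketch of the same idea (soft_time_limit and the sample task are additions, not part of the original; the broker address reuses the one from s1.py). The soft limit raises SoftTimeLimitExceeded inside the task so it can clean up before the hard limit kills the worker process:

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')

@app.task(time_limit=1800, soft_time_limit=1700)  # hard limit 1800s, soft limit 1700s
def long_job(n):
    try:
        # stands in for the long-running work
        return sum(i * i for i in range(n))
    except SoftTimeLimitExceeded:
        # the soft limit fired: clean up and return something sensible
        return 'timed out before finishing'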

Run the program:

  1. Start redis

  2. Start worker

    # First enter the current directory
    celery worker -A s1 -l info
    
    # -A s1    specifies the module where the celery app is defined
    # -l info  sets the log level; it can be omitted once the code is in production

    On Windows the worker may fail with the following error:
    
    Traceback (most recent call last):
      File "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\billiard\pool.py", line 362, in workloop
        result = (True, prepare_result(fun(*args, **kwargs)))
      File "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\celery\app\trace.py", line 546, in _fast_trace_task
        tasks, accept, hostname = _loc
    ValueError: not enough values to unpack (expected 3, got 0)
    
    
    To fix this, install eventlet and start the worker with the eventlet pool:
    pip install eventlet
    
    celery worker -A s1 -l info -P eventlet
  3. Create a task and put it into the broker

    python s2.py
    python s2.py
  4. View task status

    # Fill in the task ID in s3.py
    python s3.py

    Cancel task

    from s1 import app
    from celery.app.control import Control
    
    celery_control = Control(app=app)
    celery_control.revoke(id, terminate=True)  # id is the task ID; terminate=True force-stops a running task
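
    The same task can also be revoked through its AsyncResult handle; a small sketch (assuming the task ID returned by s2.py):

    from celery.result import AsyncResult
    from s1 import app

    # revoke via the result handle; terminate=True force-stops a task that is already running
    AsyncResult(id="task ID", app=app).revoke(terminate=True)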
1.3 Applying celery in Django

Using celery inside Django.

# pip3 install django-celery   (not used here; we stick with the plain celery module)

We just organize the code following the layout that django-celery expects.

  • Step 1: [Project/Project/settings.py] Add configuration

    CELERY_BROKER_URL = 'redis://192.168.16.85:6379'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_RESULT_BACKEND = 'redis://192.168.16.85:6379'
    CELERY_TASK_SERIALIZER = 'json'
    # CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # Task expiration time

    Summary of Celery configuration parameters

    Configuration item            Description
    CELERY_DEFAULT_QUEUE          Default queue
    CELERY_BROKER_URL             Broker address
    CELERY_RESULT_BACKEND         Result backend (storage) address
    CELERY_TASK_SERIALIZER        Task serialization format
    CELERY_RESULT_SERIALIZER      Task result serialization format
    CELERY_TASK_RESULT_EXPIRES    Task result expiration time
    CELERY_ACCEPT_CONTENT         Accepted content types (serializers) for tasks
  • Step 2: [Project/Project/celery.py] Create celery.py in the directory with the same name as the project

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    
    # Configuration file specified by celery
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demos.settings')
    
    app = Celery('demos')  # the name is arbitrary; the broker is not passed here because it is configured in settings.py
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    # should have a `CELERY_` prefix.
    # All celery configurations start with CELERY
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    # Read the tasks.py file in each registered app
    app.autodiscover_tasks()
  • Step 3: [Project/app name/tasks.py]

    from celery import shared_task
    
    @shared_task
    def add(x, y):
        return x + y
    
    @shared_task
    def mul(x, y):
        return x * y
  • Step 4, [Project/Project/__init__.py]

    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
  • Start worker

    # First enter the project directory
    
    celery worker -A demos -l info -P eventlet
  • Write the view function and call celery to create the task.

    • url

      # from api.views import task
      
      url(r'^create/task/$', task.create_task),
      url(r'^get/result/$', task.get_result),
    • view function

      from django.shortcuts import HttpResponse
      from api.tasks import add
      from celery.result import AsyncResult
      
      # from demos.celery import app
      from demos import celery_app
      
      def create_task(request):
          print('The request is coming')
          result = add.delay(2, 2)  # queue the add task and return its task ID
          print('Execution completed')
          return HttpResponse(result.id)
      
      
      def get_result(request):
          nid = request.GET.get('nid')
          result_object = AsyncResult(id=nid, app=celery_app)
          # print(result_object.status)
          data = result_object.get() # Get data
          return HttpResponse(data)
  • Start the django program

    python manage.py....
1.4 Celery scheduled execution
import datetime

from django.shortcuts import HttpResponse

from app01 import tasks
from celery.result import AsyncResult

def time_task(request):
    """
    Timed execution
    :param request:
    :return:
    """
    # Get local time
    ctime = datetime.datetime.now()
    # Convert to UTC time
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())

    s10 = datetime.timedelta(seconds=60) # Execute after 60s
    ctime_x = utc_ctime + s10 #Execution time

    # Use apply_async and set the time
    result = tasks.mul.apply_async(args=[2, 5], eta=ctime_x)
    return HttpResponse(result.id)


def time_result(request):
    nid = request.GET.get('nid')
    from celery.result import AsyncResult
    # from demos.celery import app
    from celerytest import celery_app
    result_object = AsyncResult(id=nid, app=celery_app)
    # print(result_object.status) # Get status
    # data = result_object.get() # Get data
    # result_object.forget() # Remove data in backend
    # result_object.revoke(terminate=True) # Cancel the task; terminate=True forces cancellation

    # Decide what to return based on the task status
    if result_object.successful():
        # success
        data = result_object.get()
        result_object.forget()
    elif result_object.failed():
        # fail
        data = 'Execution failed! '
    else:
        data = 'Executing! '
    return HttpResponse(data)

Supported parameters:

  • countdown: wait for a period of time before executing.

    tasks.add.apply_async((2,3), countdown=5)
  • eta: Defines the start time of the task.

    tasks.add.apply_async((2,3), eta=now + timedelta(seconds=10))
  • expires: the task is discarded if it has not started within this time (seconds or an absolute datetime).

    tasks.add.apply_async((2,3), expires=60)
  • retry: whether to retry sending the task message if the broker connection fails.

    tasks.add.apply_async((2,3), retry=False)
  • retry_policy: the retry policy for sending the message (a combined sketch follows after this list).

    • max_retries: Maximum number of retries; the default is 3.
    • interval_start: Seconds to wait before the first retry; the default is 0, i.e. retry immediately.
    • interval_step: Seconds added to the retry interval on each subsequent retry; may be an int or a float. The default is 0.2.
    • interval_max: Maximum number of seconds between retries; once the interval grown by interval_step reaches this value it stops increasing. May be an int or a float. The default is 0.2.
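
A combined sketch of the options above (the values are illustrative, not from the original):

    result = tasks.add.apply_async(
        (2, 3),
        countdown=5,            # wait 5 seconds before the task may run
        expires=60,             # drop the task if it has not started within 60 seconds
        retry=True,             # retry sending the message if the broker connection fails
        retry_policy={
            'max_retries': 3,
            'interval_start': 0,
            'interval_step': 0.2,
            'interval_max': 0.2,
        },
    )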
1.5 Periodic scheduled tasks
  • Periodic tasks can be scheduled with celery itself (see the beat sketch below).
  • It can also be used in combination with Django.
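
A minimal sketch of a periodic schedule using celery beat (assuming the s1.py app from section 1.2; the schedule values are illustrative). Add a beat_schedule to s1.py and run a beat process next to the worker:

    from celery.schedules import crontab

    # appended to s1.py, reusing the `app` object defined there
    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 's1.x1',                         # task name = module.function
            'schedule': 30.0,                        # run every 30 seconds
            'args': (4, 4),
        },
        'subtract-every-morning': {
            'task': 's1.x2',
            'schedule': crontab(hour=7, minute=30),  # every day at 07:30 (UTC by default)
            'args': (10, 3),
        },
    }

    # start the scheduler alongside a worker:
    # celery beat -A s1 -l info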

The difference between task and shared_task decorators:

task is attached to a specific, already-created Celery object and is called through it.
For example:
    app1 = Celery('tasks', broker='redis://192.168.16.48:6379',)
    app2 = Celery('tasks', broker='redis://192.168.16.48:6379',)
    
    @app1.task
    def x1(x, y):
        return x-y
        
    @app2.task
    def x2(x, y):
        return x*y
    
    
@task is mostly used in single-file setups. When there are multiple Celery objects, each task function is explicitly decorated with the object it belongs to.

shared_task is mostly used in multi-file projects, where typically only one Celery object is created in celery.py (for example, when Django integrates celery). When the project starts, that Celery object is loaded into memory, and @shared_task automatically registers the tasks written in each app's tasks.py with it, which makes the tasks highly reusable.
For example:
    @shared_task
    def x1(x, y):
        return x-y
        
from web import tasks
tasks.x1.delay(1,5)

But when multiple Celery objects are created in celery.py and different tasks use different objects, the object must be specified explicitly.
For example:
from web import tasks

app1.tasks.x1.delay(1,5)
app2.tasks.x2.delay(1,5)
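
In practice the decorated function already knows which Celery object it belongs to, so it is usually called directly, or sent by its registered name through a specific app; a small sketch following the names above (illustrative, not from the original):

from web import tasks

tasks.x1.delay(1, 5)   # routed through app1, because x1 was decorated with @app1.task
tasks.x2.delay(1, 5)   # routed through app2, because x2 was decorated with @app2.task

# or send a task by its registered name through a chosen app
# app1.send_task('web.tasks.x1', args=(1, 5))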
1.6 Task binding, logging, and retrying
# Modify tasks.py file.
 
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
 
@app.task(bind=True)
def div(self, x, y):
    logger.info(('Executing task id {0.id}, args: {0.args!r}, '
                 'kwargs: {0.kwargs!r}').format(self.request))
    try:
        result = x/y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3) # When a ZeroDivisionError error occurs, retry every 5s, up to 3 times.
 
    return result

When bind=True is set, the function signature changes: a self parameter is added. This makes div behave like a bound method, and the task's context can be accessed through self (for example self.request).
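
A quick usage sketch (assuming the tasks.py above is importable as tasks): call the task with a zero divisor and the retry logic above kicks in.

from tasks import div

result = div.delay(10, 0)   # ZeroDivisionError inside the task -> retried every 5s, at most 3 times
print(result.status)        # PENDING / RETRY / FAILURE depending on when you check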

1.7 Enable task monitoring

Flower is a real-time monitoring tool officially recommended by Celery for monitoring the running status of Tasks and Workers. Flower provides the following features:

  • View Task list, history, parameters, start time, execution status, etc.
  • Cancel and terminate tasks
  • View Worker list and status
  • Remotely start, shut down, and restart Worker processes
  • Provide HTTP API to facilitate integration into operation and maintenance systems

Compared to viewing logs, Flower’s web interface is more user-friendly.
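
Flower can also be tried directly from the command line before wiring it into supervisor; a sketch assuming the demos project from section 1.3 and the same Redis broker:

pip install flower

# start Flower, then open http://localhost:5555 in a browser
flower -A demos --broker=redis://192.168.16.85:6379 --port=5555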

Flower’s supervisor management configuration file:

[program:flower]
command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555
directory=/opt/PyProjects/app
autostart=true
autorestart=true
startretries=3
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3

Celery interview summary

1. Celery is a simple, flexible, and reliable system developed in Python for handling large numbers of tasks. It can distribute tasks and run scheduled tasks, and is mostly used for time-consuming operations. For example, sending text messages or emails can be dispatched through Celery.
2. A function decorated with @shared_task/@task becomes a celery task; when it is called, the task is added to the broker.
3. function_name.delay(args) submits the task for execution and returns the task ID.
4. Using the task ID, you can fetch the task status and result from the backend: AsyncResult(id=task_id, app=celery_app).get() returns the task result.
5. apply_async(args=[...], eta=...) schedules a task for later execution; eta is the execution time (in UTC).
6. revoke() can cancel the task.