Django+Celery+Flower implements asynchronous and scheduled tasks and their monitoring alarms

Original text: [Selected] Django + Celery + Flower implements asynchronous and scheduled tasks and their monitoring and alarming_django flower-CSDN blog

In the original article, the author explained with pictures and text how to use Celery to run asynchronous and scheduled tasks in Django. Following the article’s steps in order, I implemented asynchronous tasks without any problems.

I hope everyone will support the original author; among the many articles on this topic, it was the only high-quality one I found.

Original address: Django + Celery + Flower implement asynchronous and scheduled tasks and their monitoring and alarming | XieJava’s blog

The “Practical experience” section at the end of this article is my own experience as a newbie. Corrections and discussion are welcome.

Original author’s WeChat public account (QR code image in the original post).

Web development with the Django framework is fast and convenient, but Django’s request/response cycle is synchronous. In actual projects, we often encounter time-consuming tasks that cannot return a result immediately, such as data crawling or sending emails, and making the user wait a long time hurts the experience. In such cases an asynchronous implementation is needed: the response is returned immediately, while the truly time-consuming task executes asynchronously in the background. The Django framework itself cannot implement asynchronous responses, but asynchronous and scheduled tasks can be implemented quickly through Celery. This article introduces how to implement asynchronous and scheduled tasks and their monitoring alarms through Django + Celery + Flower.

There are two common types of tasks: asynchronous tasks, and scheduled tasks (executed at a fixed time or on a period). Celery supports both very well.

Celery is a distributed asynchronous message task queue developed in Python that makes it easy to process tasks asynchronously. If your business scenario needs asynchronous tasks, consider using Celery. A few example scenarios:

  • Asynchronous tasks: Submit time-consuming operation tasks to Celery for asynchronous execution, such as sending text messages/emails, message push, audio processing, etc.
  • Scheduled tasks: for example, run a crawler every day to crawl specified content.

When Celery executes tasks, it receives and sends task messages and stores task results through a message middleware (broker); RabbitMQ, Redis, or another database is generally used.

This article uses redis as both the message middleware and the result store. The later section on monitoring task execution through the database introduces using the database as result storage.

1. Introducing Celery into Django

1. Install the libraries

pip install celery
pip install redis
pip install eventlet  # in a Windows environment, the eventlet package also needs to be installed

2. Introduce celery

In the main project directory, create a new celery.py file with the following content:

import os
import django
from celery import Celery
from django.conf import settings

# Set the settings module environment variable, otherwise an error is reported when starting celery
# taskproject is the current project name
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproject.settings')
django.setup()

celery_app = Celery('taskproject')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Add the following code to __init__.py in the main project directory:

from .celery import celery_app

__all__ = ['celery_app']

3. Set the relevant parameters of celery in settings.py

###----Celery redis configuration-----###
# Broker configuration, using Redis as message middleware
BROKER_URL = 'redis://:[email protected]:6379/0'

# BACKEND configuration, using redis
CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/1'


CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# Result serialization scheme
CELERY_RESULT_SERIALIZER = 'json'

# Task result expiration time, seconds
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

# Time zone configuration
CELERY_TIMEZONE = 'Asia/Shanghai'

At this point, the basic configuration of Celery is complete, and tasks can be implemented and added.
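
As a quick sanity check (my own addition, not in the original), you can confirm from the Django shell that Celery picked up these settings; Celery maps the old-style uppercase names onto its lowercase configuration keys:

# run inside: python manage.py shell
from taskproject.celery import celery_app

print(celery_app.conf.broker_url)       # should echo the BROKER_URL above
print(celery_app.conf.result_backend)   # should echo CELERY_RESULT_BACKEND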

2. Implement asynchronous tasks

1. Create tasks.py

Create the corresponding task file tasks.py under the sub-application (the file must be named tasks.py so that Celery’s autodiscovery can find it):

import datetime
from time import sleep
from celery import shared_task
import logging
logger = logging.getLogger(__name__)


@shared_task()
def task1(x):
    for i in range(int(x)):
        sleep(1)
        logger.info('this is task1 ' + str(i))
    return x


@shared_task
def scheduletask1():
    now = datetime.datetime.now()
    logger.info('this is scheduletask ' + now.strftime("%Y-%m-%d %H:%M:%S"))
    return None

In tasks.py we define two tasks; each must be decorated with @shared_task, otherwise Celery cannot manage it.

To make them easy to execute, we call these two tasks from view functions and publish them via URLs.

# views.py
from django.http import JsonResponse
from . import tasks
# Create your views here.


def runtask(request):
    x = request.GET.get('x')
    tasks.task1.delay(x)
    content = {'200': 'run task1 success!---' + str(x)}
    return JsonResponse(content)


def runscheduletask(request):
    tasks.scheduletask1.delay()
    content = {'200': 'success! '}
    return JsonResponse(content)
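
delay() returns an AsyncResult whose id can be handed back to the client and used later to query the task state from the result backend. A hedged sketch of an extra status view (taskstatus is not in the original article):

from celery.result import AsyncResult
from django.http import JsonResponse


def taskstatus(request):
    # look up a task by the id returned from delay(), e.g. tasks.task1.delay(x).id
    task_id = request.GET.get('task_id')
    result = AsyncResult(task_id)
    return JsonResponse({'task_id': task_id, 'state': result.state,
                         'result': str(result.result)})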

Add routes to urls.py to publish the views:

from django.urls import path

from taskapp import views

urlpatterns = [
    path('task', views.runtask),
    path('runscheduletask', views.runscheduletask),
]

Add the sub-application’s urls to the project’s main urls, as sketched below.
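
For completeness, a minimal sketch of what the project-level taskproject/urls.py might look like; the 'taskapp/' prefix is an assumption:

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('taskapp/', include('taskapp.urls')),  # hypothetical prefix for the sub-app's routes
]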

2. Start celery

Before starting Celery, you must first start the redis service, because Celery is configured in settings.py to use redis as the message middleware and result store.
In a Windows environment, redis is started with redis-server.exe redis.windows.conf

Start the celery worker in the console

celery -A taskproject worker -l debug -P eventlet

Start the Django service and access the URLs to call the tasks and observe the asynchronous effect.

3. View tasks

Check asynchronous task execution in the console. The web URL returns its response immediately, while the background console keeps executing the asynchronous task.

3. Implement scheduled tasks

Celery also makes scheduled tasks very convenient to implement.

1. Define scheduler

Scheduled tasks can be implemented by adding their definitions in settings.py:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'every_5_seconds': {
        #Task path
        'task': 'taskapp.tasks.scheduletask1',
        # Execute every 5 seconds
        'schedule': 5,
        'args': ()
    }
}

The scheduletask1 here is the task defined earlier in tasks.py. You can of course define multiple scheduled tasks, for example adding task1. Since task1 takes a parameter, it can be passed in via 'args':

CELERYBEAT_SCHEDULE = {
    'every_5_seconds': {
        #Task path
        'task': 'taskapp.tasks.scheduletask1',
        # Execute every 5 seconds
        'schedule': 5,
        'args': ()
    },
    'every_10_seconds': {
        #Task path
        'task': 'taskapp.tasks.task1',
        # Executed every 10 seconds, the parameter of task1 is 5
        'schedule': 10,
        'args': [5]
    }
}

This defines task1 to execute every 10 seconds with 5 passed in as its parameter.
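
The crontab imported above also allows cron-style schedules instead of plain second intervals. A hedged example entry (not in the original) that would run scheduletask1 every day at 07:30:

CELERYBEAT_SCHEDULE = {
    'every_morning': {
        'task': 'taskapp.tasks.scheduletask1',
        # crontab(hour=7, minute=30) fires once per day at 07:30
        'schedule': crontab(hour=7, minute=30),
        'args': ()
    }
}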

2. Start beat

Keep the worker process running and open another console to start beat:

celery -A taskproject beat -l debug

3. View tasks

After starting beat, the logs printed on the console show that task1 and scheduletask1 were executed on schedule.

4. Configure scheduled tasks through the database

Although scheduled tasks can be configured through settings.py, this is often not practical enough for real projects. A more engineering-oriented approach is to keep the scheduled task configuration in the database and manage it through an interface. Celery supports this well via the django-celery-beat plug-in; its usage is introduced below.

1. Install django-celery-beat

pip install django-celery-beat

2. Register django_celery_beat in INSTALLED_APPS

INSTALLED_APPS = [
    ...
    'django_celery_beat',
]

3. Set the scheduler and time zone in settings.py

Comment out the CELERYBEAT_SCHEDULE scheduler configuration added earlier in settings.py, and add:

CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'

Set the language, time zone, etc. in settings.py

LANGUAGE_CODE = 'zh-hans'

TIME_ZONE = 'Asia/Shanghai'

USE_I18N = True

USE_TZ = False

4. Perform database migration

python manage.py migrate django_celery_beat

5. Start worker and beat respectively

Start the worker and beat in two consoles respectively:

celery -A taskproject worker -l debug -P eventlet
celery -A taskproject beat -l debug

6. Start the django service and access the admin’s web management terminal

Visit http://localhost:8000/admin/ to see the management menu of periodic tasks. It is very convenient to manage scheduled tasks.
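
Accessing the admin requires a superuser account; if you have not created one yet, the standard Django command does it:

python manage.py createsuperuser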

7. Configure scheduled tasks

Click “Interval”

Click “Add Interval” to add a scheduled-task interval; here we add a timer that fires every 5 seconds.

The newly added 5-second timer now appears in the list.

Now this timer can be used to create a new scheduled task. Go to “Periodic tasks” and click “Add Periodic Task”.

Fill in the task name and select the task that needs to be executed regularly

Because task1 requires a parameter, it is set in the subsequent argument settings.

After saving, you can see that a new scheduling task “Execute task1 every 5 seconds” has been added.
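
Besides the admin UI, django-celery-beat schedules can also be created in code, which is handy for seeding defaults. A hedged sketch, runnable from python manage.py shell:

import json
from django_celery_beat.models import IntervalSchedule, PeriodicTask

# the same 5-second interval created above in the admin
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=5, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.get_or_create(
    name='Execute task1 every 5 seconds',
    task='taskapp.tasks.task1',
    interval=schedule,
    args=json.dumps([5]),  # task1's argument, stored as JSON
)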

8. View scheduling effect

Scheduled task execution information appears in both the worker and beat consoles, indicating that the task was successfully scheduled and executed.

5. Monitor task execution through Django’s web interface

It is not very convenient to monitor task execution from the console. It is better to see it through a web interface: how many tasks are running, how many have failed, and so on. Celery can do this too, by writing task results to the database and displaying them through the web interface. The django-celery-results plug-in is used here; it lets Django’s ORM serve as the result store. The advantage is that task status can be viewed directly in Django’s data, and further operations can be built on top of it. The following shows how to use the ORM as result storage.

1. Install django-celery-results

pip install django-celery-results

2. Configure settings.py, register app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

3. Modify the backend configuration and change Redis to django-db

# BACKEND configuration: switch from redis to django orm as result storage
# CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/1'
CELERY_RESULT_BACKEND = 'django-db'  # use django orm as result storage

4. Migrate database

python manage.py migrate django_celery_results

You can see that the tables related to django_celery_results were created

5. View tasks

After starting the Django service and executing asynchronous and scheduled tasks, you can see the execution of the tasks in the management interface, which tasks were executed, which tasks failed to execute, etc.
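
Since results now live in Django’s ORM, they can also be queried in code. A hedged sketch using django-celery-results’ TaskResult model:

from django_celery_results.models import TaskResult

# tasks whose execution raised an exception
failed = TaskResult.objects.filter(status='FAILURE')
for r in failed:
    print(r.task_id, r.task_name, r.result)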

6. Monitor task execution through Flower

If you do not want to monitor task execution through the Django management interface, you can also monitor tasks with the Flower plug-in. Flower’s interface is richer and can monitor more comprehensive information. The following introduces task monitoring through Flower.

1. Install flower

pip install flower

2. Start flower

celery -A taskproject flower --port=5566
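
Flower’s web UI has no authentication by default. If it will be reachable beyond localhost, Flower’s built-in basic auth option can be added; the credentials below are placeholders:

celery -A taskproject flower --port=5566 --basic_auth=admin:change-me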

3. Use flower for task monitoring

Clicking on a failure shows the details of the failed execution. Here an ‘a’ character was deliberately passed as task1’s parameter, causing it to error during execution; the task’s error message is displayed as well.

7. Implement automatic email alerts for task anomalies

Although monitoring can be done through the interface, we want more: no one can stare at a screen all day. It would be great if a failed task automatically triggered an email alert. This is no problem for Celery: email notifications can be triggered on exceptions through task hook methods.

1. Add hook methods

The modifications to tasks.py are as follows:

import datetime
from time import sleep
from celery import shared_task
from celery import Task
from django.core.mail import send_mail
import logging
logger = logging.getLogger(__name__)

class MyHookTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        info = f'Task successful--task id:{task_id}, arg:{args}, successful!'
        logger.info(info)
        send_mail('celery task monitoring', info, '[email protected]', ['[email protected]'])

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        info = f'Task failed--task id:{task_id}, arg:{args}, failed! errors: {exc}'
        logger.info(info)
        send_mail('celery task monitoring exception', info, '[email protected]', ['[email protected]'])

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id}, arg:{args}, retry! errors: {exc}')


@shared_task(base=MyHookTask, bind=True)
def task1(self,x):
    for i in range(int(x)):
        sleep(1)
        logger.info('this is task1 ' + str(i))
    return x


@shared_task
def scheduletask1():
    now = datetime.datetime.now()
    logger.info('this is scheduletask ' + now.strftime("%Y-%m-%d %H:%M:%S"))
    return None
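
Note that send_mail depends on Django’s email configuration in settings.py. A minimal hedged sketch with placeholder values (host, account, and password are assumptions to be replaced with your provider’s):

EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.example.com'          # placeholder SMTP server
EMAIL_PORT = 465
EMAIL_USE_SSL = True
EMAIL_HOST_USER = '[email protected]'      # placeholder sender account
EMAIL_HOST_PASSWORD = 'your-auth-code'   # placeholder password/authorization code
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER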

2. Restart service

Stop the worker and beat services, then restart the worker and beat in two consoles respectively:

celery -A taskproject worker -l debug -P eventlet
celery -A taskproject beat -l debug

3. Verify the effect

Send email notifications when tasks succeed or fail.

Task execution success notification

Task execution exception alarm notification

For how to send emails with Django, see: Send emails through Django (CSDN blog).

So far, this article has introduced, through several simple applications, how to implement asynchronous and scheduled tasks and their monitoring alarms with Django + Celery + Flower.


8. Practical experience

In this framework, the Django application plays the role of an HTTP application. The front end makes HTTP requests, Django’s views process them, and tasks are produced via tasks.task1.delay(x) and sent to the redis database. Django only processes requests, connects to the celery application, and produces tasks (the producer). The Django process prints nothing related to task execution, so you cannot see tasks executing inside Django; this is also what makes debugging tricky.

The real consumer is the celery service executed through the command line, as follows:

celery -A taskproject worker -l debug -P eventlet

If you only start the Django application and do not start the celery service, the tasks will not be consumed. Tasks initiated in Django views accumulate in the redis database in the queue under the key celery; as long as the celery service is not running, they remain unconsumed. As shown below:

Once the celery service is started, celery consumes the tasks in the queue and then generates keys of the form celery-task-meta-<taskid>.
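
To see this for yourself, the broker queue and result keys can be inspected with redis-cli. A hedged sketch; yourpassword stands for the redis password in the BROKER_URL, and the db numbers match the /0 and /1 configured earlier:

redis-cli -a yourpassword -n 0 llen celery                  # pending messages in the default 'celery' queue
redis-cli -a yourpassword -n 1 keys "celery-task-meta-*"    # stored results (when redis is the result backend)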

So celery is an independent consumer process; task execution has nothing to do with the Django process.

Practical experience is inevitably one-sided; corrections and discussion are welcome.

Original author’s WeChat public account (QR code image in the original post).