Integrating Celery with Django for asynchronous tasks [2]: a ready-to-use setup in which Celery tasks can use Django's ORM and other Django features

1. Project structure and dependencies

study_celery
|– user
|   |– models.py
|   |– views.py
|   |– urls.py
|– celery_task
|   |– __init__.py
|   |– async_task.py
|   |– celery.py
|   |– check_task.py
|   |– config.py
|   |– scheduler_task.py
|– study_celery
|   |– settings.py
|– manage.py

Dependencies: a running Redis server, plus the following Python packages:

redis==4.6.0
Django==3.2
django-redis==5.3.0
celery==5.3.1

2. Celery framework configuration details

1. Configuration file

config.py

from celery.schedules import crontab
from datetime import timedelta
'''
Parameters:
accept_content: whitelist of allowed content types/serializers. A message whose content type is not in this list is discarded with an error. The default is json only;
task_serializer: string naming the default serializer for task messages. The default is json;
result_serializer: serialization format for task results. The default is json;
timezone: time zone Celery uses instead of the default UTC;
enable_utc: when enabled, dates and times in messages are converted to UTC. It is used together with timezone. When set to False, the system's local time zone is used;
result_expires: how long stored task results are kept (given in seconds here);
beat_schedule: the scheduled (periodic) tasks.
'''
# Register the task modules by hand: list, as strings, every module that defines Celery tasks
task_module = [
    'celery_task.async_task', # Import path of the module that defines the asynchronous tasks
    'celery_task.scheduler_task', # Import path of the module that defines the scheduled tasks
]

# Celery configuration
config = {
    "broker_url" :'redis://127.0.0.1:6379/0', # With a password: 'redis://:123456@<host>:6379/1', where 123456 is the password
    "result_backend" : 'redis://127.0.0.1:6379/1',
    "task_serializer" : 'json',
    "result_serializer" : 'json',
    "accept_content" : ['json'],
    "timezone" : 'Asia/Shanghai',
    "enable_utc" : False,
    "result_expires" : 1*60*60,
    "beat_schedule" : { #Scheduled task configuration
            # The entry name is arbitrary
            'add-func-30-seconds': {
                # Run add_func from celery_task/scheduler_task.py
                'task': 'celery_task.scheduler_task.add_func', # Import path of the task function (from celery_task.scheduler_task import add_func)
                # Run every 30 seconds
                'schedule': timedelta(seconds=30),
                # Positional arguments passed to add_func
                'args': (10, 21)
            },
            # The entry name is arbitrary
            'add-func-5-minutes': {
                'task': 'celery_task.scheduler_task.add_func', # Import path of the task function (from celery_task.scheduler_task import add_func)
                # crontab fields that are not given default to '*' (every hour, every day, every month, and so on).
                'schedule': crontab(minute='5'), # Runs at minute 5 of every hour; use crontab(minute='*/5') to run every 5 minutes instead
                'args': (19, 22) # Positional arguments passed to the task
            },
            # Cache user data
            'cache-user-func': {
                'task': 'celery_task.scheduler_task.cache_user_func',
                # Import path of the task function (from celery_task.scheduler_task import cache_user_func)
                'schedule': timedelta(minutes=1), # Run every minute to refresh the cached user data
            }
        }
}
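
The difference between crontab(minute='5') and an every-five-minutes schedule is easy to misread. The sketch below does not use Celery's own parser: minutes_matching is a hypothetical helper that understands only '*', a plain number, and the '*/n' step form, which is just enough to list the minutes of an hour on which each spec would fire.

```python
def minutes_matching(spec):
    """Return the minutes (0-59) of an hour selected by a cron-style minute field.

    Simplified on purpose: supports only '*', 'N' and '*/N'.
    """
    if spec == '*':
        return list(range(60))
    if spec.startswith('*/'):
        step = int(spec[2:])
        return list(range(0, 60, step))
    return [int(spec)]


print(minutes_matching('5'))    # a single minute: one run per hour
print(minutes_matching('*/5'))  # every fifth minute: twelve runs per hour
```

A timedelta schedule behaves differently again: it counts from when beat starts, not from clock boundaries.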

2. Celery object creation

celery.py

from celery import Celery
from .config import config, task_module

# Create the Celery app. 'task' is the app's name and distinguishes it from other Celery apps.
# broker is the message broker URL and backend is where task results are stored.
# include lists the modules the worker imports at startup so that their tasks get registered.
app = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
app.conf.update(**config)
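
Beat only sends a task name to the broker; a worker that never imported the module defining that task reports it as unregistered. A quick consistency check between the include list and the schedule can catch typos early. The sketch below is a plain-Python illustration: unregistered_entries is a hypothetical helper, the 'typo-example' entry is invented for the demonstration, and it assumes tasks keep their default names (module path plus function name).

```python
from datetime import timedelta

# Stand-ins mirroring the shape of task_module and config["beat_schedule"].
task_module = ['celery_task.async_task', 'celery_task.scheduler_task']
beat_schedule = {
    'add-func-30-seconds': {
        'task': 'celery_task.scheduler_task.add_func',
        'schedule': timedelta(seconds=30),
        'args': (10, 21),
    },
    'typo-example': {
        'task': 'celery_task.schedule_task.add_func',  # deliberately wrong module
        'schedule': timedelta(minutes=1),
    },
}


def unregistered_entries(schedule, modules):
    """Return the names of schedule entries whose task module is not in `modules`."""
    missing = []
    for name, entry in schedule.items():
        # Everything before the last dot is the module path of the task.
        module_path = entry['task'].rpartition('.')[0]
        if module_path not in modules:
            missing.append(name)
    return missing


print(unregistered_entries(beat_schedule, task_module))
```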

3. Asynchronous task module

async_task.py

# 1. The tasks use Django features, so the Django environment has to be configured first
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings") # study_celery/settings.py under the project root
import django
django.setup()

# Import the Celery app
from celery_task.celery import app
# Import Django's built-in mail function
from django.core.mail import send_mail
import threading
from study_celery import settings
from apps.user.models import UserModel
'''
2. Asynchronous tasks
A task whose return value should not be stored: @app.task(ignore_result=True)
A task whose return value is stored in the result backend: @app.task
'''

# Returns a value, so the result is stored in the result backend
@app.task
def send_email_task(email, code): # The address and code arrive as arguments, so no database lookup is needed
    '''
    :param email: recipient address (the user's email address)
    :param code: verification code to include in the message
    :return: dict with the email address and the code
    '''
    # Send the email in a separate thread; a thread pool would be a better fit here
    t = threading.Thread(
        target=send_mail,
        args=(
            "Verification code obtained before logging in", # Email subject
            'Click this email to activate your account, otherwise you will not be able to log in', # Plain-text body; it only serves as the fallback once html_message is passed
            settings.EMAIL_HOST_USER, # Sender address
            [email], # Recipient addresses; more than one can be listed
         ),
         # html_message is the HTML version of the body. It can be written in an HTML file and pasted into the string.
         kwargs={
                'html_message': f"<p></p> <p>Verification code: {code}</p>"
         }
        )
    t.start()
    return {'email':email,'code':code}

@app.task
def search_user_task():
    users = UserModel.objects.all()
    lis = []
    for user in users:
        dic = {'id':user.id,'name':user.name}
        lis.append(dic)
    return {'users':lis}
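
The comment in send_email_task suggests a thread pool instead of one bare thread per email. A minimal sketch of that idea follows, using concurrent.futures from the standard library. Everything named here is an assumption for illustration: fake_send_mail stands in for django.core.mail.send_mail (same leading argument order), and _mail_pool, send_in_pool and the example.com addresses are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# One shared pool caps how many emails are sent concurrently.
_mail_pool = ThreadPoolExecutor(max_workers=4)


def fake_send_mail(subject, message, from_email, recipient_list, html_message=None):
    """Stand-in for send_mail; the real function returns the number of messages sent."""
    return 1


def send_in_pool(email, code):
    """Submit one send to the pool and return a Future for it."""
    return _mail_pool.submit(
        fake_send_mail,
        'Login verification code',
        f'Verification code: {code}',
        'sender@example.com',
        [email],
        html_message=f'<p>Verification code: {code}</p>',
    )


future = send_in_pool('user@example.com', '23456')
print(future.result(timeout=5))
```

Unlike a bare thread, the Future lets the caller find out whether the send raised an exception.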

4. Scheduled task module

scheduler_task.py

# 1. The tasks use Django features, so the Django environment has to be configured first
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings") # study_celery/settings.py under the project root
import django
django.setup()

from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict

# 2. Scheduled tasks
# Returns a value, which can be read from the result backend
@app.task
def add_func(a,b):
    print('The addition function was executed')
    cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a + b})
    return a + b

# No return value is needed, so ignore_result=True keeps the result out of the result backend
@app.task(ignore_result=True)
def cache_user_func():
    user = user_models.UserModel.objects.all()
    user_dict = {}
    for obj in user:
        user_dict[obj.account] = model_to_dict(obj)
    cache.set('all-user-data',user_dict,timeout=35*60)



5. Checking task status

check_task.py

from celery.result import AsyncResult
from celery_task.celery import app
'''Check the execution status of a task'''

def check_task_status(task_id):
    '''
    Task states:
        PENDING: waiting to run (also reported for task ids Celery does not know)
        STARTED: running (only reported when task_track_started is enabled)
        RETRY: being retried
        SUCCESS: finished successfully
        FAILURE: failed
    :param task_id: id returned when the task was submitted
    :return: dict with the state, a message, the result data and a status code
    '''
    result = AsyncResult(id=task_id, app=app)
    dic = {
        'type':result.status,
        'msg':'',
        'data':'',
        'code':400
    }
    if result.status == 'PENDING':
        dic['msg'] = 'Task waiting'
    elif result.status == 'STARTED':
        dic['msg'] = 'Task begins execution'
    elif result.status == 'RETRY':
        dic['msg'] = 'Try task execution again'
    elif result.status == 'FAILURE':
        dic['msg'] = 'Task execution failed'
    elif result.status == 'SUCCESS':
        dic['msg'] = 'Task execution successful'
        dic['data'] = result.get()
        dic['code'] = 200
        # result.forget() # Delete the stored result
        # result.revoke(terminate=True) # Terminate the task even if it is already running
        # result.revoke(terminate=False) # Cancel the task only if it has not started running
    return dic
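
A caller usually has to ask for the status more than once before the task finishes. The sketch below shows one way to poll a function shaped like check_task_status. It is an illustration under assumptions: wait_for_task and fake_check are hypothetical, and fake_check replays canned replies so the example runs without a broker.

```python
import time


def wait_for_task(check, task_id, interval=0.5, max_attempts=10):
    """Call check(task_id) until it reports a final state or attempts run out."""
    final_states = {'SUCCESS', 'FAILURE'}
    info = check(task_id)
    for _ in range(max_attempts - 1):
        if info['type'] in final_states:
            break
        time.sleep(interval)
        info = check(task_id)
    return info


# Stub standing in for check_task_status: pending twice, then successful.
_replies = iter([
    {'type': 'PENDING', 'msg': 'Task waiting', 'data': '', 'code': 400},
    {'type': 'PENDING', 'msg': 'Task waiting', 'data': '', 'code': 400},
    {'type': 'SUCCESS', 'msg': 'Task execution successful', 'data': 31, 'code': 200},
])


def fake_check(task_id):
    return next(_replies)


print(wait_for_task(fake_check, 'hypothetical-task-id', interval=0.01))
```

In a web view it is often preferable to return the current status immediately and let the client poll, rather than blocking the request like this.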


3. Configuration of django project

1. settings.py

Module registration

# App registration
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'user.apps.UserConfig',
]

# Cache configuration
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/2",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 1000},
            # "PASSWORD": "123",
        },
        'TIMEOUT':30*60 # Default cache timeout in seconds (30 minutes)
    }
}

#Mail configuration
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com' # If it is 163, change it to smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '[email protected]' # Email account for sending emails
EMAIL_HOST_PASSWORD = 'xxx' # Authorization code, obtained when smtp service is enabled in the settings of each mailbox
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# With the setting below, the recipient sees "2333" as the sender's display name
# DEFAULT_FROM_EMAIL = '2333 <[email protected]>'
EMAIL_USE_SSL = True # use ssl
# EMAIL_USE_TLS = False # Use tls
# EMAIL_USE_SSL and EMAIL_USE_TLS are mutually exclusive, that is, only one can be True
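
The two flags select different connection modes, which is why only one of them may be True. The sketch below spells that out. smtp_mode is a hypothetical helper, not part of Django; it only mirrors the rule stated in the comment above.

```python
def smtp_mode(use_ssl, use_tls):
    """Describe the connection mode implied by the two flags (hypothetical helper)."""
    if use_ssl and use_tls:
        raise ValueError('EMAIL_USE_SSL and EMAIL_USE_TLS cannot both be True')
    if use_ssl:
        return 'implicit SSL/TLS from the start of the connection'
    if use_tls:
        return 'plain connection upgraded with STARTTLS'
    return 'unencrypted'


print(smtp_mode(use_ssl=True, use_tls=False))
```

Port 465, as used in these settings, is the port conventionally paired with implicit SSL; STARTTLS setups commonly use 587, but the right port depends on the mail provider.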

2. Usage in view functions

1. Get the cache object and read or write the cache

from django.core.cache import cache

cache.set(key,value)

cache.get(key)
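
A view can read what the scheduled task cache_user_func stored under the key 'all-user-data' instead of querying the database. The sketch below shows such a lookup. DictCache is a hypothetical in-memory stand-in that offers only the get and set calls used here (and ignores the timeout), find_cached_user is likewise hypothetical, and the sample user is made up.

```python
class DictCache:
    """Tiny in-memory stand-in exposing the get/set calls used in this section."""

    def __init__(self):
        self._data = {}

    def set(self, key, value, timeout=None):
        self._data[key] = value  # the timeout is ignored in this stand-in

    def get(self, key, default=None):
        return self._data.get(key, default)


def find_cached_user(cache, account):
    """Return the cached dict for `account`, or None if nothing is cached for it."""
    all_users = cache.get('all-user-data') or {}
    return all_users.get(account)


cache = DictCache()
cache.set('all-user-data', {'alice': {'id': 1, 'name': 'Alice', 'account': 'alice'}})
print(find_cached_user(cache, 'alice'))
print(find_cached_user(cache, 'bob'))
```

When the lookup returns None, the view can fall back to the ORM, since the cache may be empty before the first scheduled run.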

2. Run asynchronous tasks

from celery_task.async_task import send_email_task

res = send_email_task.delay('[email protected]', '23456')

task_id = res.id # The id of the asynchronous task; use it to query the task's status later

from celery_task.async_task import search_user_task
res = search_user_task.delay()
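
Because the configuration uses the json serializer, the arguments passed to delay() and the values tasks return have to be JSON-friendly, which is why search_user_task returns plain dicts rather than model instances. The sketch below illustrates the idea with the standard json module as an approximation (Celery's own serializer accepts a few more types). FakeUser is a hypothetical stand-in for a model instance.

```python
import json


class FakeUser:
    """Hypothetical stand-in for a model instance."""

    def __init__(self, id, name):
        self.id = id
        self.name = name


users = [FakeUser(1, 'Alice')]

# Arbitrary objects cannot be encoded as JSON.
try:
    json.dumps({'users': users})
except TypeError as exc:
    print('cannot serialize:', exc)

# Plain dicts, lists, strings and numbers survive the round trip unchanged.
payload = {'users': [{'id': u.id, 'name': u.name} for u in users]}
restored = json.loads(json.dumps(payload))
print(restored)
```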

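4. Starting the project (run all commands from the project root directory)

Django project

python manage.py runserver

Celery worker

# Windows (the eventlet pool requires the eventlet package to be installed)

celery -A celery_task.celery worker -l info -P eventlet

# Linux

celery -A celery_task.celery worker -l info

Start the scheduler (Celery beat). Beat only sends the scheduled tasks, so a worker must be running as well.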

celery -A celery_task beat -l info

5. Gitee repository

Django configuration celery: https://gitee.com/liuhaizhang/django- configuration-celery