Flask + Celery configuration (2): saving files uploaded by the front end through asynchronous tasks and processing them asynchronously

1. Project directory structure

This article focuses on the Celery configuration; the basic Flask setup is assumed to be familiar.

flask-object
|-- apps
|   |-- user
|       |-- views.py
|       |-- __init__.py
|-- celery_task
|   |-- __init__.py
|   |-- async_task.py
|   |-- celery.py
|   |-- celeryconfig.py
|   |-- check_task.py
|   |-- scheduler_task.py
|-- app.py

Dependency packages:

celery==4.4.7
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0
Werkzeug==2.0.2
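
These versions can be pinned in a requirements.txt and installed in one step, for example:

    pip install -r requirements.txt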

2. Celery project configuration

1. Celery related configuration: celeryconfig.py

Celery uses JSON as its serializer by default. To hand the raw bytes of files uploaded through Flask over to a task, the serializer has to be switched to pickle.

from celery.schedules import crontab
from datetime import timedelta
'''
Parameter notes:
accept_content: whitelist of allowed content types/serializers; messages whose content type is not in this list are discarded with an error. Default: json only.
task_serializer: string naming the default serialization method for task messages. Default: json.
result_serializer: serialization format for task results. Default: json.
timezone: configure Celery to use a custom time zone.
enable_utc: when enabled, dates and times in messages are converted to UTC (used together with timezone); when set to False, the local system time zone is used.
result_expires: how long asynchronous task results are kept before they expire.
beat_schedule: configuration of the scheduled (beat) tasks.
'''
# Manually register Celery task modules: list the import path of every module that defines tasks, as strings
task_module = [
    'celery_task.async_task',      # Module that defines the asynchronous tasks
    'celery_task.scheduler_task',  # Module that defines the scheduled tasks
]

# Celery configuration
config = {
    "broker_url": 'redis://127.0.0.1:6379/0',  # With a password: 'redis://:123456@127.0.0.1:6379/1', where 123456 is the password
    "result_backend": 'redis://127.0.0.1:6379/1',
    "task_serializer": 'pickle',    # JSON cannot carry raw bytes, so switch to pickle
    "result_serializer": 'pickle',  # JSON cannot carry raw bytes, so switch to pickle
    "accept_content": ['pickle'],   # JSON cannot carry raw bytes, so switch to pickle
    "timezone": 'Asia/Shanghai',
    "enable_utc": False,
    "result_expires": 1 * 60 * 60,
    "beat_schedule": {  # Scheduled task configuration
        # The entry name can be anything you like
        'add-func-30-seconds': {
            # Execute the add_func function in scheduler_task
            'task': 'celery_task.scheduler_task.add_func',  # Import path of the task function: from celery_task.scheduler_task import add_func
            # Execute every 30 seconds
            'schedule': timedelta(seconds=30),
            # Arguments passed to add_func
            'args': (10, 21)
        },
        # The entry name can be anything you like
        'add-func-5-minutes': {
            'task': 'celery_task.scheduler_task.add_func',  # Import path of the task function: from celery_task.scheduler_task import add_func
            # crontab fields that are not specified default to '*', so this entry runs at the 5th minute of every hour of every day
            'schedule': crontab(minute='5'),  # Add hour, day_of_week, etc. to narrow the schedule down further
            'args': (19, 22)  # Arguments required by the scheduled task
        },
        # Cache user data
        'cache-user-func': {
            'task': 'celery_task.scheduler_task.cache_user_func',
            # Import path of the task function: from celery_task.scheduler_task import cache_user_func
            'schedule': timedelta(minutes=1),  # Run every minute to cache user data
        }
    }
}
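
A quick note on the two schedule types used above: timedelta defines a fixed interval counted from the previous run, while crontab pins execution to points on the clock, and any crontab field you leave out defaults to '*'. A few illustrative schedules (examples only, not part of this project's configuration):

from celery.schedules import crontab
from datetime import timedelta

# Interval-based: run again a fixed amount of time after the previous run
every_30_seconds = timedelta(seconds=30)

# Clock-based: unspecified crontab fields default to '*'
every_hour_at_minute_5 = crontab(minute='5')                          # minute 5 of every hour
every_day_at_8_30 = crontab(hour=8, minute=30)                        # every day at 08:30
every_monday_midnight = crontab(minute=0, hour=0, day_of_week='mon')  # Mondays at 00:00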

2. Create celery object: celery.py

from celery import Celery, Task
from .celeryconfig import config, task_module
import sys
import os
# 1. Add the Flask project path to the Python path so the apps package can be imported
project_path = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_path)

'''
2. Create the Celery application object
  'task' is the main name of the Celery app, used to distinguish Celery instances
  broker specifies the message middleware
  backend specifies where task results are stored
  include manually specifies the modules in which the asynchronous tasks are defined
'''
# Create the Celery object
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
# Load the rest of the configuration
celery.conf.update(**config)

# 3. Attach the Flask application context to every Celery task, so Flask objects (db, models, cache) can be used inside tasks
class ContextTask(celery.Task):
    def __call__(self, *args, **kwargs):
        from apps import create_app
        app = create_app()
        with app.app_context():
            return self.run(*args, **kwargs)
celery.Task = ContextTask
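
The ContextTask wrapper calls create_app, the Flask application factory in the apps package, which this article does not show. A minimal sketch of what such a factory might look like is given below; the configuration values and the ext module holding the db and cache instances are assumptions for illustration, not the project's actual code:

# apps/__init__.py (illustrative sketch only)
from flask import Flask
from ext import db, cache  # assumed Flask-SQLAlchemy / Flask-Caching instances defined in ext.py

def create_app():
    app = Flask(__name__)
    # Placeholder connection settings; replace with the project's real values
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123456@127.0.0.1:3306/flask_demo'
    app.config['CACHE_TYPE'] = 'redis'
    app.config['CACHE_REDIS_URL'] = 'redis://127.0.0.1:6379/2'
    db.init_app(app)
    cache.init_app(app)
    # Blueprint / API registration is omitted here
    return app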

3. Asynchronous task module: async_task.py

import time
# Import the Celery object
from celery_task.celery import celery
import os
# Import the email-sending helper
from base.email_ import send_email
from ext import cache


'''
All asynchronous tasks:
1. Tasks without a return value: @celery.task(ignore_result=True)
2. Tasks with a return value: @celery.task (ignore_result defaults to False)
'''


# Has a return value; the result is stored in the result backend
@celery.task
def send_email_task(receiver_email, code):  # The email is sent in the background, so the web request is not blocked by the I/O
    '''
    :param receiver_email: the email address that receives the verification code (the user's email)
    :param code: the verification code to send
    :return:
    '''
    # Send the email via the project's own helper
    ret = send_email(
        receiver_email=receiver_email,
        subject='Login verification code',
        message=f'Your verification code is: {code}',
    )
    return {'result':ret,receiver_email:code}

@celery.task
def cache_user_task():
    from apps.user.models import UserModel
    user = UserModel.query.all()
    lis = []
    for u in user:
        id = u.id
        name = u.name
        dic = {'id':id,'name':name}
        lis.append(dic)
        print(dic)
    cache.set('all-user-data',lis)
    return {'code':200,'msg':'Query data successfully'}

@celery.task
def test_check_fun():
    print('Long-running task started: 30 seconds')
    time.sleep(30)
    print('Long-running task finished...')
    return {'result':'Return value after asynchronous task execution','time':30}

@celery.task(ignore_result=True)
def save_file_task(file_bytes, file_path, delete_file_path=None):
    '''
    file_bytes: the raw bytes of the uploaded file, passed in by the Flask view
    file_path: the absolute path where the file should be saved
    delete_file_path: optional path of an old file to remove after saving
    1. All logic for building the path and avoiding duplicate file names is handled in the view function.
    2. This task is only responsible for writing the file to disk.
    '''
    # 1. Make sure the target directory exists; create it if it does not
    file_path_dir = os.path.dirname(file_path)
    if not os.path.exists(file_path_dir):
        os.makedirs(file_path_dir)

    # 2. Save the file
    with open(file_path, 'wb') as f:
        f.write(file_bytes)

    # 3. Delete the old file
    if delete_file_path:
        # For an avatar, a user only needs one file, so remove the old avatar
        try:
            os.remove(delete_file_path)
        except Exception:
            print('Deletion failed')
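
async_task.py imports send_email from base.email_ and cache from ext, neither of which is shown in this article. A rough sketch of what those two modules could contain, purely as an assumption (the SMTP host, port and credentials are placeholders):

# ext.py (sketch): shared, unbound extension instances, initialised later in create_app
from flask_sqlalchemy import SQLAlchemy
from flask_caching import Cache

db = SQLAlchemy()
cache = Cache()

# base/email_.py (sketch): a plain smtplib helper
import smtplib
from email.mime.text import MIMEText

def send_email(receiver_email, subject, message):
    msg = MIMEText(message, 'plain', 'utf-8')
    msg['Subject'] = subject
    msg['From'] = 'sender@example.com'   # placeholder sender address
    msg['To'] = receiver_email
    try:
        with smtplib.SMTP_SSL('smtp.example.com', 465) as server:  # placeholder SMTP server
            server.login('sender@example.com', 'password')         # placeholder credentials
            server.sendmail(msg['From'], [receiver_email], msg.as_string())
        return True
    except Exception as e:
        print('Email sending failed:', e)
        return False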

4. Scheduled task module: scheduler_task.py

from celery_task.celery import celery
import time
'''
All scheduled tasks
'''

# There is a return value, which can be obtained from the result backend
@celery.task
def add_func(a, b):
    print('The addition function was executed', a + b)
    return a + b


# No return value is required, disable the result backend
@celery.task(ignore_result=True)
def cache_user_func():
    print('all')


5. Verify whether the task is successful: check_task.py

from celery.result import AsyncResult
from celery_task.celery import celery

'''Verify the execution status of the task'''


def check_task_status(task_id):
    '''
    Task execution status:
        PENDING: waiting to be executed
        STARTED: execution started
        RETRY: being retried
        SUCCESS: executed successfully
        FAILURE: execution failed
    :param task_id:
    :return:
    '''
    result = AsyncResult(id=task_id, app=celery)
    dic = {
        'type': result.status,
        'msg': '',
        'data': None,
        'code': 400
    }
    if result.status == 'PENDING':
        dic['msg'] = 'Task is waiting'
    elif result.status == 'STARTED':
        dic['msg'] = 'Task has started executing'
    elif result.status == 'RETRY':
        dic['msg'] = 'Task is being retried'
    elif result.status == 'FAILURE':
        dic['msg'] = 'Task execution failed'
    elif result.status == 'SUCCESS':
        dic['msg'] = 'Task executed successfully'
        dic['data'] = result.get()
        dic['code'] = 200
        # result.forget()  # Remove the result from the result backend
        # result.revoke(terminate=True)   # Terminate the task even if it is already running
        # result.revoke(terminate=False)  # Revoke the task only if it has not started yet
    return dic
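
check_task_status is not wired to any route in this article. Below is a minimal sketch of how it could be exposed as a Flask-RESTful resource, so the front end can poll the task_id returned by the upload view; the resource name and URL are assumptions:

from flask_restful import Resource
from celery_task.check_task import check_task_status

class TaskStatusResource(Resource):
    def get(self, task_id):
        # Look up the Celery task by the id returned from the upload view
        return check_task_status(task_id)

Registering it with, for example, api.add_resource(TaskStatusResource, '/task/<string:task_id>') lets the front end poll until the returned code becomes 200.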

3. The view function in Flask

Write a view function that receives the avatar image uploaded by the user and hands the actual saving of the file over to an asynchronous task.

class UserOneResource(Resource):
   
    def put(self,id):
        '''
        Interface for users to update their avatar
        '''
        file = request.files.get('picture')
        if not file:
            return NewResponse(error='Please upload an avatar file')
        user = models.UserModel.query.filter_by(id=id).first()

        if not user:
            return NewResponse(error='User does not exist')
        # print(file.filename, '=============')
        _, suffix = file.filename.rsplit('.', 1)
        # 1. Build a unique file name with uuid
        file_name = str(uuid.uuid4()) + f'-{id}' + '.' + suffix
        file_path = os.path.join(STATIC_PATH, 'picture', file_name).replace('\\', '/')
        # 2. Work out the path of the old avatar, if there is one
        delete_file_path = None
        if user.picture:
            picture = user.picture
            if picture[0] in ['\\', '/']:
                picture = picture[1:]
            delete_file_path = os.path.join(BASE_PATH, picture).replace('\\', '/')
            print(delete_file_path)
        # 3. Read the raw bytes of the uploaded file
        file_bytes = file.read()
        #4. Call the asynchronous save file task
        res = save_file_task.delay(file_bytes,file_path,delete_file_path)
        task_id = res.id
        # 5. Save the new avatar path to the database
        picture = file_path.split('static/')[-1]
        picture = 'static/' + picture.replace('\\', '/')
        user.picture = picture
        db.session.add(user)
        db.session.commit()
        return NewResponse(data={'task_id':task_id},msg='Operation successful')

4. Start the project

1. Start the Celery worker
On Windows, the worker needs the eventlet pool:

    celery -A celery_task.celery worker -l info -P eventlet

On Linux:

    celery -A celery_task.celery worker -l info


2. Start the beat scheduler (for the scheduled tasks)

    celery -A celery_task.celery beat -l info
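
During development it can be convenient to run beat inside the worker process with the -B flag instead of starting two processes; this is an alternative for local testing only, not part of the original setup and not recommended for production:

    celery -A celery_task.celery worker -B -l info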