Celery combines flask to complete asynchronous tasks and scheduled tasks

Celery is often used for web asynchronous tasks, scheduled tasks, etc.
Use redis as Celery’s “message broker/message middleware”.
Here is an example of using qq mailbox to delay sending emails through Flask-Mail.

pip install celery
pip install redis
pip install Flask-Mail



1. Use flask to send emails

Using Flask-Mail to send emails requires some configuration. The method of obtaining the QQ mailbox authorization code is as follows:

app = Flask(__name__)
app.config['SECRET_KEY'] = 'top-secret!'

# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.qq.com'
app.config['MAIL_PORT'] = 465
# Enable/disable transport security layer encryption
app.config['MAIL_USE_TLS'] = False
# Enable/disable Secure Socket Layer encryption
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = 'My QQ [email protected]'
app.config['MAIL_PASSWORD'] = 'My QQ mailbox authorization code'
app.config['MAIL_DEFAULT_SENDER'] = 'My QQ [email protected]'

# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'


#Initialize extensions
mail = Mail(app)

@app.route("/send_mail")
def index11():
    # sender: sender recipients: recipients
   msg = Message('Hello', sender = app.config['MAIL_DEFAULT_SENDER'], recipients = ['Target [email protected]'])
   msg.body = "Email content sent from python--flask framework~"
   mail.send(msg)#Send the content of the Message class object
   return "Sent successfully"

Click into the QQ mailbox, click on the account in the settings, scroll down to enable the pop3 service to obtain the authorization code.






2. Delay sending emails

Defining celery tasks is basically the same as flask, except that it has the modifier @celery.task in front.

@celery.task
def send_async_email(email_data):
    """Background task to send an email with Flask-Mail."""
    msg = Message(email_data['subject'],
                  sender=app.config['MAIL_DEFAULT_SENDER'],
                  recipients=[email_data['to']])
    msg.body = email_data['body']
    with app.app_context():
        mail.send(msg)
@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))
    email = request.form['email']
    session['email'] = email

    # send the email
    email_data = {<!-- -->
        'subject': 'Hello from Flask',
        'to': email,
        'body': 'Content of email sent delayed by python--flask framework~'
    }
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(email_data)
        print('here!--')
        flash('Sending email to {0}'.format(email))
    else:
        # send in one minute
        send_async_email.apply_async(args=[email_data], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))

3. Generate asynchronous tasks with status information progress bars

# If bind is True, self will be passed to the decorated method.
@celery.task(bind=True)
def long_task(self):
    """Asynchronous tasks with progress bar and status report"""
    verb = ['in progress', 'preparing', 'currently', 'in progress', 'in progress']
    adjective = ['full speed', 'effort', 'silently', 'seriously', 'quickly']
    noun = ['Open', 'Start', 'Repair', 'Load', 'Check']
    message = ''
    total = random.randint(10, 50) # Randomly pick a random number from 10 to 50
    for i in range(total):
        selectnow = random.random()
        print(selectnow)
        # Splice the above three lsit to randomly generate some status descriptions
        if not message or selectnow < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        # Update Celery task status
        self.update_state(state='PROGRESS',
                          meta={<!-- -->'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
        # Return dictionary
    return {<!-- -->'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}
               



@app.route('/longtask', methods=['POST'])
def longtask():
    task = long_task.apply_async()
    return jsonify({<!-- -->}), 202, {<!-- -->'Location': url_for('taskstatus', task_id=task.id)}



@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)
    # print(task.state)
    if task.state == 'PENDING':
        # When PENDING, if it keeps PENDING, it may be that celery is not turned on.
        response = {<!-- -->
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        # When loading
        response = {<!-- -->
            'state': task.state,
            'current': task.info.get('current', 0),
            'total': task.info.get('total', 1),
            'status': task.info.get('status', '')
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        #Output when error is reported
        response = {<!-- -->
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info), # this is the exception raised
        }
    return jsonify(response)

4. Complete code

File structure

--- current
    ---templates
        ---index.html
    --- asyn_001.py
This is asyn_001.py
import os
import random
import time
from flask import Flask, request, render_template, session, flash, redirect, \
    url_for, jsonify
from flask_mail import Mail, Message
from celery import celery


app = Flask(__name__)
app.config['SECRET_KEY'] = 'top-secret!'

# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.qq.com'
app.config['MAIL_PORT'] = 465
# Enable/disable transport security layer encryption
app.config['MAIL_USE_TLS'] = False
# Enable/disable Secure Sockets Layer encryption
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = 'My QQ [email protected]'
app.config['MAIL_PASSWORD'] = 'My QQ mailbox authorization code'
app.config['MAIL_DEFAULT_SENDER'] = 'My QQ [email protected]'

# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'


#Initialize extensions
mail = Mail(app)

@app.route("/send_mail")
def index11():
    # sender: sender recipients: recipients
   msg = Message('Hello', sender = app.config['MAIL_DEFAULT_SENDER'], recipients = ['Target [email protected]'])
   msg.body = "Email content sent from python--flask framework~"
   mail.send(msg)#Send the content of the Message class object
   return "Sent successfully"


#InitializeCelery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


@celery.task
def send_async_email(email_data):
    """Background task to send an email with Flask-Mail."""
    msg = Message(email_data['subject'],
                  sender=app.config['MAIL_DEFAULT_SENDER'],
                  recipients=[email_data['to']])
    msg.body = email_data['body']
    with app.app_context():
        mail.send(msg)

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))
    email = request.form['email']
    session['email'] = email

    # send the email
    email_data = {<!-- -->
        'subject': 'Hello from Flask',
        'to': email,
        'body': 'Content of email sent delayed by python--flask framework~'
    }
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(email_data)
        print('here!--')
        flash('Sending email to {0}'.format(email))
    else:
        # send in one minute
        send_async_email.apply_async(args=[email_data], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))



# If bind is True, self will be passed to the decorated method.
@celery.task(bind=True)
def long_task(self):
    """Asynchronous tasks with progress bar and status report"""
    verb = ['in progress', 'preparing', 'currently', 'in progress', 'in progress']
    adjective = ['full speed', 'effort', 'silently', 'seriously', 'quickly']
    noun = ['Open', 'Start', 'Repair', 'Load', 'Check']
    message = ''
    total = random.randint(10, 50) # Randomly pick a random number from 10 to 50
    for i in range(total):
        selectnow = random.random()
        print(selectnow)
        # Splice the above three lsit to randomly generate some status descriptions
        if not message or selectnow < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        # Update Celery task status
        self.update_state(state='PROGRESS',
                          meta={<!-- -->'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
        # Return dictionary
    return {<!-- -->'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}
               



@app.route('/longtask', methods=['POST'])
def longtask():
    task = long_task.apply_async()
    return jsonify({<!-- -->}), 202, {<!-- -->'Location': url_for('taskstatus', task_id=task.id)}



@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)
    # print(task.state)
    if task.state == 'PENDING':
        # When PENDING, if it keeps PENDING, it may be that celery is not turned on.
        response = {<!-- -->
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        # When loading
        response = {<!-- -->
            'state': task.state,
            'current': task.info.get('current', 0),
            'total': task.info.get('total', 1),
            'status': task.info.get('status', '')
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        #Output when error is reported
        response = {<!-- -->
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info), # this is the exception raised
        }
    return jsonify(response)


if __name__ == '__main__':
    app.run(debug=True)



This is index.html
<html>
  <head>
    <title>Flask + Celery Example</title>
    <style>
        .progress {<!-- -->
            width: 100%;
            text-align: center;
        }
    </style>
  </head>
  <body>
    <h1>Flask + Celery example</h1>
    <h2>Example 1: Send asynchronous email</h2>
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{<!-- -->{ message }}</p>
    {% endfor %}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{<!-- -->{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>
    
    <h2>Example 2: Generate progress bar and status report</h2>
    <!--<button οnclick="start_long_task();">Start Long Calculation</button><br><br>-->
    <button id="start-bg-job">Start Long Calculation</button><br><br>
    <div id="progress"></div>

    <script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
    <script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
    <script>
        function start_long_task() {<!-- -->
            // add task status elements
            div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> & amp;nbsp; </div></div>');
            $('#progress').append(div);

            // create a progress bar
            var nanobar = new Nanobar({<!-- -->
                bg: '#44f',
                target: div[0].childNodes[0]
            });

            // send ajax POST request to start background job
            $.ajax({<!-- -->
                type: 'POST',
                url: '/longtask',
                success: function(data, status, request) {<!-- -->
                    status_url = request.getResponseHeader('Location');
                    console.log("status_url", status_url,"nanobar", nanobar, "div[0]", div[0])
                    console.log("data", data)
                    update_progress(status_url, nanobar, div[0]);
                },
                error: function() {<!-- -->
                    alert('Unexpected error');
                }
            });
        }
        function update_progress(status_url, nanobar, status_div) {<!-- -->
            // send GET request to status URL
            $.getJSON(status_url, function(data) {<!-- -->
                // update UI
                percent = parseInt(data['current'] * 100 / data['total']);
                nanobar.go(percent);
                $(status_div.childNodes[1]).text(percent + '%');
                $(status_div.childNodes[2]).text(data['status']);
                if (data['state'] != 'PENDING' & amp; & amp; data['state'] != 'PROGRESS') {<!-- -->
                    if ('result' in data) {<!-- -->
                        //show result
                        $(status_div.childNodes[3]).text('Result: ' + data['result']);
                    }
                    else {<!-- -->
                        // something unexpected happened
                        $(status_div.childNodes[3]).text('Result: ' + data['state']);
                    }
                }
                else {<!-- -->
                    //rerun in 2 seconds
                    setTimeout(function() {<!-- -->
                        update_progress(status_url, nanobar, status_div);
                    }, 2000);
                }
            });
        }
        $(function() {<!-- -->
            $('#start-bg-job').click(start_long_task);
        });
    </script>
  </body>
</html>

5. Start the task

Terminal cd to the directory where the current folder is located

Start the asyn_001 program to observe the execution of asynchronous tasks.

6. Problems encountered

After starting with 5 startup methods, the running code accidentally encountered an error like the picture below.
A quick fix is to make things single-threaded. To do this, set celery’s worker pool type to solo when starting the celery worker

celery -A your_proj worker -P solo -l info

Reference 1 A simple example of Celery implementing asynchronous tasks and scheduled tasks
Reference 2 Using Celery with Flask