Celery is often used for web asynchronous tasks, scheduled tasks, etc.
This example uses Redis as Celery's message broker (message middleware).
The example below uses a QQ Mail account and Flask-Mail to send emails asynchronously, either right away or after a delay.
pip install celery
pip install redis
pip install Flask-Mail
1. Use flask to send emails
Sending emails with Flask-Mail requires some configuration, shown in the code below. How to obtain the QQ Mail authorization code is described right after the code.
app = Flask(__name__)

# All settings are applied in one call; the keys and values are the ones
# Flask-Mail and the Celery setup further down read from app.config.
app.config.update(
    SECRET_KEY='top-secret!',
    # Flask-Mail: QQ Mail SMTP server over implicit SSL on port 465.
    MAIL_SERVER='smtp.qq.com',
    MAIL_PORT=465,
    # TLS (STARTTLS) stays off because SSL is enabled instead.
    MAIL_USE_TLS=False,
    MAIL_USE_SSL=True,
    MAIL_USERNAME='My QQ [email protected]',
    # The QQ Mail authorization code, not the account login password.
    MAIL_PASSWORD='My QQ mailbox authorization code',
    MAIL_DEFAULT_SENDER='My QQ [email protected]',
    # Celery: Redis serves as both the broker and the result backend.
    CELERY_BROKER_URL='redis://localhost:6379/0',
    CELERY_RESULT_BACKEND='redis://localhost:6379/0',
)

# Initialize extensions
mail = Mail(app)


@app.route("/send_mail")
def index11():
    """Send a fixed test email synchronously and report success.

    The request blocks until Flask-Mail has handed the message to the
    SMTP server.
    """
    message = Message(
        'Hello',
        sender=app.config['MAIL_DEFAULT_SENDER'],
        recipients=['Target [email protected]'],
    )
    message.body = "Email content sent from python--flask framework~"
    mail.send(message)
    return "Sent successfully"
To obtain the authorization code, log in to QQ Mail, open Settings, go to the Account tab, scroll down, and enable the POP3 service; the authorization code is issued when the service is enabled.
2. Delay sending emails
Defining a Celery task is much like defining an ordinary function in the Flask application, except that the function is decorated with @celery.task.
@celery.task
def send_async_email(email_data):
    """Celery task: deliver one email through Flask-Mail.

    Args:
        email_data: Mapping with the keys 'subject', 'to' and 'body'.
    """
    message = Message(
        email_data['subject'],
        sender=app.config['MAIL_DEFAULT_SENDER'],
        recipients=[email_data['to']],
    )
    message.body = email_data['body']
    # The task runs outside of a Flask request, so an application context
    # is pushed explicitly around the send.
    with app.app_context():
        mail.send(message)
@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the demo page (GET) or queue an email task (POST).

    On POST the form fields 'email' and 'submit' are read. The address is
    remembered in the session so the form can be pre-filled, the email is
    queued through Celery (immediately, or with a 60 second countdown),
    and the client is redirected back to this page.
    """
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))

    email = request.form['email']
    session['email'] = email

    # Payload handed to the Celery task.
    email_data = {
        'subject': 'Hello from Flask',
        'to': email,
        'body': 'Content of email sent delayed by python--flask framework~'
    }
    if request.form['submit'] == 'Send':
        # Send right away.
        send_async_email.delay(email_data)
        print('here!--')
        flash('Sending email to {0}'.format(email))
    else:
        # Send in one minute.
        send_async_email.apply_async(args=[email_data], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))
3. Run an asynchronous task that reports its status to a progress bar
# With bind=True the task instance is passed to the function as `self`.
@celery.task(bind=True)
def long_task(self):
    """Simulate a long-running job that reports progress as it goes.

    Runs for a random number of one-second steps (10 to 50). On every step
    the task state is set to 'PROGRESS' with the current step, the total
    and a randomly generated status message.

    Returns:
        A dict with 'current', 'total', 'status' and 'result' describing
        the finished task.
    """
    verb = ['in progress', 'preparing', 'currently', 'in progress', 'in progress']
    adjective = ['full speed', 'effort', 'silently', 'seriously', 'quickly']
    noun = ['Open', 'Start', 'Repair', 'Load', 'Check']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        selectnow = random.random()
        print(selectnow)
        # Build a new status message on the first step, and afterwards
        # with a probability of roughly 25% per step.
        if not message or selectnow < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        # Publish the progress as the task's state.
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}


@app.route('/longtask', methods=['POST'])
def longtask():
    """Start `long_task` and return 202 with the status URL in `Location`."""
    task = long_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task.id)}


@app.route('/status/<task_id>')
def taskstatus(task_id):
    """Return the state and progress of the task `task_id` as JSON."""
    task = long_task.AsyncResult(task_id)
    # Read state and info once so the branches below work on one snapshot.
    state = task.state
    info = task.info
    if state == 'PENDING':
        # A task that stays PENDING forever may mean that no Celery worker
        # is running.
        response = {
            'state': state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif state != 'FAILURE':
        # While running, info is the meta dict set by update_state(); on
        # success it is the returned dict. Other states may carry something
        # that is not a dict, so fall back to the defaults instead of
        # failing on .get().
        details = info if isinstance(info, dict) else {}
        response = {
            'state': state,
            'current': details.get('current', 0),
            'total': details.get('total', 1),
            'status': details.get('status', '')
        }
        if 'result' in details:
            response['result'] = details['result']
    else:
        # On failure, info is the raised exception.
        response = {
            'state': state,
            'current': 1,
            'total': 1,
            'status': str(info),
        }
    return jsonify(response)
4. Complete code
File structure
current/
    templates/
        index.html
    asyn_001.py
This is asyn_001.py
import os
import random
import time

from flask import Flask, request, render_template, session, flash, redirect, \
    url_for, jsonify
from flask_mail import Mail, Message
from celery import Celery

app = Flask(__name__)
app.config['SECRET_KEY'] = 'top-secret!'

# Flask-Mail configuration: QQ Mail SMTP server over implicit SSL.
app.config['MAIL_SERVER'] = 'smtp.qq.com'
app.config['MAIL_PORT'] = 465
# TLS (STARTTLS) stays off because SSL is enabled instead.
app.config['MAIL_USE_TLS'] = False
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = 'My QQ [email protected]'
# The QQ Mail authorization code, not the account login password.
app.config['MAIL_PASSWORD'] = 'My QQ mailbox authorization code'
app.config['MAIL_DEFAULT_SENDER'] = 'My QQ [email protected]'

# Celery configuration: Redis is both the broker and the result backend.
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Initialize extensions
mail = Mail(app)


@app.route("/send_mail")
def index11():
    """Send a fixed test email synchronously and report success."""
    msg = Message('Hello', sender=app.config['MAIL_DEFAULT_SENDER'],
                  recipients=['Target [email protected]'])
    msg.body = "Email content sent from python--flask framework~"
    mail.send(msg)
    return "Sent successfully"


# Initialize Celery. The result backend is passed explicitly so that task
# state can be stored and queried by the /status endpoint.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'],
                backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)


@celery.task
def send_async_email(email_data):
    """Background task to send an email with Flask-Mail.

    Args:
        email_data: Mapping with the keys 'subject', 'to' and 'body'.
    """
    msg = Message(email_data['subject'],
                  sender=app.config['MAIL_DEFAULT_SENDER'],
                  recipients=[email_data['to']])
    msg.body = email_data['body']
    # The task runs outside of a Flask request, so an application context
    # is pushed explicitly around the send.
    with app.app_context():
        mail.send(msg)


@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the demo page (GET) or queue an email task (POST)."""
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))

    email = request.form['email']
    session['email'] = email

    # Payload handed to the Celery task.
    email_data = {
        'subject': 'Hello from Flask',
        'to': email,
        'body': 'Content of email sent delayed by python--flask framework~'
    }
    if request.form['submit'] == 'Send':
        # Send right away.
        send_async_email.delay(email_data)
        print('here!--')
        flash('Sending email to {0}'.format(email))
    else:
        # Send in one minute.
        send_async_email.apply_async(args=[email_data], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))


# With bind=True the task instance is passed to the function as `self`.
@celery.task(bind=True)
def long_task(self):
    """Simulate a long-running job that reports progress as it goes.

    Runs for a random number of one-second steps (10 to 50) and publishes
    the current step, the total and a random status message on each step.

    Returns:
        A dict with 'current', 'total', 'status' and 'result'.
    """
    verb = ['in progress', 'preparing', 'currently', 'in progress', 'in progress']
    adjective = ['full speed', 'effort', 'silently', 'seriously', 'quickly']
    noun = ['Open', 'Start', 'Repair', 'Load', 'Check']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        selectnow = random.random()
        print(selectnow)
        # Build a new status message on the first step, and afterwards
        # with a probability of roughly 25% per step.
        if not message or selectnow < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        # Publish the progress as the task's state.
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}


@app.route('/longtask', methods=['POST'])
def longtask():
    """Start `long_task` and return 202 with the status URL in `Location`."""
    task = long_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task.id)}


@app.route('/status/<task_id>')
def taskstatus(task_id):
    """Return the state and progress of the task `task_id` as JSON."""
    task = long_task.AsyncResult(task_id)
    # Read state and info once so the branches below work on one snapshot.
    state = task.state
    info = task.info
    if state == 'PENDING':
        # A task that stays PENDING forever may mean that no Celery worker
        # is running.
        response = {
            'state': state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif state != 'FAILURE':
        # While running, info is the meta dict set by update_state(); on
        # success it is the returned dict. Other states may carry something
        # that is not a dict, so fall back to the defaults instead of
        # failing on .get().
        details = info if isinstance(info, dict) else {}
        response = {
            'state': state,
            'current': details.get('current', 0),
            'total': details.get('total', 1),
            'status': details.get('status', '')
        }
        if 'result' in details:
            response['result'] = details['result']
    else:
        # On failure, info is the raised exception.
        response = {
            'state': state,
            'current': 1,
            'total': 1,
            'status': str(info),
        }
    return jsonify(response)


if __name__ == '__main__':
    app.run(debug=True)
This is index.html
<html>
  <head>
    <title>Flask + Celery Example</title>
    <style>
      .progress {
        width: 100%;
        text-align: center;
      }
    </style>
  </head>
  <body>
    <h1>Flask + Celery example</h1>

    <h2>Example 1: Send asynchronous email</h2>
    <!-- Messages queued with flash() by the Flask view -->
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{{ message }}</p>
    {% endfor %}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>

    <h2>Example 2: Generate progress bar and status report</h2>
    <button id="start-bg-job">Start Long Calculation</button><br><br>
    <div id="progress"></div>

    <script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
    <script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
    <script>
      // Start a background job and create the UI elements that track it.
      function start_long_task() {
        // Status elements: [0] progress bar, [1] percent, [2] status text,
        // [3] result. The markup must stay free of whitespace between the
        // inner divs because they are addressed through childNodes indices.
        var div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div>&nbsp;</div></div>');
        $('#progress').append(div);

        // Create a progress bar inside the first inner div.
        var nanobar = new Nanobar({
          bg: '#44f',
          target: div[0].childNodes[0]
        });

        // Send an ajax POST request to start the background job; the
        // status URL comes back in the Location response header.
        $.ajax({
          type: 'POST',
          url: '/longtask',
          success: function(data, status, request) {
            var status_url = request.getResponseHeader('Location');
            console.log("status_url", status_url, "nanobar", nanobar, "div[0]", div[0]);
            console.log("data", data);
            update_progress(status_url, nanobar, div[0]);
          },
          error: function() {
            alert('Unexpected error');
          }
        });
      }

      // Poll the status URL and refresh the UI until the task finishes.
      function update_progress(status_url, nanobar, status_div) {
        // Send a GET request to the status URL.
        $.getJSON(status_url, function(data) {
          // Update the UI.
          var percent = parseInt(data['current'] * 100 / data['total'], 10);
          nanobar.go(percent);
          $(status_div.childNodes[1]).text(percent + '%');
          $(status_div.childNodes[2]).text(data['status']);
          if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
            if ('result' in data) {
              // Show the result.
              $(status_div.childNodes[3]).text('Result: ' + data['result']);
            } else {
              // Something unexpected happened.
              $(status_div.childNodes[3]).text('Result: ' + data['state']);
            }
          } else {
            // Poll again in 2 seconds.
            setTimeout(function() {
              update_progress(status_url, nanobar, status_div);
            }, 2000);
          }
        });
      }

      $(function() {
        $('#start-bg-job').click(start_long_task);
      });
    </script>
  </body>
</html>
5. Start the task
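Make sure a Redis server is running on localhost:6379 (the broker URL configured in asyn_001.py). In a terminal, cd to the `current` directory that contains asyn_001.py and start a Celery worker, for example: celery -A asyn_001.celery worker -l info
In a second terminal, start the asyn_001 program (for example: python asyn_001.py) and open the page in a browser to observe the execution of the asynchronous tasks.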
6. Problems encountered
After starting everything as described in step 5, the running code occasionally failed with an error (the screenshot of the error from the original post is not reproduced here).
A quick fix is to make things single-threaded. To do this, set celery’s worker pool type to solo when starting the celery worker
celery -A your_proj worker -P solo -l info
Reference 1 A simple example of Celery implementing asynchronous tasks and scheduled tasks
Reference 2 Using Celery with Flask