Notice:
Routing concepts like topic and fanout exchanges are not available for all transports; see the Transport Comparison Table.
Basics
Automatic routing
The easiest way to route is to use the task_create_missing_queues setting (enabled by default).
With this setting, a named queue that is not already defined in task_queues is created automatically. This makes simple routing tasks easy to set up.
Say you have two servers, x and y, that handle regular tasks, and one server, z, that handles only feed-related tasks. You can use this configuration:
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
With this route enabled, the import_feed task is routed to the feeds queue, while all other tasks are routed to the default queue (named celery for historical reasons).
Alternatively, you can use glob
pattern matching, or even regular expressions, to match all tasks in the feed.tasks
namespace:
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
If the order in which patterns are matched is important, specify the routes as a list of items instead:
import re

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
Notice:
The task_routes setting can be either a dictionary or a list of router objects, so in this case the setting must be declared as a tuple containing the list.
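To show why order matters, here is a minimal pure-Python model of how an ordered route list could be evaluated. It assumes that glob patterns behave like fnmatchcase (where * also matches dots) and that compiled regexes are tried with match(). The helper name match_route is hypothetical. This is an illustration, not Celery's router code.

```python
import re
from fnmatch import fnmatchcase

# Mirrors the ordered task_routes list above, without the wrapping tuple.
ROUTES = [
    ("feed.tasks.*", {"queue": "feeds"}),
    ("web.tasks.*", {"queue": "web"}),
    (re.compile(r"(video|image)\.tasks\..*"), {"queue": "media"}),
]


def match_route(task_name, routes=ROUTES):
    """Return the options of the first matching pattern, or None (hypothetical helper)."""
    for pattern, options in routes:
        if isinstance(pattern, re.Pattern):
            # Compiled regular expressions are anchored at the start of the name.
            matched = pattern.match(task_name) is not None
        else:
            # Glob patterns: '*' matches any run of characters, dots included.
            matched = fnmatchcase(task_name, pattern)
        if matched:
            return options  # first match wins, so list order matters
    return None  # no route: the task would go to the default queue
```

For example, match_route("image.tasks.resize") returns {"queue": "media"}, and a name that matches no pattern returns None.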
Once the router is installed, you can start server z so that it processes only the feeds queue:
user@z:/$ celery -A proj worker -Q feeds
You can specify as many queues as you need, so you can also have this server consume from the default queue:
user@z:/$ celery -A proj worker -Q feeds,celery
Changing the name of the default queue
You can change the name of the default queue with the following configuration:
app.conf.task_default_queue = 'default'
How the queue is defined
The point of this feature is to hide the complex AMQP protocol from users who have only basic needs. However, you may still be interested in how these queues are declared.
A queue named video
will be created with the following configuration:
{'exchange': 'video',
'exchange_type': 'direct',
'routing_key': 'video'}
Non-AMQP transports such as Redis or SQS do not support exchanges, so they require the exchange to have the same name as the queue. This design lets the same configuration work on transports that have no exchanges.
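The naming rule can be expressed in a few lines of Python. The following sketch models the declaration that a missing queue receives. The function declare_missing_queue is hypothetical and does not come from Celery. It simply reproduces the exchange/exchange_type/routing_key defaults listed above.

```python
def declare_missing_queue(name, known_queues):
    """Hypothetical model of task_create_missing_queues (not Celery's code).

    Returns the declaration for `name`, adding a default one when missing.
    """
    if name not in known_queues:
        # Exchange, queue, and routing key share one name, so transports
        # without exchanges can route straight to the queue by name.
        known_queues[name] = {
            "exchange": name,
            "exchange_type": "direct",
            "routing_key": name,
        }
    return known_queues[name]
```

Queues that are already declared keep their own settings. Only unknown names receive the default declaration.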
Manual routing
Say you have two servers, x and y, that handle regular tasks, and one server, z, that handles only feed-related tasks. You can use this configuration:
from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'
task_queues is a list of Queue instances. If you don't set the exchange or exchange type for a queue, these values are taken from the task_default_exchange and task_default_exchange_type settings.
To route a task to the feed_tasks
queue, you can add an entry to the task_routes
configuration:
task_routes = {
    'feeds.tasks.import_feed': {
        'queue': 'feed_tasks',
        'routing_key': 'feed.import',
    },
}
You can also override this routing with the routing_key argument to Task.apply_async() or send_task():
>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')
To make server z only get messages from the feed_tasks
queue, you can use the -Q
option when starting the worker:
user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h
Servers x and y must be configured to get messages from the default
queue:
user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h
If you want, you can even have your feed-processing worker handle regular tasks as well. This can be useful when there is a lot of regular work to do:
user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h
If you want to add a queue that uses a different exchange, specify a custom exchange and exchange type:
from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('feed_tasks', routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
          routing_key='image.compress'),
)
If these terms are unfamiliar, you should read up on AMQP.
See also:
In addition to the AMQP Primer below, there's Rabbits and Warrens, an excellent blog post describing queues and exchanges. There's also the CloudAMQP tutorial, and RabbitMQ users may find the RabbitMQ FAQ useful.
Special Routing Options
RabbitMQ message priority
Supported transports: RabbitMQ
New in version 4.0.
Queues can be configured to support priorities by setting the x-max-priority argument:
from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]
A default maximum priority for all queues can be set with the task_queue_max_priority setting:
app.conf.task_queue_max_priority = 10
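The following is a toy model of how a queue declared with x-max-priority orders messages. It is based on RabbitMQ's documented behaviour: higher numbers are delivered first, messages without a priority count as 0, and values above the queue maximum are treated as the maximum. PriorityQueueModel is a hypothetical class built on heapq and is not part of kombu or Celery. In Celery itself, the per-message value is passed through the priority argument of apply_async().

```python
import heapq
import itertools


class PriorityQueueModel:
    """Toy model of a queue declared with x-max-priority (not a client API)."""

    def __init__(self, max_priority=10):
        self.max_priority = max_priority
        self._heap = []
        self._seq = itertools.count()  # keeps FIFO order among equal priorities

    def publish(self, body, priority=0):
        # Priorities above the queue maximum are treated as the maximum.
        effective = min(priority, self.max_priority)
        # heapq is a min-heap, so negate to pop the highest priority first.
        heapq.heappush(self._heap, (-effective, next(self._seq), body))

    def get(self):
        """Return the next body in priority order, or None if the queue is empty."""
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]
```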
AMQP Primer
Messages
A message consists of headers and a body. Celery uses headers to store the content type and content encoding of the message. The content type is usually the serialization format used to serialize the message. The body contains the name of the task to execute, the task id (UUID), the arguments to apply it with, and some additional metadata, such as the retry count or ETA.
Here is an example of a task message represented as a Python dictionary:
{'task': 'myapp.tasks.add', 'id': '54086c5e-6193-4575-8308-dbab76798756', 'args': [4, 4], 'kwargs': {}}
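The following sketch shows how headers and body fit together. It builds a message in the simplified layout described above and decodes it again using the content type and encoding stored in its headers. The helper names and header keys are assumptions for illustration. This is not Celery's exact wire format.

```python
import json
import uuid


def build_message(task, args=(), kwargs=None):
    """Assemble a simplified task message: headers say how the body is encoded."""
    body = {
        "task": task,
        "id": str(uuid.uuid4()),
        "args": list(args),
        "kwargs": kwargs or {},
    }
    return {
        "headers": {"content_type": "application/json", "content_encoding": "utf-8"},
        "body": json.dumps(body).encode("utf-8"),
    }


def parse_message(message):
    """Decode the body using the content type and encoding from the headers."""
    headers = message["headers"]
    if headers["content_type"] != "application/json":
        raise ValueError(f"unsupported content type: {headers['content_type']}")
    return json.loads(message["body"].decode(headers["content_encoding"]))
```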
Producers, consumers, and brokers
The client sending messages is typically called a publisher, or a producer, while the entity receiving messages is called a consumer.
The broker is the message server, routing messages from producers to consumers.
The following terms are often seen in AMQP
related documentation.
Exchanges, queues, and routing keys
- Messages are sent to exchanges.
- An exchange routes messages to one or more queues. Several exchange types exist, providing different ways to do routing or implementing different messaging scenarios.
- The message waits in the queue until someone consumes it.
- The message is deleted from the queue when it has been acknowledged.
The necessary steps to send and receive messages include:
1. Create a message exchange
2. Create a queue
3. Bind the queue to the message exchange
Celery automatically creates the entities needed for the queues in task_queues to work (unless the queue's auto_declare setting is set to False).
Here is an example queue configuration with three queues: one for video, one for images, and one default queue for everything else:
from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos', Exchange('media'), routing_key='media.video'),
    Queue('images', Exchange('media'), routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'
Exchange types
The exchange type defines how messages are routed through the exchange. The exchange types defined in the standard are direct, topic, fanout, and headers. Non-standard exchange types are also available as RabbitMQ plugins, such as the last-value-cache plugin by Michael Bridgen.
Direct exchanges
Direct exchanges match by exact routing key, so a queue bound with the routing key video only receives messages with that routing key.
Topic exchanges
Topic exchanges match routing keys made of dot-separated words, using the wildcard characters * (matches exactly one word) and # (matches zero or more words).
For routing keys like usa.news, usa.weather, norway.news, and norway.weather, the bindings could be *.news (all news), usa.# (all items in the USA), or usa.weather (all USA weather items).
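The following is a minimal sketch of topic matching as described above. It splits both keys on dots and lets # consume zero or more words. The function name topic_matches is hypothetical. Brokers implement this internally, and the code here only models the rule. A direct exchange would instead use plain string equality.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key under topic rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".") if routing_key else []

    def match(p, w):
        # p indexes the binding pattern, w indexes the routing-key words.
        if p == len(pattern):
            return w == len(words)
        token = pattern[p]
        if token == "#":
            # '#' may swallow zero or more words; try every split point.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if token == "*" or token == words[w]:
            # '*' matches exactly one word; other tokens must match literally.
            return match(p + 1, w + 1)
        return False

    return match(0, 0)
```

With the keys from the example, topic_matches("*.news", "norway.news") is True, and topic_matches("usa.#", "usa") is also True because # can match zero words.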
Related API commands
- exchange.declare(exchange_name, type, passive, durable, auto_delete, internal)
Declares an exchange by name.
See amqp:Channel.exchange_declare.
Keyword arguments:
passive: Passive means the exchange won't be created, but you can use this to check whether the exchange already exists.
durable: Durable exchanges are persistent, meaning they survive a broker restart.
auto_delete: The broker deletes the exchange when no queues are using it anymore.
- queue.declare(queue_name, passive, durable, exclusive, auto_delete)
Declares a queue by name.
See amqp:Channel.queue_declare.
Exclusive queues can only be consumed from by the current connection. Exclusive also implies auto_delete.
- queue.bind(queue_name, exchange_name, routing_key)
Binds a queue to an exchange with a routing key.
Unbound queues won't receive messages, so this is necessary.
See amqp:Channel.queue_bind.
- queue.delete(name, if_unused=False, if_empty=False)
Deletes a queue and its bindings.
See amqp:Channel.queue_delete.
- exchange.delete(name, if_unused=False)
Deletes an exchange.
See amqp:Channel.exchange_delete.
Notice:
Declaring does not necessarily mean "create". When you declare, you assert that the entity exists and that it's operable. There is no rule about whether the consumer or the producer should initially create the exchange, queue, or binding. Usually whoever needs it first creates it.
Hands-on with the API
Celery comes with a tool called celery amqp that provides command-line access to the AMQP API. You can use it for administration tasks such as creating and deleting queues and exchanges, purging queues, or sending messages. It can also be used with non-AMQP brokers, but other implementations may not support every command.
You can pass commands directly as arguments to celery amqp, or start it with no arguments to enter interactive shell mode:
$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>
Here 1> is the prompt. The number 1 indicates that this is the first command in the session. Type help for a list of available commands. The shell also supports autocompletion, so you can start typing a command and then press Tab to show a list of possible matches.
The following creates a queue to which you can send messages:
$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.
This created the direct exchange testexchange and a queue named testqueue. The queue is bound to the exchange with the routing key testkey.
From now on, all messages sent to the exchange testexchange with the routing key testkey will be delivered to this queue. You can send a message with the basic.publish command:
4> basic.publish 'This is a message!' testexchange testkey
ok.
Now that the message has been sent, you can retrieve it again. The basic.get command polls the queue for new messages synchronously. This is fine for maintenance tasks, but services should use basic.consume instead.
Get a message from the queue:
5> basic.get testqueue
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}
AMQP
uses an acknowledgment mechanism to indicate that a message has been received and processed successfully. If the message is not acknowledged and the consumer channel is closed, then the message will be re-delivered to another consumer.
Note the delivery_tag in the structure above. Within a connection channel, every received message has a unique delivery_tag, which is used to acknowledge the message. Delivery tags are not unique across channels, so in another client the delivery tag 1 might refer to a different message than it does in this channel.
You can use basic.ack
to acknowledge the message you received:
6> basic.ack 1
ok.
To clean up after the test session, delete the entities you created:
7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.
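The following in-memory model replays the session above. It is meant to make the declare, bind, publish, get, and ack cycle concrete, and it also shows the redelivery of unacknowledged messages. ToyBroker and its method names are hypothetical stand-ins for the shell commands. It is not an AMQP client, it models a single channel, and it implements direct routing only.

```python
from collections import deque


class ToyBroker:
    """In-memory model of the celery amqp session above (direct routing only)."""

    def __init__(self):
        self.exchanges = {}   # exchange name -> exchange type
        self.queues = {}      # queue name -> deque of (routing_key, body)
        self.bindings = []    # (queue, exchange, routing_key) triples
        self.unacked = {}     # delivery_tag -> (queue, routing_key, body)
        self._next_tag = 1    # delivery tags count up within one channel

    def exchange_declare(self, name, type_="direct"):
        # Declaring asserts existence: repeating it is harmless.
        self.exchanges.setdefault(name, type_)

    def queue_declare(self, name):
        self.queues.setdefault(name, deque())

    def queue_bind(self, queue, exchange, routing_key):
        self.bindings.append((queue, exchange, routing_key))

    def basic_publish(self, body, exchange, routing_key):
        # Direct routing: copy to every queue bound with exactly this key.
        for queue, bound_exchange, key in self.bindings:
            if bound_exchange == exchange and key == routing_key:
                self.queues[queue].append((routing_key, body))

    def basic_get(self, queue):
        if not self.queues[queue]:
            return None
        routing_key, body = self.queues[queue].popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (queue, routing_key, body)
        return {"body": body, "delivery_tag": tag, "routing_key": routing_key,
                "message_count": len(self.queues[queue])}

    def basic_ack(self, delivery_tag):
        # Acknowledged messages are removed for good.
        del self.unacked[delivery_tag]

    def close_channel(self):
        # Unacknowledged messages return to their queues for redelivery.
        for queue, routing_key, body in reversed(list(self.unacked.values())):
            if queue in self.queues:
                self.queues[queue].appendleft((routing_key, body))
        self.unacked.clear()
        self._next_tag = 1  # a new channel starts counting tags from 1 again

    def queue_delete(self, name):
        """Delete a queue and its bindings; return how many messages were dropped."""
        dropped = len(self.queues.pop(name))
        self.bindings = [b for b in self.bindings if b[0] != name]
        return dropped

    def exchange_delete(self, name):
        del self.exchanges[name]
```

Calling basic_get without basic_ack and then close_channel puts the message back on the queue. This mirrors the acknowledgement behaviour described above.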
Routing Tasks
Defining queues
In Celery, the available queues are defined by the task_queues setting.
Here is an example queue configuration with three queues: one for video, one for images, and one default queue for everything else:
from kombu import Exchange, Queue

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
Here, task_default_queue
will be used to route tasks that do not have an explicit route.
The default exchange, exchange type, and routing key are used as the default routing values for tasks, and as the default values for entries in task_queues.
Multiple bindings per queue are also supported. In the following example both routing keys are bound to the same queue:
from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('media', [
        binding(media_exchange, routing_key='media.video'),
        binding(media_exchange, routing_key='media.image'),
    ]),
)
Specifying task destination
The destination for a task is decided by the following (in order):
1. The routers defined in task_routes
2. The routing arguments to Task.apply_async()
3. Routing-related attributes defined on the Task itself
It is considered best practice not to hard-code these settings, but to leave them as configuration options by using Routers. This is the most flexible approach, but sensible defaults can still be set as task attributes.
Routers
A router is a function that decides the routing options for a task.
To define a router, you only need to define a function with the signature (name, args, kwargs, options, task=None, **kw):
def route_task(name, args, kwargs, options, task=None, **kw):
    if name == 'myapp.tasks.compress_video':
        return {'exchange': 'video',
                'exchange_type': 'topic',
                'routing_key': 'video.compress'}
If you return the queue key, it is expanded with the settings defined for that queue in task_queues:
{'queue': 'video', 'routing_key': 'video.compress'}
expands to ->
{'queue': 'video', 'exchange': 'video', 'exchange_type': 'topic', 'routing_key': 'video.compress'}
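The expansion step can be modelled as a dictionary merge. In the following sketch, the settings of the named queue supply the defaults, and any key the router returned explicitly takes precedence. This is why routing_key stays video.compress in the result above. The function expand_destination and the plain-dict form of task_queues are assumptions for illustration. Celery actually stores kombu Queue objects.

```python
def expand_destination(route, task_queues):
    """Fill in exchange settings from the named queue; explicit route keys win.

    task_queues is modelled as {queue_name: {setting: value}} (hypothetical).
    """
    queue_name = route.get("queue")
    if queue_name is None:
        return dict(route)  # nothing to expand
    defaults = {"queue": queue_name, **task_queues[queue_name]}
    return {**defaults, **route}
```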
You install routers by adding them to the task_routes setting:
task_routes = (route_task,)
Routing functions can also be added by name:
task_routes = ('myapp.routers.route_task',)
For simple task name -> route mappings like the above, you can use a dictionary in the task_routes
setting to achieve the same effect:
task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
    },
}
The routers will be traversed in order until the first router that returns a true value is encountered and used as the final route for the task.
You can define multiple routers in a sequence:
task_routes = [
    route_task,
    {
        'myapp.tasks.compress_video': {
            'queue': 'video',
            'routing_key': 'video.compress',
        },
    },
]
Routers will be visited in order, and the one that returns a value first will be selected.
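The following is a minimal model of that traversal. Function routers are called with the router signature, dictionary entries are looked up by task name, and the first truthy result wins. The function lookup_route is hypothetical and only approximates Celery's internal lookup. route_task is the router from the example above.

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    # Same router as above; it returns None for every other task name.
    if name == "myapp.tasks.compress_video":
        return {"exchange": "video",
                "exchange_type": "topic",
                "routing_key": "video.compress"}


def lookup_route(routers, name, args=(), kwargs=None, options=None, task=None):
    """Return the first truthy route; dict entries are looked up by task name."""
    for router in routers:
        if isinstance(router, dict):
            route = router.get(name)
        else:
            route = router(name, args, kwargs or {}, options or {}, task=task)
        if route:
            return route  # later routers are never consulted
    return None
```

With the list [route_task, {...}], compress_video is resolved by route_task first, so the dictionary entry for the same name is never used.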
Broadcast
Celery also supports broadcast routing. Here is an example exchange, broadcast_tasks, that delivers copies of tasks to all workers connected to it:
from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks',
    },
}
Now the tasks.reload_cache task will be sent to every worker consuming from this queue.
Here’s another example of a broadcast route, this time using the celery beat
scheduler:
from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'},
    },
}
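Both broadcast examples rely on fanout semantics: every queue bound to the exchange gets its own copy, and the routing key is ignored. The following toy model illustrates that behaviour. The class and the per-worker queue names used with it are hypothetical. It does not reproduce how kombu's Broadcast names or declares its queues.

```python
class FanoutExchangeModel:
    """Toy fanout exchange: each bound queue receives its own copy (not kombu code)."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered messages

    def bind(self, queue_name):
        self.queues.setdefault(queue_name, [])

    def publish(self, message, routing_key=None):
        # Fanout ignores the routing key and copies the message to every queue.
        for delivered in self.queues.values():
            delivered.append(dict(message))
```

Because each worker consumes from its own copy, the same task ID runs more than once. This is the situation the note below warns about.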
Broadcast & Results
Note that Celery results do not define what happens if two tasks have the same task ID. If the same task is distributed to more than one worker, the state history may not be preserved.
In this case it is a good idea to set the task.ignore_result attribute.