Celery-4.1 User Guide: Routing Tasks

Notice:
Routing concepts such as topic and fanout exchanges are not available for all transports; see the Transport Comparison Table.

Basics

Automatic routing

The easiest way to route is to use the task_create_missing_queues setting (enabled by default).

With this setting enabled, a named queue that isn't already defined in task_queues will be created automatically. This makes it easy to perform simple routing.
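
To see the behaviour in isolation, here is a minimal sketch (the broker URL and the add task are assumptions made for this example):

from celery import Celery

app = Celery('proj', broker='amqp://')  # broker URL is an assumption

# Enabled by default; shown explicitly here only for clarity.
app.conf.task_create_missing_queues = True

@app.task
def add(x, y):
    return x + y

# Sending the task to a queue that isn't listed in task_queues simply
# creates that queue on the broker the first time it's used.
add.apply_async(args=[2, 2], queue='adhoc_queue')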

Say you have two servers, x and y, that handle general tasks, and a server z that handles only feed-related tasks. You can use this configuration:

task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}

With this route, the import_feed task is routed to the feeds queue, while all other tasks are routed to the default queue (named celery for historical reasons).

Alternatively, you can use glob pattern matching, or even regular expressions, to match all tasks in the feed.tasks namespace:

app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

If the order of matching patterns is important, you should declare routes in the form of a list of items:

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)

Notice:
The task_routes setting can either be a dictionary or a list of router objects, so in the case above you need to declare task_routes as a tuple containing the list.

Once the router is installed, you can start server z dedicated to the feeds queue:

user@z:/$ celery -A proj worker -Q feeds

You can declare as many queues as you need, so you can also have your server handle messages from the default queue:

user@z:/$ celery -A proj worker -Q feeds,celery

Modify the name of the default queue

You can change the name of the default queue with the following configuration:

app.conf.task_default_queue = 'default'

How the queue is defined

The point of this feature is to hide the complex AMQP protocol from users who only have basic needs. However, you may still be interested in how these queues are declared.

A queue named video will be created with the following configuration:

{'exchange': 'video',
 'exchange_type': 'direct',
 'routing_key': 'video'}

Non-AMQP backends like Redis or SQS don't support exchanges, so they require the exchange to have the same name as the queue. This design ensures that routing also works for backends without exchange support.
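
As a rough sketch, the automatically created video queue above corresponds to the following explicit kombu declaration:

from kombu import Exchange, Queue

# What Celery declares for you: a direct exchange and a queue, with all
# three names ('video') being identical.
video_queue = Queue(
    'video',
    exchange=Exchange('video', type='direct'),
    routing_key='video',
)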

Manual routing

If you have two servers, x and y, that handle routine tasks, and a third server, z, that handles feed-related tasks, you can use this configuration:

from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'

task_queues is a list of Queue instances. If you don't set an exchange or exchange type for a queue, these will be taken from the task_default_exchange and task_default_exchange_type settings.

To route a task to the feed_tasks queue, you can add an entry to the task_routes configuration:

task_routes = {
    'feeds.tasks.import_feed': {
        'queue': 'feed_tasks',
        'routing_key': 'feed.import',
    },
}

You can also override this routing with the routing arguments to Task.apply_async(), or the routing_key parameter of send_task():

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

To make server z only get messages from the feed_tasks queue, you can use the -Q option when starting the worker:

user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h

Servers x and y must be configured to get messages from the default queue:

user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

If you want, you can even have the feed-processing server z handle regular tasks as well, perhaps at times when there is a lot of work to do:

user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h

If you want to add a queue that uses a different exchange, just specify a custom exchange and exchange type:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('feed_tasks', routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
          routing_key='image.compress'),
)

If you're confused by these terms, you should read up on AMQP.

See also:
The AMQP Primer below, and Rabbits and Warrens, an excellent blog post about queues and exchanges. There is also the CloudAMQP tutorial, and for RabbitMQ users the RabbitMQ FAQ can be a useful source of information.

Special Routing Options

RabbitMQ message priority

supported transports:
RabbitMQ

New in version 4.0.

Queues can be configured to support priorities by setting the x-max-priority argument:

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]

A default x-max-priority value for all queues can be set using task_queue_max_priority:

app.conf.task_queue_max_priority = 10
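
With a priority-enabled queue in place, an individual task can be published with a specific priority via the priority option of apply_async. A minimal sketch, assuming a hypothetical process task and a RabbitMQ broker (where higher numbers mean higher priority):

from celery import Celery

app = Celery('proj', broker='amqp://')  # assumption: a RabbitMQ broker

@app.task
def process(x):  # hypothetical task, used only for this example
    return x * 2

# With x-max-priority=10 on the queue, useful priorities are 0-10.
process.apply_async(args=[42], queue='tasks', priority=9)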

AMQP Primer

message

A message consists of a header and a body. Celery uses the message headers to store the content type and content encoding of the message. The content type is usually the serialization format used to serialize the message. The body contains the name of the task to execute, the id (UUID) of the task, the arguments to call it with, and some additional metadata, such as the retry count or an ETA.

Here is an example of a task message represented as a Python dictionary:

{'task': 'myapp.tasks.add',
 'id': '54086c5e-6193-4575-8308-dbab76798756',
 'args': [4, 4],
 'kwargs': {}}

Producers, consumers, and brokers

Clients that send messages are often called publishers, or producers, and entities that receive messages are called consumers.

A message broker is a message server that routes messages from producers to consumers.

The following terms are often seen in AMQP related documentation.

Exchanges, queues, and routing keys

  1. The message is sent to an exchange.
  2. The exchange routes the message to one or more queues. Several exchange types exist, providing different routing strategies or implementing different messaging scenarios.
  3. The message waits in the queue until someone consumes it.
  4. Once the message is acknowledged, it is removed from the queue.

The necessary steps to send and receive messages are:
1. Create an exchange
2. Create a queue
3. Bind the queue to the exchange

Celery automatically creates the entities necessary for the queues in task_queues to work (unless the queue's auto_declare setting is set to False).
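
To make the three steps above concrete, here is a minimal sketch of declaring an exchange, a queue, and the binding directly with kombu (the broker URL and the entity names are assumptions for this example):

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='direct')
video_queue = Queue('videos', exchange=media_exchange,
                    routing_key='media.video')

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    # Declaring a bound queue creates the exchange, the queue, and the
    # binding on the broker if they don't already exist.
    bound = video_queue(conn.default_channel)
    bound.declare()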

Here's an example queue configuration with three queues: one for video processing, one for image processing, and a default queue for everything else:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos', Exchange('media'), routing_key='media.video'),
    Queue('images', Exchange('media'), routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'

Exchange type

The exchange type defines how messages are routed through the exchange. The standard exchange types are direct, topic, fanout, and headers. Non-standard exchange types are also available as RabbitMQ plugins, such as the last-value-cache plugin by Michael Bridgen.

Direct exchanges

The direct exchange type routes by exact routing-key match, so a queue bound with the routing key video only receives messages with that routing key.

Topic exchanges

The topic exchange type matches routing keys made up of dot-separated words, using the wild-card characters * (matching a single word) and # (matching zero or more words).

With routing keys like usa.news, usa.weather, norway.news, and norway.weather, the bindings could be *.news (all news), usa.# (all items in the USA), or usa.weather (all USA weather items).
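
Expressed with kombu, such bindings could look like the following sketch, assuming a topic exchange named news:

from kombu import Exchange, Queue

news_exchange = Exchange('news', type='topic')

all_news    = Queue('all-news',    exchange=news_exchange, routing_key='*.news')
usa_items   = Queue('usa-items',   exchange=news_exchange, routing_key='usa.#')
usa_weather = Queue('usa-weather', exchange=news_exchange, routing_key='usa.weather')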

  • exchange.declare(exchange_name, type, passive,
    durable, auto_delete, internal)
    Declare an exchange with a name.

See amqp:Channel.exchange_declare.

Keyword arguments:

passive – Passive means the exchange won't be created automatically; you can use this to check whether the exchange already exists.
durable – Durable exchanges are persistent (i.e. they survive a broker restart).
auto_delete – The broker will delete the exchange automatically when no queues are using it anymore.

  • queue.declare(queue_name, passive, durable, exclusive, auto_delete)
    Declare a queue with a name.

See amqp:Channel.queue_declare.

Exclusive queues can only be consumed from by the current connection. Exclusive also implies auto_delete.

  • queue.bind(queue_name, exchange_name, routing_key)
    Bind a queue to an exchange using a routing key.

Unbound queues will not receive messages, so this is required.

See amqp:Channel.queue_bind.

  • queue.delete(name, if_unused=False, if_empty=False)
    Deletes a queue and its binding.

See amqp:Channel.queue_delete

  • exchange.delete(name, if_unused=False)
    Deletes an exchange.

See amqp:Channel.exchange_delete

Notice:
Declare does not mean “create”. When you declare, you are just asserting that this entity exists and is operable. There is no rule as to who should create the exchange/queue/binding, whether it is a consumer or a producer. Usually whoever needs it first creates it.

API hands-on

Celery comes with a tool, celery amqp, for accessing the AMQP API from the command line. It allows administrative tasks such as creating and deleting queues and exchanges, purging queues, or sending messages. It can also be used with non-AMQP brokers, but different implementations may not support all commands.

You can write commands directly as arguments to celery amqp, or start it without arguments to enter interactive mode:

$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>

Here 1> is the prompt. The number 1 is the number of commands you have executed so far. Type help for a list of available commands; auto-completion is also supported, so you can start typing a command and then press tab to show the possible matches.

The following creates a queue to which you can send messages:

$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok.queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.

This created a direct exchange named testexchange and a queue named testqueue. The queue is bound to the exchange using the routing key testkey.

From now on, all messages sent to the exchange testexchange with routing key testkey will be delivered to this queue. You can send a message using the basic.publish command:

4> basic.publish 'This is a message!' testexchange testkey
ok.

Now that the message has been sent, you can retrieve it again. You can use the basic.get command, which polls the queue for new messages synchronously (this is fine for maintenance tasks, but for services you should use basic.consume instead).

Get a message from the queue:

5> basic.get testqueue
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}

AMQP uses an acknowledgment mechanism to indicate that a message has been received and processed successfully. If the message is not acknowledged and the consumer channel is closed, then the message will be re-delivered to another consumer.

Note the delivery_tag in the structure above. Within a connection channel, every received message has a unique delivery_tag, and this tag is used to acknowledge the message. Also note that delivery tags aren't unique across connections, so in another client delivery tag 1 might point to a different message than it does in this channel.

You can use basic.ack to acknowledge the message you received:

6> basic.ack 1
ok.

To clean up the environment of our test session, you should delete the entities you created:

7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.

Routing Tasks

Defining queues

In Celery, the available queues are defined by the task_queues setting.

Here's an example queue configuration with three queues: one for video processing, one for image processing, and a default queue for everything else:

from kombu import Exchange, Queue

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

Here, task_default_queue will be used to route tasks that do not have an explicit route.

The default exchange, exchange type, and routing key will be used as the default routing values for tasks, and as the default values for entries in task_queues.

Multiple bindings per queue are also supported. In the following example both routing keys are bound to the same queue:

from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('media', [
        binding(media_exchange, routing_key='media.video'),
        binding(media_exchange, routing_key='media.image'),
    ]),
)

Specifying task destinations

The destination for a task is decided by the following (in order):
1. The routes defined in task_routes
2. The routing arguments to Task.apply_async()
3. Routing-related attributes defined on the Task itself

It is considered best practice not to hard-code these settings, but rather to leave them as configuration options by using routers; this is the most flexible approach, but sensible defaults can still be set as task attributes.
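
For example, sensible default routing can be set as attributes on the task itself. A sketch, assuming a hypothetical import_feed task (a matching task_routes entry or an explicit apply_async(queue=...) call still takes precedence, per the order above):

from celery import Celery

app = Celery('proj', broker='amqp://')  # broker URL is an assumption

# Default routing declared on the task itself; task_routes entries or an
# explicit apply_async(queue=...) call would still override this.
@app.task(queue='feed_tasks', routing_key='feed.import')
def import_feed(url):
    ...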

Routers

A router is a function that determines the routing options for a task.

To define a router, all you need is a function with the signature (name, args, kwargs, options, task=None, **kw):

def route_task(name, args, kwargs, options, task=None, **kw):
    if name == 'myapp.tasks.compress_video':
        return {'exchange': 'video',
                'exchange_type': 'topic',
                'routing_key': 'video.compress'}

If you return the queue key, it will be expanded with the settings defined for that queue in task_queues:

{'queue': 'video', 'routing_key': 'video.compress'}

expands to ->

{'queue': 'video',
 'exchange': 'video',
 'exchange_type': 'topic',
 'routing_key': 'video.compress'}

You install router functions by adding them to the task_routes setting:

task_routes = (route_task,)

Routing functions can also be added by name:

task_routes = ('myapp.routers.route_task',)

For simple task name -> route mappings like the above, you can use a dictionary in the task_routes setting to achieve the same effect:

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
    },
}

You can define multiple routers in a sequence:

task_routes = [
    route_task,
    {
        'myapp.tasks.compress_video': {
            'queue': 'video',
            'routing_key': 'video.compress',
        },
    },
]

The routers will then be traversed in order; the first router to return a true value is used as the final route for the task.

Broadcast

Celery also supports broadcast routing. The following exchange, broadcast_tasks, delivers copies of a task to all workers connected to it:

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks'
    }
}

Now the tasks.reload_cache task will be delivered to every worker consuming from this queue.

Here’s another example of a broadcast route, this time using the celery beat scheduler:

from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'}
    },
}

Notice (broadcast and results):

Note that Celery results do not define what happens if two tasks have the same task id. If the same task is distributed to more than one worker, the state history may not be preserved.

In this case, setting the task.ignore_result attribute is a good idea.
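
As a sketch of what that could look like for the broadcast task above (the module layout and broker URL are assumptions):

from celery import Celery

app = Celery('proj', broker='amqp://')  # broker URL is an assumption

# With broadcast routing, several workers may run the task with the same
# task id, so don't record a result for it at all.
@app.task(name='tasks.reload_cache', ignore_result=True)
def reload_cache():
    ...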