This document describes the current stable version of Celery (3.1).

Celery - Distributed Task Queue

Celery is a simple, flexible and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.

Celery has a large and diverse community of users and contributors; you should come join us on IRC or our mailing-list.

Celery is Open Source and licensed under the BSD License.

Getting Started

Contents

Getting Started

Release: 3.1
Date: January 23, 2016

Introduction to Celery

What is a Task Queue?

Task queues are used as a mechanism to distribute work across threads or machines.

A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, a client adds a message to the queue, which the broker then delivers to a worker.

A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.

Celery is written in Python, but the protocol can be implemented in any language. So far there’s RCelery for the Ruby programming language, node-celery for Node.js and a PHP client. Language interoperability can also be achieved by using webhooks.

What do I need?

Celery requires a message transport to send and receive messages. The RabbitMQ and Redis broker transports are feature complete, but there’s also support for a myriad of other experimental solutions, including using SQLite for local development.
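
For local development in particular, here is a minimal, hedged sketch using the SQLAlchemy transport with an SQLite file. The sqla+sqlite URL form is covered in the Brokers section later in this document; the app name is arbitrary:

from celery import Celery

# Assumes the sqlalchemy library is installed; messages are stored in a
# local SQLite file, which is convenient for development but not production.
app = Celery('dev', broker='sqla+sqlite:///celerydb.sqlite')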

Celery can run on a single machine, on multiple machines, or even across data centers.

Get Started

If this is the first time you’re trying to use Celery, or if you’re new to Celery 3.1 coming from previous versions, then you should read our getting started tutorials: First Steps with Celery and Next Steps, both included later in this document.

Celery is…

  • Simple

    Celery is easy to use and maintain, and it doesn’t need configuration files.

    It has an active, friendly community you can talk to for support, including a mailing-list and an IRC channel.

    Here’s one of the simplest applications you can make:

    from celery import Celery
    
    app = Celery('hello', broker='amqp://guest@localhost//')
    
    @app.task
    def hello():
        return 'hello world'
    
  • Highly Available

    Workers and clients will automatically retry in the event of connection loss or failure, and some brokers support HA by way of Master/Master or Master/Slave replication.

  • Fast

    A single Celery process can process millions of tasks a minute, with sub-millisecond round-trip latency (using RabbitMQ, py-librabbitmq, and optimized settings).

  • Flexible

    Almost every part of Celery can be extended or used on its own: custom pool implementations, serializers, compression schemes, logging, schedulers, consumers, producers, autoscalers, broker transports, and much more.

It supports

  • Result Stores

    • AMQP, Redis
    • memcached, MongoDB
    • SQLAlchemy, Django ORM
    • Apache Cassandra
  • Serialization

    • pickle, json, yaml, msgpack.
    • zlib, bzip2 compression.
    • Cryptographic message signing.
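
As a hedged illustration of combining these, here is a minimal sketch that selects a result store, a serializer and message compression using the Celery 3.1 setting names; the Redis URLs assume a local instance and are purely illustrative:

from celery import Celery

app = Celery('demo', broker='redis://localhost:6379/0')

app.conf.update(
    # Result store: keep task state and return values in Redis.
    CELERY_RESULT_BACKEND='redis://localhost:6379/1',
    # Serialization: json for both task messages and results.
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    # Compression: compress task message bodies with zlib.
    CELERY_MESSAGE_COMPRESSION='zlib',
)
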
Features

  • Monitoring

    A stream of monitoring events is emitted by workers and is used by built-in and external tools to tell you what your cluster is doing – in real-time.

  • Workflows

    Simple and complex workflows can be composed using a set of powerful primitives we call the “canvas”, including grouping, chaining, chunking and more.

  • Time & Rate Limits

    You can control how many tasks can be executed per second/minute/hour, or how long a task can be allowed to run, and this can be set as a default, for a specific worker or individually for each task type.

  • Scheduling

    You can specify the time to run a task in seconds or as a datetime, or you can use periodic tasks for recurring events based on a simple interval, or crontab expressions supporting minute, hour, day of week, day of month, and month of year (see the sketch after this list).

  • Autoreloading

    In development workers can be configured to automatically reload source code as it changes, including inotify(7) support on Linux.

  • Autoscaling

    The worker pool can be dynamically resized depending on load, or on custom metrics specified by the user. This is used to limit memory usage in shared hosting/cloud environments or to enforce a given quality of service.

  • Resource Leak Protection

    The --maxtasksperchild option can be used to guard against user tasks that leak resources like memory or file descriptors, when those leaks are simply out of your control.

  • User Components

    Each worker component can be customized, and additional components can be defined by the user. The worker is built up using “bootsteps” — a dependency graph enabling fine grained control of the worker’s internals.
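
To make the Scheduling and Time & Rate Limits features above concrete, here is a minimal, hedged sketch using the Celery 3.1 setting names. The tasks.cleanup task name and the schedule values are hypothetical:

from celery import Celery
from celery.schedules import crontab

app = Celery('sched', broker='amqp://guest@localhost//')

app.conf.update(
    # Periodic task: run the (hypothetical) tasks.cleanup every day at 7:30.
    CELERYBEAT_SCHEDULE={
        'morning-cleanup': {
            'task': 'tasks.cleanup',
            'schedule': crontab(hour=7, minute=30),
        },
    },
    # Rate limit: allow at most ten cleanup tasks per minute per worker.
    CELERY_ANNOTATIONS={'tasks.cleanup': {'rate_limit': '10/m'}},
)

A celery beat scheduler process would then dispatch the periodic task on schedule.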

Framework Integration

Celery is easy to integrate with web frameworks, some of which even have integration packages.

The integration packages are not strictly necessary, but they can make development easier, and sometimes they add important hooks like closing database connections at fork(2).

Installation

You can install Celery either via the Python Package Index (PyPI) or from source.

To install using pip:

$ pip install -U Celery

To install using easy_install:

$ easy_install -U Celery

Bundles

Celery also defines a group of bundles that can be used to install Celery and the dependencies for a given feature.

You can specify these in your requirements or on the pip command-line by using brackets. Multiple bundles can be specified by separating them with commas.

$ pip install "celery[librabbitmq]"

$ pip install "celery[librabbitmq,redis,auth,msgpack]"

The following bundles are available:

Serializers

celery[auth]: for using the auth serializer.
celery[msgpack]: for using the msgpack serializer.
celery[yaml]: for using the yaml serializer.

Concurrency

celery[eventlet]: for using the eventlet pool.
celery[gevent]: for using the gevent pool.
celery[threads]: for using the thread pool.

Transports and Backends

celery[librabbitmq]: for using the librabbitmq C library.
celery[redis]: for using Redis as a message transport or as a result backend.
celery[mongodb]: for using MongoDB as a message transport (experimental), or as a result backend (supported).
celery[sqs]: for using Amazon SQS as a message transport (experimental).
celery[memcache]: for using memcached as a result backend.
celery[cassandra]: for using Apache Cassandra as a result backend.
celery[couchdb]: for using CouchDB as a message transport (experimental).
celery[couchbase]: for using Couchbase as a result backend.
celery[beanstalk]: for using Beanstalk as a message transport (experimental).
celery[zookeeper]: for using Zookeeper as a message transport.
celery[zeromq]: for using ZeroMQ as a message transport (experimental).
celery[sqlalchemy]: for using SQLAlchemy as a message transport (experimental), or as a result backend (supported).
celery[pyro]: for using the Pyro4 message transport (experimental).
celery[slmq]: for using the SoftLayer Message Queue transport (experimental).

Downloading and installing from source

Download the latest version of Celery from http://pypi.python.org/pypi/celery/

You can install it by doing the following:

$ tar xvfz celery-0.0.0.tar.gz
$ cd celery-0.0.0
$ python setup.py build
# python setup.py install

The last command must be executed as a privileged user if you are not currently using a virtualenv.

Using the development version
With pip

The Celery development version also requires the development versions of kombu, amqp and billiard.

You can install the latest snapshot of these using the following pip commands:

$ pip install https://github.com/celery/celery/zipball/master#egg=celery
$ pip install https://github.com/celery/billiard/zipball/master#egg=billiard
$ pip install https://github.com/celery/py-amqp/zipball/master#egg=amqp
$ pip install https://github.com/celery/kombu/zipball/master#egg=kombu
With git

Please see the Contributing section.

Brokers

Release: 3.1
Date: January 23, 2016

Celery supports several message transport alternatives.

Broker Instructions
Using RabbitMQ
Installation & Configuration

RabbitMQ is the default broker so it does not require any additional dependencies or initial configuration, other than the URL location of the broker instance you want to use:

>>> BROKER_URL = 'amqp://guest:guest@localhost:5672//'

For a description of broker URLs and a full list of the various broker configuration options available to Celery, see Broker Settings.

Installing the RabbitMQ Server

See Installing RabbitMQ over at RabbitMQ’s website. For Mac OS X see Installing RabbitMQ on OS X.

Note

If you’re getting nodedown errors after installing and using rabbitmqctl, then this blog post can help you identify the source of the problem.

Setting up RabbitMQ

To use Celery we need to create a RabbitMQ user, a virtual host, and allow that user access to that virtual host:

$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_user_tags myuser mytag
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

See the RabbitMQ Admin Guide for more information about access control.

Installing RabbitMQ on OS X

The easiest way to install RabbitMQ on OS X is using Homebrew, the new and shiny package-management system for OS X.

First, install homebrew using the one-line command provided by the Homebrew documentation:

$ ruby -e "$(curl -fsSL https://raw.github.com/Homebrew/homebrew/go/install)"

Finally, we can install rabbitmq using brew:

$ brew install rabbitmq

After you have installed rabbitmq with brew you need to add the following to your path to be able to start and stop the broker. Add it to your .bash_profile or .profile:

PATH=$PATH:/usr/local/sbin

Configuring the system host name

If you’re using a DHCP server that is giving you a random host name, you need to permanently configure the host name. This is because RabbitMQ uses the host name to communicate with nodes.

Use the scutil command to permanently set your host name:

$ sudo scutil --set HostName myhost.local

Then add that host name to /etc/hosts so it’s possible to resolve it back into an IP address:

127.0.0.1       localhost myhost myhost.local

If you start the rabbitmq server, your rabbit node should now be rabbit@myhost, as verified by rabbitmqctl:

$ sudo rabbitmqctl status
Status of node rabbit@myhost ...
[{running_applications,[{rabbit,"RabbitMQ","1.7.1"},
                    {mnesia,"MNESIA  CXC 138 12","4.4.12"},
                    {os_mon,"CPO  CXC 138 46","2.2.4"},
                    {sasl,"SASL  CXC 138 11","2.1.8"},
                    {stdlib,"ERTS  CXC 138 10","1.16.4"},
                    {kernel,"ERTS  CXC 138 10","2.13.4"}]},
{nodes,[rabbit@myhost]},
{running_nodes,[rabbit@myhost]}]
...done.

This is especially important if your DHCP server gives you a host name starting with an IP address, (e.g. 23.10.112.31.comcast.net), because then RabbitMQ will try to use rabbit@23, which is an illegal host name.

Starting/Stopping the RabbitMQ server

To start the server:

$ sudo rabbitmq-server

You can also run it in the background by adding the -detached option (note: only one dash):

$ sudo rabbitmq-server -detached

Never use kill to stop the RabbitMQ server, but rather use the rabbitmqctl command:

$ sudo rabbitmqctl stop

When the server is running, you can continue reading Setting up RabbitMQ.

Using Redis
Installation

For the Redis support you have to install additional dependencies. You can install both Celery and these dependencies in one go using the celery[redis] bundle:

$ pip install -U celery[redis]

Configuration

Configuration is easy: just configure the location of your Redis database:

BROKER_URL = 'redis://localhost:6379/0'

Where the URL is in the format of:

redis://:password@hostname:port/db_number

All fields after the scheme are optional, and will default to localhost on port 6379, using database 0.

If a Unix socket connection should be used, the URL needs to be in the format:

redis+socket:///path/to/redis.sock

Visibility Timeout

The visibility timeout defines the number of seconds to wait for the worker to acknowledge the task before the message is redelivered to another worker. Be sure to see Caveats below.

This option is set via the BROKER_TRANSPORT_OPTIONS setting:

BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}  # 1 hour.

The default visibility timeout for Redis is 1 hour.

Results

If you also want to store the state and return values of tasks in Redis, you should configure these settings:

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

For a complete list of options supported by the Redis result backend, see Redis backend settings

Caveats
  • Broadcast messages will be seen by all virtual hosts by default.

    You have to set a transport option to prefix the messages so that they will only be received by the active virtual host:

    BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True}
    

    Note that you will not be able to communicate with workers running older versions or workers that do not have this setting enabled.

    This setting will be the default in the future, so it’s better to migrate sooner rather than later.

  • Workers will receive all task related events by default.

    To avoid this you must set the fanout_patterns fanout option so that the workers may only subscribe to worker related events:

    BROKER_TRANSPORT_OPTIONS = {'fanout_patterns': True}
    

    Note that this change is backward incompatible so all workers in the cluster must have this option enabled, or else they will not be able to communicate.

    This option will be enabled by default in the future.

  • If a task is not acknowledged within the Visibility Timeout the task will be redelivered to another worker and executed.

    This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.

    So you have to increase the visibility timeout to match the time of the longest ETA you are planning to use.

    Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.

    Periodic tasks will not be affected by the visibility timeout, as this is a concept separate from ETA/countdown.

    You can increase this timeout by configuring a transport option with the same name:

    BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
    

    The value must be an int describing the number of seconds.

  • Monitoring events (as used by flower and other tools) are global and are not affected by the virtual host setting.

    This is caused by a limitation in Redis. The Redis PUB/SUB channels are global and not affected by the database number.

  • Redis may evict keys from the database in some situations

    If you experience an error like:

    InconsistencyError, Probably the key ('_kombu.binding.celery') has been
    removed from the Redis database.
    

    you may want to configure the redis-server to not evict keys by setting the timeout parameter to 0.

Experimental Transports
Using SQLAlchemy

Experimental Status

The SQLAlchemy transport is unstable in many areas and there are several issues open. Unfortunately we don’t have the resources or funds required to improve the situation, so we’re looking for contributors and partners willing to help.

Installation
Configuration

Celery needs to know the location of your database, which should be the usual SQLAlchemy connection string, but with ‘sqla+’ prepended to it:

BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'

This transport uses only the BROKER_URL setting, which has to be an SQLAlchemy database URI.

Please see SQLAlchemy: Supported Databases for a table of supported databases.

Here’s a list of examples using a selection of other SQLAlchemy Connection Strings:

# sqlite (filename)
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'

# mysql
BROKER_URL = 'sqla+mysql://scott:tiger@localhost/foo'

# postgresql
BROKER_URL = 'sqla+postgresql://scott:tiger@localhost/mydatabase'

# oracle
BROKER_URL = 'sqla+oracle://scott:tiger@127.0.0.1:1521/sidname'

Results

To store results in the database as well, you should configure the result backend. See Database backend settings.

Limitations

The SQLAlchemy database transport does not currently support:

  • Remote control commands (celery events command, broadcast)
  • Events, including the Django Admin monitor.
  • Using more than a few workers (can lead to messages being executed multiple times).
Using the Django Database

Experimental Status

The Django database transport is in need of improvements in many areas and there are several open bugs. Unfortunately we don’t have the resources or funds required to improve the situation, so we’re looking for contributors and partners willing to help.

Installation
Configuration

The database transport uses the Django DATABASE_* settings for database configuration values.

  1. Set your broker transport:

    BROKER_URL = 'django://'
    
  2. Add kombu.transport.django to INSTALLED_APPS:

    INSTALLED_APPS = ('kombu.transport.django', )
    
  3. Sync your database schema:

$ python manage.py syncdb

Limitations

The Django database transport does not currently support:

  • Remote control commands (celery events command, broadcast)
  • Events, including the Django Admin monitor.
  • Using more than a few workers (can lead to messages being executed multiple times).
Using MongoDB

Experimental Status

The MongoDB transport is in need of improvements in many areas and there are several open bugs. Unfortunately we don’t have the resources or funds required to improve the situation, so we’re looking for contributors and partners willing to help.

Installation

For the MongoDB support you have to install additional dependencies. You can install both Celery and these dependencies in one go using the celery[mongodb] bundle:

$ pip install -U celery[mongodb]

Configuration

Configuration is easy, set the transport, and configure the location of your MongoDB database:

BROKER_URL = 'mongodb://localhost:27017/database_name'

Where the URL is in the format of:

mongodb://userid:password@hostname:port/database_name

The host name will default to localhost and the port to 27017, and so they are optional. userid and password are also optional, but needed if your MongoDB server requires authentication.

Results

If you also want to store the state and return values of tasks in MongoDB, you should see MongoDB backend settings.

Using Amazon SQS

Experimental Status

The SQS transport is in need of improvements in many areas and there are several open bugs. Unfortunately we don’t have the resources or funds required to improve the situation, so we’re looking for contributors and partners willing to help.

Installation

For the Amazon SQS support you have to install the boto library:

$ pip install -U boto

Configuration

You have to specify SQS in the broker URL:

BROKER_URL = 'sqs://ABCDEFGHIJKLMNOPQRST:ZYXK7NiynGlTogH8Nj+P9nlE73sq3@'

where the URL format is:

sqs://aws_access_key_id:aws_secret_access_key@

You must remember to include the “@” at the end.

The login credentials can also be set using the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY; in that case the broker URL may only be sqs://.
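
For example, a minimal, hedged sketch of the environment-variable approach; the key values below are the illustrative placeholders used earlier in this section, not real credentials:

import os

from celery import Celery

# Set the credentials in the environment so the broker URL can stay bare.
os.environ['AWS_ACCESS_KEY_ID'] = 'ABCDEFGHIJKLMNOPQRST'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'ZYXK7NiynGlTogH8Nj+P9nlE73sq3'

app = Celery('tasks', broker='sqs://')  # no credentials in the URL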

Note

If you specify AWS credentials in the broker URL, then please keep in mind that the secret access key may contain unsafe characters that need to be URL-encoded.

Options
Region

The default region is us-east-1 but you can select another region by configuring the BROKER_TRANSPORT_OPTIONS setting:

BROKER_TRANSPORT_OPTIONS = {'region': 'eu-west-1'}

See also

An overview of Amazon Web Services regions can be found on the AWS website.

Visibility Timeout

The visibility timeout defines the number of seconds to wait for the worker to acknowledge the task before the message is redelivered to another worker. Also see caveats below.

This option is set via the BROKER_TRANSPORT_OPTIONS setting:

BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}  # 1 hour.

The default visibility timeout is 30 seconds.

Polling Interval

The polling interval decides the number of seconds to sleep between unsuccessful polls. This value can be either an int or a float. By default the value is 1 second, which means that the worker will sleep for one second whenever there are no more messages to read.

You should note that more frequent polling is also more expensive, so increasing the polling interval can save you money.

The polling interval can be set via the BROKER_TRANSPORT_OPTIONS setting:

BROKER_TRANSPORT_OPTIONS = {'polling_interval': 0.3}

Very frequent polling intervals can cause busy loops, which results in the worker using a lot of CPU time. If you need sub-millisecond precision you should consider using another transport, like RabbitMQ or Redis.

Queue Prefix

By default Celery will not assign any prefix to the queue names. If you have other services using SQS you can configure Celery to do so using the BROKER_TRANSPORT_OPTIONS setting:

BROKER_TRANSPORT_OPTIONS = {'queue_name_prefix': 'celery-'}

Caveats
  • If a task is not acknowledged within the visibility_timeout, the task will be redelivered to another worker and executed.

    This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.

    So you have to increase the visibility timeout to match the time of the longest ETA you are planning to use.

    Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.

    Periodic tasks will not be affected by the visibility timeout, as it is a concept separate from ETA/countdown.

    The maximum visibility timeout supported by AWS as of this writing is 12 hours (43200 seconds):

    BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
    
  • SQS does not yet support worker remote control commands.

  • SQS does not yet support events, and so cannot be used with celery events, celerymon or the Django Admin monitor.

Results

Multiple products in the Amazon Web Services family could be good candidates to store or publish results with, but there is no such result backend included at this point.

Warning

Do not use the amqp result backend with SQS.

It will create one queue for every task, and the queues will not be collected. This could cost you money that would be better spent contributing an AWS result store backend back to Celery :)

Using CouchDB

Experimental Status

The CouchDB transport is in need of improvements in many areas and there are several open bugs. Unfortunately we don’t have the resources or funds required to improve the situation, so we’re looking for contributors and partners willing to help.

Installation

For the CouchDB support you have to install additional dependencies. You can install both Celery and these dependencies in one go using the celery[couchdb] bundle:

$ pip install -U celery[couchdb]

Configuration

Configuration is easy, set the transport, and configure the location of your CouchDB database:

BROKER_URL = 'couchdb://localhost:5984/database_name'

Where the URL is in the format of:

couchdb://userid:password@hostname:port/database_name

The host name will default to localhost and the port to 5984, and so they are optional. userid and password are also optional, but needed if your CouchDB server requires authentication.

Results

Storing task state and results in CouchDB is currently not supported.

Limitations

The CouchDB message transport does not currently support:

  • Remote control commands (celery inspect, celery control, broadcast)
Using Beanstalk

Out of order

The Beanstalk transport is currently not working well.

We are interested in contributions and donations that can go towards improving this situation.

Installation

For the Beanstalk support you have to install additional dependencies. You can install both Celery and these dependencies in one go using the celery[beanstalk] bundle:

$ pip install -U celery[beanstalk]

Configuration

Configuration is easy, set the transport, and configure the location of your Beanstalk database:

BROKER_URL = 'beanstalk://localhost:11300'

Where the URL is in the format of:

beanstalk://hostname:port

The host name will default to localhost and the port to 11300, and so they are optional.

Results

Using Beanstalk to store task state and results is currently not supported.

Limitations

The Beanstalk message transport does not currently support:

  • Remote control commands (celery control, celery inspect, broadcast)
  • Authentication
Using IronMQ
Installation

For IronMQ support, you’ll need the iron_celery library (http://github.com/iron-io/iron_celery):

$ pip install iron_celery

You’ll also need an Iron.io account. Sign up for free at http://www.iron.io.

Configuration

First, you’ll need to import the iron_celery library right after you import Celery, for example:

from celery import Celery
import iron_celery

app = Celery('mytasks', broker='ironmq://', backend='ironcache://')

You have to specify IronMQ in the broker URL:

BROKER_URL = 'ironmq://ABCDEFGHIJKLMNOPQRST:ZYXK7NiynGlTogH8Nj+P9nlE73sq3@'

where the URL format is:

ironmq://project_id:token@

You must remember to include the “@” at the end.

The login credentials can also be set using the environment variables IRON_TOKEN and IRON_PROJECT_ID, which are set automatically if you use the IronMQ Heroku add-on. In this case the broker URL may only be:

ironmq://

Clouds

The default cloud/region is AWS us-east-1. You can choose the IronMQ Rackspace (ORD) cloud by changing the URL to:

ironmq://project_id:token@mq-rackspace-ord.iron.io

Results

You can store results in IronCache with the same Iron.io credentials; just set the results URL with the same syntax as the broker URL, but change the scheme to ironcache:

ironcache://project_id:token@

This will default to a cache named “Celery”; if you want to change that:

ironcache://project_id:token@/awesomecache

More Information

You can find more information in the iron_celery README (http://github.com/iron-io/iron_celery).

Broker Overview

This is a comparison table of the different transports supported; more information can be found in the documentation for each individual transport (see Broker Instructions).

Name         Status        Monitoring   Remote Control
RabbitMQ     Stable        Yes          Yes
Redis        Stable        Yes          Yes
Mongo DB     Experimental  Yes          Yes
Beanstalk    Experimental  No           No
Amazon SQS   Experimental  No           No
Couch DB     Experimental  No           No
Zookeeper    Experimental  No           No
Django DB    Experimental  No           No
SQLAlchemy   Experimental  No           No
Iron MQ      3rd party     No           No

Experimental brokers may be functional but they do not have dedicated maintainers.

Missing monitor support means that the transport does not implement events, and as such Flower, celery events, celerymon and other event-based monitoring tools will not work.

Remote control means the ability to inspect and manage workers at runtime using the celery inspect and celery control commands (and other tools using the remote control API).

First Steps with Celery

Celery is a task queue with batteries included. It is easy to use so that you can get started without learning the full complexities of the problem it solves. It is designed around best practices so that your product can scale and integrate with other languages, and it comes with the tools and support you need to run such a system in production.

In this tutorial you will learn the absolute basics of using Celery. You will learn about:

  • Choosing and installing a message transport (broker).
  • Installing Celery and creating your first task.
  • Starting the worker and calling tasks.
  • Keeping track of tasks as they transition through different states, and inspecting return values.

Celery may seem daunting at first - but don’t worry - this tutorial will get you started in no time. It is deliberately kept simple, so as not to confuse you with advanced features. After you have finished this tutorial it’s a good idea to browse the rest of the documentation, for example the Next Steps tutorial, which will showcase Celery’s capabilities.

Choosing a Broker

Celery requires a solution to send and receive messages; usually this comes in the form of a separate service called a message broker.

There are several choices available, including:

RabbitMQ

RabbitMQ is feature-complete, stable, durable and easy to install. It’s an excellent choice for a production environment. Detailed information about using RabbitMQ with Celery can be found in the Using RabbitMQ section above.

If you are using Ubuntu or Debian install RabbitMQ by executing this command:

$ sudo apt-get install rabbitmq-server

When the command completes the broker is already running in the background, ready to move messages for you: Starting rabbitmq-server: SUCCESS.

And don’t worry if you’re not running Ubuntu or Debian: you can go to the RabbitMQ website to find similarly simple installation instructions for other platforms, including Microsoft Windows.

Redis

Redis is also feature-complete, but is more susceptible to data loss in the event of abrupt termination or power failures. Detailed information about using Redis can be found in the Using Redis section above.

Using a database

Using a database as a message queue is not recommended, but can be sufficient for very small installations. Your options include the SQLAlchemy and Django database transports described under Experimental Transports above.

If you’re already using a Django database for example, using it as your message broker can be convenient while developing even if you use a more robust system in production.

Other brokers

In addition to the above, there are other experimental transport implementations to choose from, including Amazon SQS, MongoDB and IronMQ.

See Broker Overview for a full list.

Installing Celery

Celery is on the Python Package Index (PyPI), so it can be installed with standard Python tools like pip or easy_install:

$ pip install celery

Application

The first thing you need is a Celery instance, which is called the celery application or just “app” for short. Since this instance is used as the entry-point for everything you want to do in Celery, like creating tasks and managing workers, it must be possible for other modules to import it.

In this tutorial you will keep everything contained in a single module, but for larger projects you want to create a dedicated module.

Let’s create the file tasks.py:

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

The first argument to Celery is the name of the current module; this is needed so that names can be automatically generated. The second argument is the broker keyword argument, which specifies the URL of the message broker you want to use, using RabbitMQ here, which is already the default option. See Choosing a Broker above for more choices: for RabbitMQ you can use amqp://localhost, and for Redis you can use redis://localhost.

You defined a single task, called add, which returns the sum of two numbers.

Running the celery worker server

You now run the worker by executing our program with the worker argument:

$ celery -A tasks worker --loglevel=info

Note

See the Troubleshooting section if the worker does not start.

In production you will want to run the worker in the background as a daemon. To do this you need to use the tools provided by your platform, or something like supervisord (see Running the worker as a daemon for more information).

For a complete listing of the command-line options available, do:

$ celery worker --help

There are also several other commands available, and help is also available:

$ celery help

Calling the task

To call our task you can use the delay() method.

This is a handy shortcut to the apply_async() method which gives greater control of the task execution (see Calling Tasks):

>>> from tasks import add
>>> add.delay(4, 4)

The task has now been processed by the worker you started earlier, and you can verify that by looking at the worker’s console output.

Calling a task returns an AsyncResult instance, which can be used to check the state of the task, wait for the task to finish or get its return value (or if the task failed, the exception and traceback). But this isn’t enabled by default, and you have to configure Celery to use a result backend, which is detailed in the next section.

Keeping Results

If you want to keep track of the tasks’ states, Celery needs to store or send the states somewhere. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, Redis, AMQP (RabbitMQ), and MongoDB – or you can define your own.

For this example you will use the rpc result backend, which sends states back as transient messages. The backend is specified via the backend argument to Celery (or via the CELERY_RESULT_BACKEND setting if you choose to use a configuration module):

app = Celery('tasks', backend='rpc://', broker='amqp://')

Or if you want to use Redis as the result backend, but still use RabbitMQ as the message broker (a popular combination):

app = Celery('tasks', backend='redis://localhost', broker='amqp://')

To read more about result backends please see Result Backends.

Now with the result backend configured, let’s call the task again. This time you’ll hold on to the AsyncResult instance returned when you call a task:

>>> result = add.delay(4, 4)

The ready() method returns whether the task has finished processing or not:

>>> result.ready()
False

You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

>>> result.get(timeout=1)
8

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

>>> result.get(propagate=False)

If the task raised an exception you can also gain access to the original traceback:

>>> result.traceback

See celery.result for the complete result object reference.

Configuration

Celery, like a consumer appliance, doesn’t need much to be operated. It has an input and an output, where you must connect the input to a broker and maybe the output to a result backend if so wanted. But if you look closely at the back there’s a lid revealing loads of sliders, dials and buttons: this is the configuration.

The default configuration should be good enough for most uses, but there are many things to tweak so Celery works just the way you want it to. Reading about the options available is a good idea to get familiar with what can be configured. You can read about the options in the Configuration and defaults reference.

The configuration can be set on the app directly or by using a dedicated configuration module. As an example you can configure the default serializer used for serializing task payloads by changing the CELERY_TASK_SERIALIZER setting:

app.conf.CELERY_TASK_SERIALIZER = 'json'

If you are configuring many settings at once you can use update:

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)

For larger projects using a dedicated configuration module is useful. In fact you are discouraged from hard-coding periodic task intervals and task routing options, as it is much better to keep these in a centralized location. Especially for libraries, this makes it possible for users to control how they want your tasks to behave; you can also imagine your SysAdmin making simple changes to the configuration in the event of system trouble.

You can tell your Celery instance to use a configuration module, by calling the app.config_from_object() method:

app.config_from_object('celeryconfig')

This module is often called “celeryconfig”, but you can use any module name.

A module named celeryconfig.py must then be available to load from the current directory or on the Python path; it could look like this:

celeryconfig.py:

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'rpc://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Europe/Oslo'
CELERY_ENABLE_UTC = True

To verify that your configuration file works properly, and doesn’t contain any syntax errors, you can try to import it:

$ python -m celeryconfig

For a complete reference of configuration options, see Configuration and defaults.

To demonstrate the power of configuration files, this is how you would route a misbehaving task to a dedicated queue:

celeryconfig.py:

CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

Or, instead of routing it, you could rate limit the task, so that only 10 tasks of this type can be processed in a minute (10/m):

celeryconfig.py:

CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

If you are using RabbitMQ or Redis as the broker then you can also direct the workers to set a new rate limit for the task at runtime:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
    new rate limit set successfully

See Routing Tasks to read more about task routing, and the CELERY_ANNOTATIONS setting for more about annotations, or Monitoring and Management Guide for more about remote control commands, and how to monitor what your workers are doing.

Where to go from here

If you want to learn more you should continue to the Next Steps tutorial, and after that you can study the User Guide.

Troubleshooting

There’s also a troubleshooting section in the Frequently Asked Questions.

Worker does not start: Permission Error
  • If you’re using Debian, Ubuntu or other Debian-based distributions:

    Debian recently renamed the /dev/shm special file to /run/shm.

    A simple workaround is to create a symbolic link:

    # ln -s /run/shm /dev/shm
    
  • Others:

    If you provide any of the --pidfile, --logfile or --statedb arguments, then you must make sure that they point to a file/directory that is writable and readable by the user starting the worker.

Result backend does not work or tasks are always in PENDING state.

All tasks are PENDING by default, so the state would have been better named “unknown”. Celery does not update any state when a task is sent, and any task with no history is assumed to be pending (you know the task id after all).

  1. Make sure that the task does not have ignore_result enabled.

    Enabling this option will force the worker to skip updating states.

  2. Make sure the CELERY_IGNORE_RESULT setting is not enabled.

  3. Make sure that you do not have any old workers still running.

    It’s easy to start multiple workers by accident, so make sure that the previous worker is properly shutdown before you start a new one.

    An old worker that is not configured with the expected result backend may be running and is hijacking the tasks.

    The --pidfile argument can be set to an absolute path to make sure this doesn’t happen.

  4. Make sure the client is configured with the right backend.

    If for some reason the client is configured to use a different backend than the worker, you will not be able to receive the result, so make sure the backend is correct by inspecting it:

    >>> result = task.delay(…)
    >>> print(result.backend)
    

Next Steps

The First Steps with Celery guide is intentionally minimal. In this guide I will demonstrate what Celery offers in more detail, including how to add Celery support for your application and library.

This document does not document all of Celery’s features and best practices, so it’s recommended that you also read the User Guide.

Using Celery in your Application
Our Project

Project layout:

proj/__init__.py
    /celery.py
    /tasks.py
proj/celery.py
from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

In this module you created our Celery instance (sometimes referred to as the app). To use Celery within your project you simply import this instance.

  • The broker argument specifies the URL of the broker to use.

    See Choosing a Broker for more information.

  • The backend argument specifies the result backend to use,

    It’s used to keep track of task state and results. While results are disabled by default, I use the amqp result backend here because I demonstrate how retrieving results works later; you may want to use a different backend for your application. They all have different strengths and weaknesses. If you don’t need results it’s better to disable them. Results can also be disabled for individual tasks by setting the @task(ignore_result=True) option (a short sketch follows this list).

    See Keeping Results for more information.

  • The include argument is a list of modules to import when the worker starts. You need to add our tasks module here so that the worker is able to find our tasks.
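
For example, here is a hedged sketch of disabling results for a single task; the task below is hypothetical and not part of the example project:

from proj.celery import app

@app.task(ignore_result=True)
def log_click(url):
    # Fire-and-forget: no state or return value is stored for this task.
    print('clicked: %r' % (url,))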

proj/tasks.py
from __future__ import absolute_import

from proj.celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

Starting the worker

The celery program can be used to start the worker (you need to run the worker in the directory above proj):

$ celery -A proj worker -l info

When the worker starts you should see a banner and some messages:

-------------- celery@halcyon.local v3.1 (Cipater)
---- **** -----
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

The broker is the URL you specified in the broker argument in our celery module; you can also specify a different broker on the command-line by using the -b option.

Concurrency is the number of prefork worker processes used to process your tasks concurrently; when all of these are busy doing work, new tasks will have to wait for one of the tasks to finish before they can be processed.

The default concurrency number is the number of CPUs on that machine (including cores); you can specify a custom number using the -c option. There is no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it. Experimentation has shown that adding more than twice the number of CPUs is rarely effective, and likely to degrade performance instead.

In addition to the default prefork pool, Celery also supports using Eventlet, Gevent, and threads (see Concurrency).

Events is an option that when enabled causes Celery to send monitoring messages (events) for actions occurring in the worker. These can be used by monitor programs like celery events, and Flower - the real-time Celery monitor, which you can read about in the Monitoring and Management guide.

Queues is the list of queues that the worker will consume tasks from. The worker can be told to consume from several queues at once, and this is used to route messages to specific workers as a means for Quality of Service, separation of concerns, and emulating priorities, all described in the Routing Guide.

You can get a complete list of command-line arguments by passing in the --help flag:

$ celery worker --help

These options are described in more detail in the Workers Guide.

Stopping the worker

To stop the worker simply hit Ctrl+C. A list of signals supported by the worker is detailed in the Workers Guide.

In the background

In production you will want to run the worker in the background, this is described in detail in the daemonization tutorial.

The daemonization scripts use the celery multi command to start one or more workers in the background:

$ celery multi start w1 -A proj -l info
celery multi v3.1.1 (Cipater)
> Starting nodes...
    > w1.halcyon.local: OK

You can restart it too:

$ celery multi restart w1 -A proj -l info
celery multi v3.1.1 (Cipater)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v3.1.1 (Cipater)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

or stop it:

$ celery multi stop w1 -A proj -l info

The stop command is asynchronous so it will not wait for the worker to shut down. You will probably want to use the stopwait command instead, which will ensure that all currently executing tasks are completed:

$ celery multi stopwait w1 -A proj -l info

Note

celery multi doesn’t store information about workers, so you need to use the same command-line arguments when restarting. Only the same pidfile and logfile arguments must be used when stopping.

By default it will create pid and log files in the current directory. To protect against multiple workers launching on top of each other, you are encouraged to put these in a dedicated directory:

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

With the multi command you can start multiple workers, and there is a powerful command-line syntax to specify arguments for different workers too, e.g.:

$ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 debug

For more examples see the multi module in the API reference.

About the --app argument

The --app argument specifies the Celery app instance to use; it must be in the form of module.path:attribute

But it also supports a shortcut form: if only a package name is specified, it’ll try to search for the app instance in the following order:

With --app=proj:

  1. an attribute named proj.app, or
  2. an attribute named proj.celery, or
  3. any attribute in the module proj where the value is a Celery application, or

If none of these are found it’ll try a submodule named proj.celery:

  1. an attribute named proj.celery.app, or
  2. an attribute named proj.celery.celery, or
  3. any attribute in the module proj.celery where the value is a Celery application.

This scheme mimics the practices used in the documentation, i.e. proj:app for a single contained module, and proj.celery:app for larger projects.
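
As a hedged sketch of how this lookup can be satisfied: re-exporting the app in the package’s __init__.py lets the short --app=proj form resolve via the first rule (the attribute proj.app). This assumes the proj/celery.py module shown earlier:

# proj/__init__.py
from __future__ import absolute_import

# Re-export so that `celery -A proj worker` finds the instance as proj.app.
from proj.celery import app  # noqa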

Calling Tasks

You can call a task using the delay() method:

>>> add.delay(2, 2)

This method is actually a star-argument shortcut to another method called apply_async():

>>> add.apply_async((2, 2))

The latter enables you to specify execution options like the time to run (countdown), the queue it should be sent to and so on:

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

In the above example the task will be sent to a queue named lopri and the task will execute, at the earliest, 10 seconds after the message was sent.

Applying the task directly will execute the task in the current process, so that no message is sent:

>>> add(2, 2)
4

These three methods - delay(), apply_async(), and applying (__call__) - represent the Celery calling API, which is also used for subtasks.

A more detailed overview of the Calling API can be found in the Calling User Guide.

Every task invocation will be given a unique identifier (a UUID); this is the task id.

The delay and apply_async methods return an AsyncResult instance, which can be used to keep track of the task’s execution state. But for this you need to enable a result backend so that the state can be stored somewhere.

Results are disabled by default because there is no result backend that suits every application; to choose one you need to consider the drawbacks of each individual backend. For many tasks keeping the return value isn’t even very useful, so it’s a sensible default to have. Also note that result backends are not used for monitoring tasks and workers; for that Celery uses dedicated event messages (see Monitoring and Management Guide).

If you have a result backend configured you can retrieve the return value of a task:

>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4

You can find the task’s id by looking at the id attribute:

>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

You can also inspect the exception and traceback if the task raised an exception, in fact result.get() will propagate any errors by default:

>>> res = add.delay(2)
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/celery/celery/result.py", line 113, in get
    interval=interval)
File "/opt/devel/celery/celery/backends/amqp.py", line 138, in wait_for
    raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)

If you don’t wish for the errors to propagate then you can disable that by passing the propagate argument:

>>> res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)

In this case it will return the exception instance raised instead, and so to check whether the task succeeded or failed you will have to use the corresponding methods on the result instance:

>>> res.failed()
True

>>> res.successful()
False

So how does it know if the task has failed or not? It can find out by looking at the tasks state:

>>> res.state
'FAILURE'

A task can only be in a single state, but it can progress through several states. The stages of a typical task can be:

PENDING -> STARTED -> SUCCESS

The started state is a special state that is only recorded if the CELERY_TRACK_STARTED setting is enabled, or if the @task(track_started=True) option is set for the task.
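
As a small, hedged illustration of both ways to enable it; the task below is hypothetical and the app is assumed to be the example project’s:

from proj.celery import app

# Globally, via configuration:
app.conf.CELERY_TRACK_STARTED = True

# Or per task, via the task decorator:
@app.task(track_started=True)
def process_upload(path):
    ...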

The pending state is actually not a recorded state, but rather the default state for any task id that is unknown, which you can see from this example:

>>> from proj.celery import app

>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'

If the task is retried the stages can become even more complex, e.g, for a task that is retried two times the stages would be:

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

To read more about task states you should see the States section in the tasks user guide.

Calling tasks is described in detail in the Calling Guide.

Canvas: Designing Workflows

You just learned how to call a task using the task’s delay method, and this is often all you need. But sometimes you may want to pass the signature of a task invocation to another process or as an argument to another function; for this Celery uses something called subtasks.

A subtask wraps the arguments and execution options of a single task invocation in a way such that it can be passed to functions or even serialized and sent across the wire.

You can create a subtask for the add task using the arguments (2, 2), and a countdown of 10 seconds like this:

>>> add.subtask((2, 2), countdown=10)
tasks.add(2, 2)

There is also a shortcut using star arguments:

>>> add.s(2, 2)
tasks.add(2, 2)

And there’s that calling API again…

Subtask instances also support the calling API, which means that they have the delay and apply_async methods.

But there is a difference in that the subtask may already have an argument signature specified. The add task takes two arguments, so a subtask specifying two arguments would make a complete signature:

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4

But, you can also make incomplete signatures to create what we call partials:

# incomplete partial: add(?, 2)
>>> s2 = add.s(2)

s2 is now a partial subtask that needs another argument to be complete, and this can be resolved when calling the subtask:

# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10

Here you added the argument 8, which was prepended to the existing argument 2 forming a complete signature of add(8, 2).

Keyword arguments can also be added later, these are then merged with any existing keyword arguments, but with new arguments taking precedence:

>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.

As stated, subtasks support the calling API, which means that:

  • subtask.apply_async(args=(), kwargs={}, **options)

    Calls the subtask with optional partial arguments and partial keyword arguments. Also supports partial execution options.

  • subtask.delay(*args, **kwargs)

    Star argument version of apply_async. Any arguments will be prepended to the arguments in the signature, and keyword arguments are merged with any existing keys (a short sketch follows this list).
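
For instance, here is a hedged sketch combining a partial signature with a partial execution option supplied at call time; the values are illustrative and add is the task from the example project:

from proj.tasks import add

s = add.s(2)                      # incomplete partial: add(?, 2)
res = s.apply_async(args=(8,),    # prepended, completing add(8, 2)
                    countdown=5)  # partial execution option
print(res.get())                  # -> 10, once the countdown has passed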

So this all seems very useful, but what can you actually do with these? To get to that I must introduce the canvas primitives…

The Primitives

The primitives are subtasks themselves, so that they can be combined in any number of ways to compose complex workflows.

Note

These examples retrieve results, so to try them out you need to configure a result backend. The example project above already does that (see the backend argument to Celery).

Let’s look at some examples:

Groups

A group calls a list of tasks in parallel, and it returns a special result instance that lets you inspect the results as a group, and retrieve the return values in order.

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

  • Partial group

>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

Tasks can be linked together so that after one task returns the other is called:

>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

or a partial chain:

# (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

Chains can also be written like this:

>>> (add.s(4, 4) | mul.s(8))().get()
64

Chords

A chord is a group with a callback:

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90

A group chained to another task will be automatically converted to a chord:

>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90

Since these primitives are all of the subtask type they can be combined almost however you want, e.g:

>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

Be sure to read more about workflows in the Canvas user guide.

Routing

Celery supports all of the routing facilities provided by AMQP, but it also supports simple routing where messages are sent to named queues.

The CELERY_ROUTES setting enables you to route tasks by name and keep everything centralized in one location:

app.conf.update(
    CELERY_ROUTES = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

You can also specify the queue at runtime with the queue argument to apply_async:

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

You can then make a worker consume from this queue by specifying the -Q option:

$ celery -A proj worker -Q hipri

You may specify multiple queues by using a comma separated list. For example, you can make the worker consume from both the default queue and the hipri queue, where the default queue is named celery for historical reasons:

$ celery -A proj worker -Q hipri,celery

The order of the queues doesn’t matter as the worker will give equal weight to the queues.

To learn more about routing, including taking use of the full power of AMQP routing, see the Routing Guide.

Remote Control

If you’re using RabbitMQ (AMQP), Redis or MongoDB as the broker then you can control and inspect the worker at runtime.

For example you can see what tasks the worker is currently working on:

$ celery -A proj inspect active

This is implemented by using broadcast messaging, so all remote control commands are received by every worker in the cluster.

You can also specify one or more workers to act on the request using the --destination option, which is a comma-separated list of worker host names:

$ celery -A proj inspect active --destination=celery@example.com

If a destination is not provided then every worker will act and reply to the request.

The celery inspect command contains commands that do not change anything in the worker; they only reply with information and statistics about what is going on inside the worker. For a list of inspect commands you can execute:

$ celery -A proj inspect --help
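
The same information is also available programmatically through app.control.inspect(); a minimal sketch, assuming the proj app from the earlier examples:

from proj.celery import app

i = app.control.inspect()   # inspect all workers in the cluster
print(i.active())           # tasks currently being executed
print(i.registered())       # task names known to each worker
print(i.stats())            # per-worker statistics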

Then there is the celery control command, which contains commands that actually change things in the worker at runtime:

$ celery -A proj control --help

For example you can force workers to enable event messages (used for monitoring tasks and workers):

$ celery -A proj control enable_events

When events are enabled you can then start the event dumper to see what the workers are doing:

$ celery -A proj events --dump

or you can start the curses interface:

$ celery -A proj events

When you’re finished monitoring you can disable events again:

$ celery -A proj control disable_events

The celery status command also uses remote control commands and shows a list of online workers in the cluster:

$ celery -A proj status

You can read more about the celery command and monitoring in the Monitoring Guide.

Timezone

All times and dates, internally and in messages, use the UTC timezone.

When the worker receives a message, for example with a countdown set, it converts that UTC time to local time. If you wish to use a different timezone than the system timezone then you must configure that using the CELERY_TIMEZONE setting:

app.conf.CELERY_TIMEZONE = 'Europe/London'
Optimization

The default configuration is not optimized for throughput; it tries to walk the middle way between many short tasks and fewer long tasks, a compromise between throughput and fair scheduling.

If you have strict fair scheduling requirements, or want to optimize for throughput then you should read the Optimizing Guide.

If you’re using RabbitMQ then you should install the librabbitmq module, which is an AMQP client implemented in C:

$ pip install librabbitmq
What to do now?

Now that you have read this document you should continue to the User Guide.

There’s also an API reference if you are so inclined.

Resources

Getting Help
Mailing list

For discussions about the usage, development, and future of Celery, please join the celery-users mailing list.

IRC

Come chat with us on IRC. The #celery channel is located on the Freenode network.

Bug tracker

If you have any suggestions, bug reports or annoyances please report them to our issue tracker at http://github.com/celery/celery/issues/

Contributing

Development of Celery happens at GitHub: http://github.com/celery/celery

You are highly encouraged to participate in the development of Celery. If you don’t like GitHub (for some reason) you’re welcome to send regular patches.

Be sure to also read the Contributing to Celery section in the documentation.

License

This software is licensed under the New BSD License. See the LICENSE file in the top distribution directory for the full license text.

User Guide

Release:3.1
Date:January 23, 2016

Application

The Celery library must be instantiated before use; this instance is called an application (or app for short).

The application is thread-safe so that multiple Celery applications with different configurations, components and tasks can co-exist in the same process space.

Let’s create one now:

>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

The last line shows the textual representation of the application, which includes the name of the application class (Celery), the name of the current main module (__main__), and the memory address of the object (0x100469fd0).

Main Name

Only one of these is important, and that is the main module name. Let’s look at why that is.

When you send a task message in Celery, that message will not contain any source code, but only the name of the task you want to execute. This works similarly to how host names work on the internet: every worker maintains a mapping of task names to their actual functions, called the task registry.

Whenever you define a task, that task will also be added to the local registry:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

and there you see that __main__ again; whenever Celery is not able to detect what module the function belongs to, it uses the main module name to generate the beginning of the task name.

This is only a problem in a limited set of use cases:

  1. If the module that the task is defined in is run as a program.
  2. If the application is created in the Python shell (REPL).

For example here, where the tasks module is also used to start a worker with app.worker_main():

tasks.py:

from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

When this module is executed the tasks will be named starting with “__main__”, but when the module is imported by another process, say to call a task, the tasks will be named starting with “tasks” (the real name of the module):

>>> from tasks import add
>>> add.name
tasks.add

You can specify another name for the main module:

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

See also

Names

Configuration

There are several options you can set that will change how Celery works. These options can be set directly on the app instance, or you can use a dedicated configuration module.

The configuration is available as app.conf:

>>> app.conf.CELERY_TIMEZONE
'Europe/London'

where you can also set configuration values directly:

>>> app.conf.CELERY_ENABLE_UTC = True

and update several keys at once by using the update method:

>>> app.conf.update(
...     CELERY_ENABLE_UTC=True,
...     CELERY_TIMEZONE='Europe/London',
...)

The configuration object consists of multiple dictionaries that are consulted in order:

  1. Changes made at runtime.
  2. The configuration module (if any)
  3. The default configuration (celery.app.defaults).

You can even add new default sources by using the app.add_defaults() method.

See also

Go to the Configuration reference for a complete listing of all the available settings, and their default values.

config_from_object

The app.config_from_object() method loads configuration from a configuration object.

This can be a configuration module, or any object with configuration attributes.

Note that any configuration that was previously set will be reset when config_from_object() is called. If you want to set additional configuration you should do so after.
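
For example (a small sketch; the ordering is what matters here):

app.config_from_object('celeryconfig')
app.conf.CELERY_TASK_SERIALIZER = 'json'  # set after the call, so it is kept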

Example 1: Using the name of a module
from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

The celeryconfig module may then look like this:

celeryconfig.py:

CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Europe/London'
Example 2: Using a configuration module

Tip

Using the name of a module is recommended as this means that the module doesn’t need to be serialized when the prefork pool is used. If you’re experiencing configuration pickle errors then please try using the name of a module instead.

from celery import Celery

app = Celery()
import celeryconfig
app.config_from_object(celeryconfig)
Example 3: Using a configuration class/object
from celery import Celery

app = Celery()

class Config:
    CELERY_ENABLE_UTC = True
    CELERY_TIMEZONE = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
#   app.config_from_object('module:Config')
config_from_envvar

The app.config_from_envvar() method takes the configuration module name from an environment variable.

For example – to load configuration from a module specified in the environment variable named CELERY_CONFIG_MODULE:

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

You can then specify the configuration module to use via the environment:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info
Censored configuration

If you ever want to print out the configuration, as debugging information or similar, you may also want to filter out sensitive information like passwords and API keys.

Celery comes with several utilities used for presenting the configuration; one is humanize():

>>> app.conf.humanize(with_defaults=False, censored=True)

This method returns the configuration as a tabulated string. This will only contain changes to the configuration by default, but you can include the default keys and values by changing the with_defaults argument.

If you instead want to work with the configuration as a dictionary, then you can use the table() method:

>>> app.conf.table(with_defaults=False, censored=True)

Please note that Celery will not be able to remove all sensitive information, as it merely uses a regular expression to search for commonly named keys. If you add custom settings containing sensitive information you should name the keys using a name that Celery identifies as secret.

A configuration setting will be censored if the name contains any of these substrings:

API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE
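
For example, a custom setting holding a credential should include one of these substrings so that humanize() and table() will censor it (a sketch; the setting names are hypothetical):

app.conf.MYAPP_DATABASE_PASS = 's3cr3t'   # censored: contains “PASS”
app.conf.MYAPP_CREDENTIAL = 's3cr3t'      # NOT censored: no matching substring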

Laziness

The application instance is lazy, meaning that it will not be evaluated until something is actually needed.

Creating a Celery instance will only do the following:

  1. Create a logical clock instance, used for events.
  2. Create the task registry.
  3. Set itself as the current app (but not if the set_as_current argument was disabled)
  4. Call the app.on_init() callback (does nothing by default).

The app.task() decorator does not actually create the tasks at the point when it’s called; instead it will defer the creation of the task to happen either when the task is used, or after the application has been finalized.

This example shows how the task is not created until you use the task, or access an attribute (in this case repr()):

>>> @app.task
... def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

Finalization of the app happens either explicitly by calling app.finalize() – or implicitly by accessing the app.tasks attribute.

Finalizing the object will:

  1. Copy tasks that must be shared between apps

    Tasks are shared by default, but if the shared argument to the task decorator is disabled, then the task will be private to the app it’s bound to.

  2. Evaluate all pending task decorators.

  3. Make sure all tasks are bound to the current app.

    Tasks are bound to an app so that they can read default values from the configuration.

The “default app”.

Celery did not always work this way; it used to be that there was only a module-based API, and for backwards compatibility the old API is still there.

Celery always creates a special app that is the “default app”, and this is used if no custom application has been instantiated.

The celery.task module is there to accommodate the old API, and should not be used if you use a custom app. You should always use the methods on the app instance, not the module based API.

For example, the old Task base class enables many compatibility features, some of which may be incompatible with newer features such as task methods:

from celery.task import Task   # << OLD Task base class.

from celery import Task        # << NEW base class.

The new base class is recommended even if you use the old module-based API.

Breaking the chain

While it’s possible to depend on the current app being set, the best practice is to always pass the app instance around to anything that needs it.

I call this the “app chain”, since it creates a chain of instances depending on the app being passed.

The following example is considered bad practice:

from celery import current_app

class Scheduler(object):

    def run(self):
        app = current_app

Instead it should take the app as an argument:

class Scheduler(object):

    def __init__(self, app):
        self.app = app

Internally Celery uses the celery.app.app_or_default() function so that everything also works in the module-based compatibility API:

from celery.app import app_or_default

class Scheduler(object):
    def __init__(self, app=None):
        self.app = app_or_default(app)

In development you can set the CELERY_TRACE_APP environment variable to raise an exception if the app chain breaks:

$ CELERY_TRACE_APP=1 celery worker -l info

Evolving the API

Celery has changed a lot in the 3 years since it was initially created.

For example, in the beginning it was possible to use any callable as a task:

def hello(to):
    return 'hello {0}'.format(to)

>>> from celery.execute import apply_async

>>> apply_async(hello, ('world!', ))

or you could also create a Task class to set certain options, or override other behavior:

from celery.task import Task
from celery.registry import tasks

class Hello(Task):
    send_error_emails = True

    def run(self, to):
        return 'hello {0}'.format(to)

tasks.register(Hello)

>>> Hello.delay('world!')

Later, it was decided that passing arbitrary callables was an anti-pattern, since it makes it very hard to use serializers other than pickle, and the feature was removed in 2.0, replaced by task decorators:

from celery.task import task

@task(send_error_emails=True)
def hello(to):
    return 'hello {0}'.format(to)
Abstract Tasks

All tasks created using the task() decorator will inherit from the application’s base Task class.

You can specify a different base class with the base argument:

@app.task(base=OtherTask)
def add(x, y):
    return x + y

To create a custom task class you should inherit from the neutral base class: celery.Task.

from celery import Task

class DebugTask(Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)

Tip

If you override the task’s __call__ method, then it’s very important that you also call super so that the base call method can set up the default request used when a task is called directly.

The neutral base class is special because it’s not bound to any specific app yet. Concrete subclasses of this class will be bound, so you should always mark generic base classes as abstract.

Once a task is bound to an app it will read configuration to set default values and so on.

It’s also possible to change the default base class for an application by changing its app.Task attribute:

>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
...    abstract = True
...    send_error_emails = True

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
 <unbound MyBaseTask>,
 <unbound Task>,
 <type 'object'>]

Tasks

Tasks are the building blocks of Celery applications.

A task is a class that can be created out of any callable. It performs dual roles in that it defines both what happens when a task is called (sends a message), and what happens when a worker receives that message.

Every task class has a unique name, and this name is referenced in messages so that the worker can find the right function to execute.

A task message does not disappear until the message has been acknowledged by a worker. A worker can reserve many messages in advance and even if the worker is killed – caused by power failure or otherwise – the message will be redelivered to another worker.

Ideally task functions should be idempotent, which means that the function will not cause unintended effects even if called multiple times with the same arguments. Since the worker cannot detect if your tasks are idempotent, the default behavior is to acknowledge the message in advance, before it’s executed, so that a task that has already been started is never executed again.

If your task is idempotent you can set the acks_late option to have the worker acknowledge the message after the task returns instead. See also the FAQ entry Should I use retry or acks_late?.

In this chapter you will learn all about defining tasks, and this is the table of contents:

Basics

You can easily create a task from any callable by using the task() decorator:

from .models import User

@app.task
def create_user(username, password):
    User.objects.create(username=username, password=password)

There are also many options that can be set for the task; these can be specified as arguments to the decorator:

@app.task(serializer='json')
def create_user(username, password):
    User.objects.create(username=username, password=password)
Names

Every task must have a unique name, and a new name will be generated out of the function name if a custom name is not provided.

For example:

>>> @app.task(name='sum-of-two-numbers')
... def add(x, y):
...     return x + y

>>> add.name
'sum-of-two-numbers'

A best practice is to use the module name as a namespace; this way names won’t collide if there’s already a task with that name defined in another module.

>>> @app.task(name='tasks.add')
... def add(x, y):
...     return x + y

You can tell the name of the task by investigating its name attribute:

>>> add.name
'tasks.add'

Which is exactly the name that would have been generated anyway, if the module name is “tasks.py”:

tasks.py:

@app.task
def add(x, y):
    return x + y

>>> from tasks import add
>>> add.name
'tasks.add'
Automatic naming and relative imports

Relative imports and automatic name generation do not go well together, so if you’re using relative imports you should set the name explicitly.

For example if the client imports the module “myapp.tasks” as “.tasks”, and the worker imports the module as “myapp.tasks”, the generated names won’t match and a NotRegistered error will be raised by the worker.

This is also the case when using Django with project.myapp-style naming in INSTALLED_APPS:

INSTALLED_APPS = ['project.myapp']

If you install the app under the name project.myapp then the tasks module will be imported as project.myapp.tasks, so you must make sure you always import the tasks using the same name:

>>> from project.myapp.tasks import mytask   # << GOOD

>>> from myapp.tasks import mytask    # << BAD!!!

The second example will cause the task to be named differently since the worker and the client import the modules under different names:

>>> from project.myapp.tasks import mytask
>>> mytask.name
'project.myapp.tasks.mytask'

>>> from myapp.tasks import mytask
>>> mytask.name
'myapp.tasks.mytask'

So for this reason you must be consistent in how you import modules, which is also a Python best practice.

Similarly, you should not use old-style relative imports:

from module import foo   # BAD!

from proj.module import foo  # GOOD!

New-style relative imports are fine and can be used:

from .module import foo  # GOOD!

If you want to use Celery with a project already using these patterns extensively, and you don’t have the time to refactor the existing code, then you can consider specifying the names explicitly instead of relying on the automatic naming:

@task(name='proj.tasks.add')
def add(x, y):
    return x + y
Context

request contains information and state related to the executing task.

The request defines the following attributes:

id: The unique id of the executing task.
group: The unique id of the task’s group, if this task is a member.
chord: The unique id of the chord this task belongs to (if the task is part of the header).
args: Positional arguments.
kwargs: Keyword arguments.
retries: How many times the current task has been retried. An integer starting at 0.
is_eager: Set to True if the task is executed locally in the client, and not by a worker.
eta: The original ETA of the task (if any). This is in UTC time (depending on the CELERY_ENABLE_UTC setting).
expires: The original expiry time of the task (if any). This is in UTC time (depending on the CELERY_ENABLE_UTC setting).
logfile: The file the worker logs to. See Logging.
loglevel: The current log level used.
hostname: Hostname of the worker instance executing the task.
delivery_info: Additional message delivery information. This is a mapping containing the exchange and routing key used to deliver this task. Used by e.g. retry() to resend the task to the same destination queue. Availability of keys in this dict depends on the message broker used.
called_directly: This flag is set to true if the task was not executed by the worker.
callbacks: A list of subtasks to be called if this task returns successfully.
errbacks: A list of subtasks to be called if this task fails.
utc: Set to true if the caller has UTC enabled (CELERY_ENABLE_UTC).

New in version 3.1.

headers: Mapping of message headers (may be None).
reply_to: Where to send the reply to (queue name).
correlation_id: Usually the same as the task id, often used in amqp to keep track of what a reply is for.

An example task accessing information in the context is:

@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
            self.request))

The bind argument means that the function will be a “bound method” so that you can access attributes and methods on the task type instance.

Logging

The worker will automatically set up logging for you, or you can configure logging manually.

A special logger is available named “celery.task”; you can inherit from this logger to automatically get the task name and unique id as part of the logs.

The best practice is to create a common logger for all of your tasks at the top of your module:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

Celery uses the standard Python logging library, for which documentation can be found in the logging module.

You can also use print(), as anything written to standard out/-err will be redirected to the logging system (you can disable this, see CELERY_REDIRECT_STDOUTS).

Note

The worker will not update the redirection if you create a logger instance somewhere in your task or task module.

If you want to redirect sys.stdout and sys.stderr to a custom logger you have to enable this manually, for example:

import sys

logger = get_task_logger(__name__)

@app.task(bind=True)
def add(self, x, y):
    old_outs = sys.stdout, sys.stderr
    rlevel = self.app.conf.CELERY_REDIRECT_STDOUTS_LEVEL
    try:
        self.app.log.redirect_stdouts_to_logger(logger, rlevel)
        print('Adding {0} + {1}'.format(x, y))
        return x + y
    finally:
        sys.stdout, sys.stderr = old_outs
Retrying

retry() can be used to re-execute the task, for example in the event of recoverable errors.

When you call retry it will send a new message, using the same task-id, and it will take care to make sure the message is delivered to the same queue as the originating task.

When a task is retried this is also recorded as a task state, so that you can track the progress of the task using the result instance (see States).

Here’s an example using retry:

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

Note

The retry() call will raise an exception so any code after the retry will not be reached. This is the Retry exception, it is not handled as an error but rather as a semi-predicate to signify to the worker that the task is to be retried, so that it can store the correct state when a result backend is enabled.

This is normal operation and always happens unless the throw argument to retry is set to False.

The bind argument to the task decorator will give access to self (the task type instance).

The exc argument is used to pass exception information that is used in logs, and when storing task results. Both the exception and the traceback will be available in the task state (if a result backend is enabled).

If the task has a max_retries value the current exception will be re-raised if the max number of retries has been exceeded, but this will not happen if:

  • An exc argument was not given.

    In this case the MaxRetriesExceeded exception will be raised.

  • There is no current exception

    If there’s no original exception to re-raise the exc argument will be used instead, so:

    self.retry(exc=Twitter.LoginError())
    

    will raise the exc argument given.

Using a custom retry delay

When a task is to be retried, it can wait for a given amount of time before doing so, and the default delay is defined by the default_retry_delay attribute. By default this is set to 3 minutes. Note that the unit for setting the delay is in seconds (int or float).

You can also provide the countdown argument to retry() to override this default.

@app.task(bind=True, default_retry_delay=30 * 60)  # retry in 30 minutes.
def add(self, x, y):
    try:
        …
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)  # override the default and
                                                 # retry in 1 minute
List of Options

The task decorator can take a number of options that change the way the task behaves, for example you can set the rate limit for a task using the rate_limit option.

Any keyword argument passed to the task decorator will actually be set as an attribute of the resulting task class, and this is a list of the built-in attributes.

General
Task.name

The name the task is registered as.

You can set this name manually, or a name will be automatically generated using the module and class name. See Names.

Task.request

If the task is being executed this will contain information about the current request. Thread local storage is used.

See Context.

Task.abstract

Abstract classes are not registered, but are used as the base class for new task types.

Task.max_retries

The maximum number of attempted retries before giving up. If the number of retries exceeds this value a MaxRetriesExceeded exception will be raised. NOTE: You have to call retry() manually, as it will not automatically retry on exception.

The default value is 3. A value of None will disable the retry limit and the task will retry forever until it succeeds.

Task.throws

Optional tuple of expected error classes that should not be regarded as an actual error.

Errors in this list will be reported as a failure to the result backend, but the worker will not log the event as an error, and no traceback will be included.

Example:

@task(throws=(KeyError, HttpNotFound))
def get_foo():
    something()

Error types:

  • Expected errors (in Task.throws)

    Logged with severity INFO, traceback excluded.

  • Unexpected errors

    Logged with severity ERROR, with traceback included.

Task.trail

By default the task will keep track of subtasks called (task.request.children), and this will be stored with the final result in the result backend, available to the client via AsyncResult.children.

This list of tasks can grow quite big for tasks starting many subtasks, and you can set this attribute to False to disable it.

Task.default_retry_delay

Default time in seconds before a retry of the task should be executed. Can be either int or float. Default is a 3 minute delay.

Task.rate_limit

Set the rate limit for this task type which limits the number of tasks that can be run in a given time frame. Tasks will still complete when a rate limit is in effect, but it may take some time before it’s allowed to start.

If this is None no rate limit is in effect. If it is an integer or float, it is interpreted as “tasks per second”.

The rate limits can be specified in seconds, minutes or hours by appending “/s”, “/m” or “/h” to the value. Tasks will be evenly distributed over the specified time frame.

Example: “100/m” (hundred tasks a minute). This will enforce a minimum delay of 600ms between starting two tasks on the same worker instance.

Default is the CELERY_DEFAULT_RATE_LIMIT setting, which if not specified means rate limiting for tasks is disabled by default.

Note that this is a per worker instance rate limit, and not a global rate limit. To enforce a global rate limit (e.g. for an API with a maximum number of requests per second), you must restrict to a given queue.
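
For example (a sketch; the task body is illustrative):

@app.task(rate_limit='10/m')  # at most ten of these per minute, per worker instance
def fetch_feed(url):
    print('importing feed {0}'.format(url))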

Task.time_limit

The hard time limit, in seconds, for this task. If not set then the worker’s default will be used.

Task.soft_time_limit

The soft time limit for this task. If not set then the worker’s default will be used.
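
For example, a sketch combining both limits; do_work and cleanup are hypothetical helpers:

from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=60, time_limit=120)
def process(path):
    try:
        do_work(path)      # may run for a long time
    except SoftTimeLimitExceeded:
        cleanup(path)      # the soft limit leaves room to clean up
                           # before the hard limit kills the process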

Task.ignore_result

Don’t store task state. Note that this means you can’t use AsyncResult to check if the task is ready, or get its return value.

Task.store_errors_even_if_ignored

If True, errors will be stored even if the task is configured to ignore results.

Task.send_error_emails

Send an email whenever a task of this type fails. Defaults to the CELERY_SEND_TASK_ERROR_EMAILS setting. See Error E-Mails for more information.

Task.ErrorMail

If the sending of error emails is enabled for this task, then this is the class defining the logic to send error mails.

Task.serializer

A string identifying the default serialization method to use. Defaults to the CELERY_TASK_SERIALIZER setting. Can be pickle, json, yaml, or any custom serialization methods that have been registered with kombu.serialization.registry.

Please see Serializers for more information.

Task.compression

A string identifying the default compression scheme to use.

Defaults to the CELERY_MESSAGE_COMPRESSION setting. Can be gzip, or bzip2, or any custom compression schemes that have been registered with the kombu.compression registry.

Please see Compression for more information.

Task.backend

The result store backend to use for this task. An instance of one of the backend classes in celery.backends. Defaults to app.backend which is defined by the CELERY_RESULT_BACKEND setting.

Task.acks_late

If set to True messages for this task will be acknowledged after the task has been executed, not just before, which is the default behavior.

Note that this means the task may be executed twice if the worker crashes in the middle of execution, which may be acceptable for some applications.

The global default can be overridden by the CELERY_ACKS_LATE setting.
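
For example (a sketch; the task body is illustrative and assumed to be idempotent):

@app.task(acks_late=True)
def import_feed(url):
    # Importing the same feed twice yields the same end state, so a
    # possible re-execution after a worker crash is acceptable.
    importer.import_feed(url)  # hypothetical helper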

Task.track_started

If True the task will report its status as “started” when the task is executed by a worker. The default value is False, as the normal behaviour is to not report that level of granularity: tasks are either pending, finished, or waiting to be retried. Having a “started” status can be useful when there are long running tasks and there is a need to report which task is currently running.

The host name and process id of the worker executing the task will be available in the state metadata (e.g. result.info['pid']).

The global default can be overridden by the CELERY_TRACK_STARTED setting.

See also

The API reference for Task.

States

Celery can keep track of the task’s current state. The state also contains the result of a successful task, or the exception and traceback information of a failed task.

There are several result backends to choose from, and they all have different strengths and weaknesses (see Result Backends).

During its lifetime a task will transition through several possible states, and each state may have arbitrary metadata attached to it. When a task moves into a new state the previous state is forgotten about, but some transitions can be deduced (e.g. a task now in the FAILED state is implied to have been in the STARTED state at some point).

There are also sets of states, like the set of FAILURE_STATES, and the set of READY_STATES.

The client uses the membership of these sets to decide whether the exception should be re-raised (PROPAGATE_STATES), or whether the state can be cached (it can if the task is ready).

You can also define Custom states.

Result Backends

If you want to keep track of tasks or need the return values, then Celery must store or send the states somewhere so that they can be retrieved later. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), MongoDB, and Redis – or you can define your own.

No backend works well for every use case. You should read about the strengths and weaknesses of each backend, and choose the most appropriate for your needs.

RPC Result Backend (RabbitMQ/QPid)

The RPC result backend (rpc://) is special as it does not actually store the states, but rather sends them as messages. This is an important difference as it means that a result can only be retrieved once, and only by the client that initiated the task. Two different processes cannot wait for the same result.

Even with that limitation, it is an excellent choice if you need to receive state changes in real-time. Using messaging means the client does not have to poll for new states.

The messages are transient (non-persistent) by default, so the results will disappear if the broker restarts. You can configure the result backend to send persistent messages using the CELERY_RESULT_PERSISTENT setting.

Database Result Backend

Keeping state in the database can be convenient for many, especially for web applications with a database already in place, but it also comes with limitations.

  • Polling the database for new states is expensive, and so you should increase the polling intervals of operations such as result.get().

  • Some databases use a default transaction isolation level that is not suitable for polling tables for changes.

    In MySQL the default transaction isolation level is REPEATABLE-READ, which means the transaction will not see changes by other transactions until the transaction is committed. It is recommended that you change to the READ-COMMITTED isolation level.

Built-in States
PENDING

Task is waiting for execution or unknown. Any task id that is not known is implied to be in the pending state.

STARTED

Task has been started. Not reported by default, to enable please see app.Task.track_started.

metadata: pid and hostname of the worker process executing the task.
SUCCESS

Task has been successfully executed.

metadata: result contains the return value of the task.
propagates: Yes
ready: Yes
FAILURE

Task execution resulted in failure.

metadata: result contains the exception that occurred, and traceback contains the backtrace of the stack at the point when the exception was raised.
propagates: Yes
RETRY

Task is being retried.

metadata: result contains the exception that caused the retry, and traceback contains the backtrace of the stack at the point when the exception was raised.
propagates: No
REVOKED

Task has been revoked.

propagates: Yes
Custom states

You can easily define your own states, all you need is a unique name. The name of the state is usually an uppercase string. As an example you could have a look at abortable tasks which defines its own custom ABORTED state.

Use update_state() to update a task’s state:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

Here I created the state “PROGRESS”, which tells any application aware of this state that the task is currently in progress, and also where it is in the process by having current and total counts as part of the state metadata. This can then be used to create e.g. progress bars.
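
On the client side the custom state and its metadata can then be read from the result instance; a minimal sketch, assuming the upload_files task above and a configured result backend:

>>> result = upload_files.delay(filenames)
>>> result.state
'PROGRESS'
>>> result.info    # the meta dict passed to update_state()
{'current': 3, 'total': 10}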

Creating pickleable exceptions

A rarely known Python fact is that exceptions must conform to some simple rules to support being serialized by the pickle module.

Tasks that raise exceptions that are not pickleable will not work properly when Pickle is used as the serializer.

To make sure that your exceptions are pickleable the exception MUST provide the original arguments it was instantiated with in its .args attribute. The simplest way to ensure this is to have the exception call Exception.__init__.

Let’s look at some examples that work, and one that doesn’t:

# OK:
class HttpError(Exception):
    pass

# BAD:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code

# OK:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code
        Exception.__init__(self, status_code)  # <-- REQUIRED

So the rule is: For any exception that supports custom arguments *args, Exception.__init__(self, *args) must be used.

There is no special support for keyword arguments, so if you want to preserve keyword arguments when the exception is unpickled you have to pass them as regular args:

class HttpError(Exception):

    def __init__(self, status_code, headers=None, body=None):
        self.status_code = status_code
        self.headers = headers
        self.body = body

        super(HttpError, self).__init__(status_code, headers, body)
Semipredicates

The worker wraps the task in a tracing function which records the final state of the task. There are a number of exceptions that can be used to signal this function to change how it treats the return of the task.

Ignore

The task may raise Ignore to force the worker to ignore the task. This means that no state will be recorded for the task, but the message is still acknowledged (removed from queue).

This can be used if you want to implement custom revoke-like functionality, or manually store the result of a task.

Example keeping revoked tasks in a Redis set:

from celery.exceptions import Ignore

@app.task(bind=True)
def some_task(self):
    if redis.sismember('tasks.revoked', self.request.id):
        raise Ignore()

Example that stores results manually:

from celery import states
from celery.exceptions import Ignore

@app.task(bind=True)
def get_tweets(self, user):
    timeline = twitter.get_timeline(user)
    if not self.request.called_directly:
        self.update_state(state=states.SUCCESS, meta=timeline)
    raise Ignore()
Reject

The task may raise Reject to reject the task message using AMQP’s basic_reject method. This will not have any effect unless Task.acks_late is enabled.

Rejecting a message has the same effect as acking it, but some brokers may implement additional functionality that can be used. For example RabbitMQ supports the concept of Dead Letter Exchanges where a queue can be configured to use a dead letter exchange that rejected messages are redelivered to.

Reject can also be used to requeue messages, but please be very careful when using this as it can easily result in an infinite message loop.

Example using reject when a task causes an out of memory condition:

import errno
from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def render_scene(self, path):
    file = get_file(path)
    try:
        renderer.render_scene(file)

    # if the file is too big to fit in memory
    # we reject it so that it's redelivered to the dead letter exchange
    # and we can manually inspect the situation.
    except MemoryError as exc:
        raise Reject(exc, requeue=False)
    except OSError as exc:
        if exc.errno == errno.ENOMEM:
            raise Reject(exc, requeue=False)

    # For any other error we retry after 10 seconds.
    except Exception as exc:
        raise self.retry(exc=exc, countdown=10)

Example requeuing the message:

from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def requeues(self):
    if not self.request.delivery_info['redelivered']:
        raise Reject('no reason', requeue=True)
    print('received two times')

Consult your broker documentation for more details about the basic_reject method.

Retry

The Retry exception is raised by the Task.retry method to tell the worker that the task is being retried.

Custom task classes

All tasks inherit from the app.Task class. The run() method becomes the task body.

As an example, the following code,

@app.task
def add(x, y):
    return x + y

will do roughly this behind the scenes:

class _AddTask(app.Task):

    def run(self, x, y):
        return x + y

add = app.tasks[_AddTask.name]
Instantiation

A task is not instantiated for every request, but is registered in the task registry as a global instance.

This means that the __init__ constructor will only be called once per process, and that the task class is semantically closer to an Actor.

If you have a task,

from celery import Task

class NaiveAuthenticateServer(Task):

    def __init__(self):
        self.users = {'george': 'password'}

    def run(self, username, password):
        try:
            return self.users[username] == password
        except KeyError:
            return False

and you route every request to the same process, then it will keep state between requests.

This can also be useful to cache resources, e.g. a base Task class that caches a database connection:

from celery import Task

class DatabaseTask(Task):
    abstract = True
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db

that can be added to tasks like this:

@app.task(base=DatabaseTask)
def process_rows():
    for row in process_rows.db.table.all():
        …

The db attribute of the process_rows task will then always stay the same in each process.

Abstract classes

Abstract classes are not registered, but are used as the base class for new task types.

from celery import Task

class DebugTask(Task):
    abstract = True

    def after_return(self, *args, **kwargs):
        print('Task returned: {0!r}'.format(self.request))


@app.task(base=DebugTask)
def add(x, y):
    return x + y
Handlers
after_return(self, status, retval, task_id, args, kwargs, einfo)

Handler called after the task returns.

Parameters:
  • status – Current task state.
  • retval – Task return value/exception.
  • task_id – Unique id of the task.
  • args – Original arguments for the task that returned.
  • kwargs – Original keyword arguments for the task that returned.
  • einfo – ExceptionInfo instance, containing the traceback (if any).

The return value of this handler is ignored.

on_failure(self, exc, task_id, args, kwargs, einfo)

This is run by the worker when the task fails.

Parameters:
  • exc – The exception raised by the task.
  • task_id – Unique id of the failed task.
  • args – Original arguments for the task that failed.
  • kwargs – Original keyword arguments for the task that failed.
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

on_retry(self, exc, task_id, args, kwargs, einfo)

This is run by the worker when the task is to be retried.

Parameters:
  • exc – The exception sent to retry().
  • task_id – Unique id of the retried task.
  • args – Original arguments for the retried task.
  • kwargs – Original keyword arguments for the retried task.
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

on_success(self, retval, task_id, args, kwargs)

Run by the worker if the task executes successfully.

Parameters:
  • retval – The return value of the task.
  • task_id – Unique id of the executed task.
  • args – Original arguments for the executed task.
  • kwargs – Original keyword arguments for the executed task.

The return value of this handler is ignored.
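
For example, a base class using on_failure to report errors; a sketch assuming an app instance as in the earlier examples:

from celery import Task

class ReportingTask(Task):
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Called by the worker after the task fails; the return value is ignored.
        print('Task {0} failed: {1!r}'.format(task_id, exc))

@app.task(base=ReportingTask)
def risky_operation():
    raise RuntimeError('something went wrong')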

How it works

Here come the technical details. This part isn’t something you need to know, but you may be interested.

All defined tasks are listed in a registry. The registry contains a list of task names and their task classes. You can investigate this registry yourself:

>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
    <@task: celery.chord_unlock>,
 'celery.backend_cleanup':
    <@task: celery.backend_cleanup>,
 'celery.chord':
    <@task: celery.chord>}

This is the list of tasks built into Celery. Note that tasks will only be registered when the module they are defined in is imported.

The default loader imports any modules listed in the CELERY_IMPORTS setting.

The entity responsible for registering your task in the registry is the metaclass: TaskType.

If you want to register your task manually you can mark the task as abstract:

class MyTask(Task):
    abstract = True

This way the task won’t be registered, but any task inheriting from it will be.

When tasks are sent, no actual function code is sent with it, just the name of the task to execute. When the worker then receives the message it can look up the name in its task registry to find the execution code.

This means that your workers should always be updated with the same software as the client. This is a drawback, but the alternative is a technical challenge that has yet to be solved.

Tips and Best Practices
Ignore results you don’t want

If you don’t care about the results of a task, be sure to set the ignore_result option, as storing results wastes time and resources.

@app.task(ignore_result=True)
def mytask(…):
    something()

Results can even be disabled globally using the CELERY_IGNORE_RESULT setting.

Disable rate limits if they’re not used

Disabling rate limits altogether is recommended if you don’t have any tasks using them. This is because the rate limit subsystem introduces quite a lot of complexity.

Set the CELERY_DISABLE_RATE_LIMITS setting to globally disable rate limits:

CELERY_DISABLE_RATE_LIMITS = True

You’ll find additional optimization tips in the Optimizing Guide.

Avoid launching synchronous subtasks

Having a task wait for the result of another task is really inefficient, and may even cause a deadlock if the worker pool is exhausted.

Make your design asynchronous instead, for example by using callbacks.

Bad:

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(url, page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)

Good:

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

Here I instead created a chain of tasks by linking together different subtasks. You can read about chains and other powerful constructs at Canvas: Designing Workflows.

Performance and Strategies
Granularity

The task granularity is the amount of computation needed by each subtask. In general it is better to split the problem up into many small tasks rather than have a few long running tasks.

With smaller tasks you can process more tasks in parallel and the tasks won’t run long enough to block the worker from processing other waiting tasks.

However, executing a task does have overhead. A message needs to be sent, data may not be local, etc. So if the tasks are too fine-grained the additional overhead may not be worth it in the end.
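
As an illustration, compare one long-running loop with many small tasks; a sketch where resize_image, create_thumbnail and image_paths are hypothetical:

from celery import group

@app.task
def resize_image(path):
    # One small unit of work per message.
    return create_thumbnail(path)  # hypothetical helper

# One message per image: workers can process them in parallel, and no
# single task blocks a worker for long.
group(resize_image.s(path) for path in image_paths)()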

See also

The book Art of Concurrency has a section dedicated to the topic of task granularity [AOC1].

[AOC1] Breshears, Clay. Section 2.2.1, “The Art of Concurrency”. O’Reilly Media, Inc. May 15, 2009. ISBN-13 978-0-596-52153-0.
Data locality

The worker processing the task should be as close to the data as possible. The best would be to have a copy in memory, the worst would be a full transfer from another continent.

If the data is far away, you could try to run another worker at that location, or if that’s not possible, cache often-used data, or preload data you know is going to be used.

The easiest way to share data between workers is to use a distributed cache system, like memcached.

See also

The paper Distributed Computing Economics by Jim Gray is an excellent introduction to the topic of data locality.

State

Since Celery is a distributed system, you can’t know in which process, or on what machine, the task will be executed. You can’t even know if the task will run in a timely manner.

The ancient async saying tells us that “asserting the world is the responsibility of the task”. What this means is that the world view may have changed since the task was requested, so the task is responsible for making sure the world is how it should be: if you have a task that re-indexes a search engine, and the search engine should only be re-indexed at maximum every 5 minutes, then it must be the task’s responsibility to assert that, not the caller’s.

Another gotcha is Django model objects. They shouldn’t be passed on as arguments to tasks. It’s almost always better to re-fetch the object from the database when the task is running instead, as using old data may lead to race conditions.

Imagine the following scenario where you have an article and a task that automatically expands some abbreviations in it:

class Article(models.Model):
    title = models.CharField()
    body = models.TextField()

@app.task
def expand_abbreviations(article):
    article.body = article.body.replace('MyCorp', 'My Corporation')
    article.save()

First, an author creates an article and saves it, then the author clicks on a button that initiates the abbreviation task:

>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)

Now, the queue is very busy, so the task won’t be run for another 2 minutes. In the meantime another author makes changes to the article, so when the task is finally run, the body of the article is reverted to the old version because the task had the old body in its argument.

Fixing the race condition is easy: just use the article id instead, and re-fetch the article in the task body:

@app.task
def expand_abbreviations(article_id):
    article = Article.objects.get(id=article_id)
    article.body = article.body.replace('MyCorp', 'My Corporation')
    article.save()

>>> expand_abbreviations.delay(article_id)

There might even be performance benefits to this approach, as sending large messages may be expensive.

Database transactions

Let’s have a look at another example:

from django.db import transaction

@transaction.commit_on_success
def create_article(request):
    article = Article.objects.create(…)
    expand_abbreviations.delay(article.pk)

This is a Django view creating an article object in the database, then passing the primary key to a task. It uses the commit_on_success decorator, which will commit the transaction when the view returns, or roll back if the view raises an exception.

There is a race condition if the task starts executing before the transaction has been committed; the database object does not exist yet!

The solution is to always commit transactions before sending tasks depending on state from the current transaction:

@transaction.commit_manually
def create_article(request):
    try:
        article = Article.objects.create(…)
    except:
        transaction.rollback()
        raise
    else:
        transaction.commit()
        expand_abbreviations.delay(article.pk)

Note

Django 1.6 (and later) now enables autocommit mode by default, and commit_on_success/commit_manually are deprecated.

This means each SQL query is wrapped and executed in individual transactions, making it less likely to experience the problem described above.

However, enabling ATOMIC_REQUESTS on the database connection will bring back the transaction-per-request model and the race condition along with it. In this case, the simple solution is using the @transaction.non_atomic_requests decorator to go back to autocommit for that view only.

Example

Let’s take a real world example: a blog where comments posted need to be filtered for spam. When the comment is created, the spam filter runs in the background, so the user doesn’t have to wait for it to finish.

I have a Django blog application allowing comments on blog posts. I’ll describe parts of the models/views and tasks for this application.

blog/models.py

The comment model looks like this:

from django.db import models
from django.utils.translation import ugettext_lazy as _


class Comment(models.Model):
    name = models.CharField(_('name'), max_length=64)
    email_address = models.EmailField(_('email address'))
    homepage = models.URLField(_('home page'),
                               blank=True, verify_exists=False)
    comment = models.TextField(_('comment'))
    pub_date = models.DateTimeField(_('Published date'),
                                    editable=False, auto_now_add=True)
    is_spam = models.BooleanField(_('spam?'),
                                  default=False, editable=False)

    class Meta:
        verbose_name = _('comment')
        verbose_name_plural = _('comments')

In the view where the comment is posted, I first write the comment to the database, then I launch the spam filter task in the background.

blog/views.py
from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response

from blog import tasks
from blog.models import Comment, Entry


class CommentForm(forms.ModelForm):

    class Meta:
        model = Comment


def add_comment(request, slug, template_name='comments/create.html'):
    post = get_object_or_404(Entry, slug=slug)
    remote_addr = request.META.get('REMOTE_ADDR')

    if request.method == 'POST':
        form = CommentForm(request.POST, request.FILES)
        if form.is_valid():
            comment = form.save()
            # Check spam asynchronously.
            tasks.spam_filter.delay(comment_id=comment.id,
                                    remote_addr=remote_addr)
            return HttpResponseRedirect(post.get_absolute_url())
    else:
        form = CommentForm()

    context = RequestContext(request, {'form': form})
    return render_to_response(template_name, context_instance=context)

To filter spam in comments I use Akismet, the service used to filter spam in comments posted to the free weblog platform Wordpress. Akismet is free for personal use, but for commercial use you need to pay. You have to sign up to their service to get an API key.

To make API calls to Akismet I use the akismet.py library written by Michael Foord.

blog/tasks.py
from celery import Celery

from akismet import Akismet

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site

from blog.models import Comment


app = Celery(broker='amqp://')


@app.task
def spam_filter(comment_id, remote_addr=None):
    logger = spam_filter.get_logger()
    logger.info('Running spam filter for comment %s', comment_id)

    comment = Comment.objects.get(pk=comment_id)
    current_domain = Site.objects.get_current().domain
    akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(current_domain))
    if not akismet.verify_key():
        raise ImproperlyConfigured('Invalid AKISMET_KEY')


    is_spam = akismet.comment_check(user_ip=remote_addr,
                        comment_content=comment.comment,
                        comment_author=comment.name,
                        comment_author_email=comment.email_address)
    if is_spam:
        comment.is_spam = True
        comment.save()

    return is_spam

Calling Tasks

Basics

This document describes Celery’s uniform “Calling API” used by task instances and the canvas.

The API defines a standard set of execution options, as well as three methods:

  • apply_async(args[, kwargs[, …]])

    Sends a task message.

  • delay(*args, **kwargs)

    Shortcut to send a task message, but does not support execution options.

  • calling (__call__)

    Applying an object supporting the calling API (e.g. add(2, 2)) means that the task will be executed in the current process, and not by a worker (a message will not be sent).

Quick Cheat Sheet

  • T.delay(arg, kwarg=value)

    always a shortcut to .apply_async.

  • T.apply_async((arg, ), {'kwarg': value})

  • T.apply_async(countdown=10)

    executes 10 seconds from now.

  • T.apply_async(eta=now + timedelta(seconds=10))

    executes 10 seconds from now, specified using eta

  • T.apply_async(countdown=60, expires=120)

    executes in one minute from now, but expires after 2 minutes.

  • T.apply_async(expires=now + timedelta(days=2))

    expires in 2 days, set using datetime.

Example

The delay() method is convenient as it looks like calling a regular function:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

Using apply_async() instead you have to write:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

So delay is clearly convenient, but if you want to set additional execution options you have to use apply_async.

The rest of this document will go into the task execution options in detail. All examples use a task called add, returning the sum of two arguments:

@app.task
def add(x, y):
    return x + y

There’s another way…

You will learn more about this later while reading about the Canvas, but subtasks are objects used to pass around the signature of a task invocation (for example, to send it over the network), and they also support the Calling API:

task.s(arg1, arg2, kwarg1='x', kwarg2='y').apply_async()
Linking (callbacks/errbacks)

Celery supports linking tasks together so that one task follows another. The callback task will be applied with the result of the parent task as a partial argument:

add.apply_async((2, 2), link=add.s(16))

Here the result of the first task (4) will be sent to a new task that adds 16 to the previous result, forming the expression (2 + 2) + 16 = 20

You can also cause a callback to be applied if the task raises an exception (errback), but this behaves differently from a regular callback in that it will be passed the id of the parent task, not the result. This is because it may not always be possible to serialize the exception raised, so the error callback requires a result backend to be enabled, and must retrieve the result of the failed task itself.

This is an example error callback:

@app.task(bind=True)
def error_handler(self, uuid):
    result = self.app.AsyncResult(uuid)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, result.result, result.traceback))

it can be added to the task using the link_error execution option:

add.apply_async((2, 2), link_error=error_handler.s())

In addition, both the link and link_error options can be expressed as a list:

add.apply_async((2, 2), link=[add.s(16), other_task.s()])

The callbacks/errbacks will then be called in order, and all callbacks will be called with the return value of the parent task as a partial argument.

ETA and countdown

The ETA (estimated time of arrival) lets you set a specific date and time that is the earliest time at which your task will be executed. countdown is a shortcut to set eta by seconds into the future.

>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()    # this takes at least 3 seconds to return
4

The task is guaranteed to be executed at some time after the specified date and time, but not necessarily at that exact time. Possible reasons for broken deadlines may include many items waiting in the queue, or heavy network latency. To make sure your tasks are executed in a timely manner you should monitor the queue for congestion, using Munin or similar tools to receive alerts, so appropriate action can be taken to ease the workload.

While countdown is an integer, eta must be a datetime object, specifying an exact date and time (including millisecond precision, and timezone information):

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
Expiration

The expires argument defines an optional expiry time, either as seconds after task publish, or a specific date and time using datetime:

>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10),
...                 expires=datetime.now() + timedelta(days=1))

When a worker receives an expired task it will mark the task as REVOKED (TaskRevokedError).

Message Sending Retry

Celery will automatically retry sending messages in the event of connection failure, and this retry behavior can be configured (for example, how often to retry, or a maximum number of retries) or disabled altogether.

To disable retry you can set the retry execution option to False:

add.apply_async((2, 2), retry=False)
Retry Policy

A retry policy is a mapping that controls how retries behave, and can contain the following keys:

  • max_retries

    Maximum number of retries before giving up, in this case the exception that caused the retry to fail will be raised.

    A value of 0 or None means it will retry forever.

    The default is to retry 3 times.

  • interval_start

    Defines the number of seconds (float or integer) to wait between retries. Default is 0, which means the first retry will be instantaneous.

  • interval_step

    On each consecutive retry this number will be added to the retry delay (float or integer). Default is 0.2.

  • interval_max

    Maximum number of seconds (float or integer) to wait between retries. Default is 0.2.

For example, the default policy correlates to:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

the maximum time spent retrying will be 0.4 seconds. It is set relatively short by default because a connection failure could lead to a retry pile-up if the broker connection is down: e.g. many web server processes waiting to retry, blocking other incoming requests.
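
To see where the 0.4 seconds figure comes from, here is a small sketch that reproduces the interval arithmetic the policy keys describe (this is not the actual retry implementation, just the same formula):

def retry_intervals(max_retries=3, interval_start=0,
                    interval_step=0.2, interval_max=0.2):
    """Yield the delay (in seconds) before each retry attempt."""
    interval = interval_start
    for _ in range(max_retries):
        yield min(interval, interval_max)
        interval += interval_step

# Default policy: delays of 0, 0.2 and 0.2 seconds, 0.4 seconds in total.
print(sum(retry_intervals()))  # 0.4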

Serializers

Data transferred between clients and workers needs to be serialized, so every message in Celery has a content_type header that describes the serialization method used to encode it.

The default serializer is pickle, but you can change this using the CELERY_TASK_SERIALIZER setting, or for each individual task, or even per message.

There’s built-in support for pickle, JSON, YAML and msgpack, and you can also add your own custom serializers by registering them into the Kombu serializer registry (see the Kombu serialization guide).

Each option has its advantages and disadvantages.

json – JSON is supported in many programming languages, is now a standard part of Python (since 2.6), and is fairly fast to decode using modern Python libraries such as cjson or simplejson.

The primary disadvantage of JSON is that it limits you to the following data types: strings, Unicode, floats, booleans, dictionaries, and lists. Decimals and dates are notably missing.

Also, binary data will be transferred using Base64 encoding, which will cause the transferred data to be around 34% larger than an encoding which supports native binary types.

However, if your data fits inside the above constraints and you need cross-language support, the default setting of JSON is probably your best choice.

See http://json.org for more information.

pickle – If you have no desire to support any language other than Python, then using the pickle encoding will gain you the support of all built-in Python data types (except class instances), smaller messages when sending binary files, and a slight speedup over JSON processing.

See http://docs.python.org/library/pickle.html for more information.

yaml – YAML has many of the same characteristics as json, except that it natively supports more data types (including dates, recursive references, etc.)

However, the Python libraries for YAML are a good bit slower than the libraries for JSON.

If you need a more expressive set of data types and need to maintain cross-language compatibility, then YAML may be a better fit than the above.

See http://yaml.org/ for more information.

msgpack – msgpack is a binary serialization format that is closer to JSON in features. It is very young however, and support should be considered experimental at this point.

See http://msgpack.org/ for more information.

The encoding used is available as a message header, so the worker knows how to deserialize any task. If you use a custom serializer, this serializer must be available for the worker.

The following order is used to decide which serializer to use when sending a task:

  1. The serializer execution option.
  2. The Task.serializer attribute.
  3. The CELERY_TASK_SERIALIZER setting.

Example setting a custom serializer for a single task invocation:

>>> add.apply_async((10, 10), serializer='json')
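
The Task.serializer attribute (item 2 in the order above) can be set directly in the task decorator; a minimal sketch:

@app.task(serializer='json')  # sets the Task.serializer attribute
def add(x, y):
    return x + y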
Compression

Celery can compress the messages using either gzip, or bzip2. You can also create your own compression schemes and register them in the kombu compression registry.

The following order is used to decide which compression scheme to use when sending a task:

  1. The compression execution option.
  2. The Task.compression attribute.
  3. The CELERY_MESSAGE_COMPRESSION setting.

Example specifying the compression used when calling a task:

>>> add.apply_async((2, 2), compression='zlib')
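
Analogously, the Task.compression attribute can be set in the task decorator; a minimal sketch:

@app.task(compression='zlib')  # sets the Task.compression attribute
def add(x, y):
    return x + y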
Connections

You can handle the connection manually by creating a publisher:

results = []
numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        for args in numbers:
            res = add.apply_async(args, publisher=publisher)
            results.append(res)
print([res.get() for res in results])

Though this particular example is much better expressed as a group:

>>> from celery import group

>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(*n) for n in numbers).apply_async()

>>> res.get()
[4, 8, 16, 32]
Routing options

Celery can route tasks to different queues.

Simple routing (name <-> name) is accomplished using the queue option:

add.apply_async(queue='priority.high')

You can then assign workers to the priority.high queue by using the worker's -Q argument:

$ celery -A proj worker -l info -Q celery,priority.high

See also

Hard-coding queue names in code is not recommended; the best practice is to use configuration routers (CELERY_ROUTES).

To find out more about routing, please see Routing Tasks.
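
A minimal sketch of such a router configuration (the task name proj.tasks.add is illustrative):

CELERY_ROUTES = {
    'proj.tasks.add': {'queue': 'priority.high'},
}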

Advanced Options

These options are for advanced users who want to make use of AMQP's full routing capabilities. Interested parties may read the routing guide.

  • exchange

    Name of exchange (or a kombu.entity.Exchange) to send the message to.

  • routing_key

    Routing key used to determine the queue the message will be delivered to.

  • priority

    A number between 0 and 9, where 0 is the highest priority.

    Supported by: redis, beanstalk
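
As a sketch of these options in use (the exchange name and routing key are illustrative, and assume a matching exchange has been declared by your routing configuration):

add.apply_async((2, 2),
                exchange='media',          # or a kombu.entity.Exchange
                routing_key='media.video',
                priority=0)                # 0 is the highest priority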

Canvas: Designing Workflows

Signatures

New in version 2.0.

You just learned how to call a task using the task's delay method in the calling guide, and this is often all you need, but sometimes you may want to pass the signature of a task invocation to another process, or use it as an argument to another function.

A signature() wraps the arguments, keyword arguments, and execution options of a single task invocation in a way such that it can be passed to functions or even serialized and sent across the wire.

Signatures are often nicknamed “subtasks” because they describe a task to be called within a task.

  • You can create a signature for the add task using its name like this:

    >>> from celery import signature
    >>> signature('tasks.add', args=(2, 2), countdown=10)
    tasks.add(2, 2)
    

    This task has a signature of arity 2 (two arguments): (2, 2), and sets the countdown execution option to 10.

  • or you can create one using the task’s subtask method:

    >>> add.subtask((2, 2), countdown=10)
    tasks.add(2, 2)
    
  • There is also a shortcut using star arguments:

    >>> add.s(2, 2)
    tasks.add(2, 2)
    
  • Keyword arguments are also supported:

    >>> add.s(2, 2, debug=True)
    tasks.add(2, 2, debug=True)
    
  • From any signature instance you can inspect the different fields:

    >>> s = add.subtask((2, 2), {'debug': True}, countdown=10)
    >>> s.args
    (2, 2)
    >>> s.kwargs
    {'debug': True}
    >>> s.options
    {'countdown': 10}
    
  • It supports the “Calling API” which means it supports delay and apply_async or being called directly.

    Calling the signature will execute the task inline in the current process:

    >>> add(2, 2)
    4
    >>> add.s(2, 2)()
    4
    

    delay is our beloved shortcut to apply_async taking star-arguments:

    >>> result = add.delay(2, 2)
    >>> result.get()
    4
    

    apply_async takes the same arguments as the app.Task.apply_async() method:

    >>> add.apply_async(args, kwargs, **options)
    >>> add.subtask(args, kwargs, **options).apply_async()
    
    >>> add.apply_async((2, 2), countdown=1)
    >>> add.subtask((2, 2), countdown=1).apply_async()
    
  • You can’t define options with s(), but a chaining set call takes care of that:

    >>> add.s(2, 2).set(countdown=1)
    proj.tasks.add(2, 2)
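
    Since set() returns the signature, the options call chains straight into the Calling API; a small sketch:

    >>> add.s(2, 2).set(countdown=1).delay()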
    
Partials

With a signature, you can execute the task in a worker:

>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async(countdown=1)

Or you can call it directly in the current process:

>>> add.s(2, 2)()
4

Specifying additional args, kwargs or options to apply_async/delay creates partials:

  • Any arguments added will be prepended to the args in the signature:

    >>> partial = add.s(2)          # incomplete signature
    >>> partial.delay(4)            # 2 + 4
    >>> partial.apply_async((4,))  # same
    
  • Any keyword arguments added will be merged with the kwargs in the signature, with the new keyword arguments taking precedence:

    >>> s = add.s(2, 2)
    >>> s.delay(debug=True)                    # -> add(2, 2, debug=True)
    >>> s.apply_async(kwargs={'debug': True})  # same
    
  • Any options added will be merged with the options in the signature, with the new options taking precedence:

    >>> s = add.subtask((2, 2), countdown=10)
    >>> s.apply_async(countdown=1)  # countdown is now 1
    

You can also clone signatures to create derivatives:

>>> s = add.s(2)
>>> s
proj.tasks.add(2)
>>> s.clone(args=(4,), kwargs={'debug': True})
proj.tasks.add(4, 2, debug=True)
Immutability

New in version 3.0.

Partials are meant to be used with callbacks: any linked tasks or chord callbacks will be applied with the result of the parent task. Sometimes you want to specify a callback that does not take additional arguments, and in that case you can set the signature to be immutable:

>>> add.apply_async((2, 2), link=reset_buffers.subtask(immutable=True))

The .si() shortcut can also be used to create immutable signatures:

>>> add.apply_async((2, 2), link=reset_buffers.si())

Only the execution options can be set when a signature is immutable, so it’s not possible to call the signature with partial args/kwargs.

Note

In this tutorial I sometimes apply the prefix operator ~ to signatures. You probably shouldn't use it in your production code, but it's a handy shortcut when experimenting in the Python shell:

>>> ~sig

>>> # is the same as
>>> sig.delay().get()
Callbacks

New in version 3.0.

Callbacks can be added to any task using the link argument to apply_async:

add.apply_async((2, 2), link=other_task.s())

The callback will only be applied if the task exited successfully, and it will be applied with the return value of the parent task as argument.

As I mentioned earlier, any arguments you add to a signature will be prepended to the arguments specified by the signature itself!

If you have the signature:

>>> sig = add.s(10)

then sig.delay(result) becomes:

>>> add.apply_async(args=(result, 10))

...

Now let’s call our add task with a callback using partial arguments:

>>> add.apply_async((2, 2), link=add.s(8))

As expected this will first launch one task calculating 2 + 2, then another task calculating 4 + 8.

The Primitives

New in version 3.0.

Overview

  • group

    The group primitive is a signature that takes a list of tasks that should be applied in parallel.

  • chain

    The chain primitive lets us link together signatures so that one is called after the other, essentially forming a chain of callbacks.

  • chord

    A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.

  • map

    The map primitive works like the built-in map function, but creates a temporary task where a list of arguments is applied to the task. E.g. task.map([1, 2]) results in a single task being called, applying the arguments in order to the task function so that the result is:

    res = [task(1), task(2)]
    
  • starmap

    Works exactly like map except the arguments are applied as *args. For example add.starmap([(2, 2), (4, 4)]) results in a single task calling:

    res = [add(2, 2), add(4, 4)]
    
  • chunks

    Chunking splits a long list of arguments into parts, e.g. the operation:

    >>> items = zip(xrange(1000), xrange(1000))  # 1000 items
    >>> add.chunks(items, 10)
    

    will split the list of items into chunks of 10, resulting in 100 tasks (each processing 10 items in sequence).

The primitives are also signature objects themselves, so that they can be combined in any number of ways to compose complex workflows.

Here are some examples:

  • Simple chain

    Here's a simple chain: the first task executes, passing its return value to the next task in the chain, and so on.

    >>> from celery import chain
    
    # 2 + 2 + 4 + 8
    >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
    >>> res.get()
    16
    

    This can also be written using pipes:

    >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
    16
    
  • Immutable signatures

    Signatures can be partial so arguments can be added to the existing arguments, but you may not always want that, for example if you don’t want the result of the previous task in a chain.

    In that case you can mark the signature as immutable, so that the arguments cannot be changed:

    >>> add.subtask((2, 2), immutable=True)
    

    There’s also an .si shortcut for this:

    >>> add.si(2, 2)
    

    Now you can create a chain of independent tasks instead:

    >>> res = (add.si(2, 2) | add.si(4, 4) | add.s(8, 8))()
    >>> res.get()
    16
    
    >>> res.parent.get()
    8
    
    >>> res.parent.parent.get()
    4
    
  • Simple group

    You can easily create a group of tasks to execute in parallel:

    >>> from celery import group
    >>> res = group(add.s(i, i) for i in xrange(10))()
    >>> res.get(timeout=1)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    
  • Simple chord

    The chord primitive enables us to add a callback to be called when all of the tasks in a group have finished executing, which is often required for algorithms that aren't embarrassingly parallel:

    >>> from celery import chord
    >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
    >>> res.get()
    90
    

    The above example creates 10 tasks that all start in parallel, and when all of them are complete the return values are combined into a list and sent to the xsum task.

    The body of a chord can also be immutable, so that the return value of the group is not passed on to the callback:

    >>> chord((import_contact.s(c) for c in contacts),
    ...       notify_complete.si(import_id)).apply_async()
    

    Note the use of .si above which creates an immutable signature.

  • Blow your mind by combining

    Chains can be partial too:

    >>> c1 = (add.s(4) | mul.s(8))
    
    # (16 + 4) * 8
    >>> res = c1(16)
    >>> res.get()
    160
    

    Which means that you can combine chains:

    # ((4 + 16) * 2 + 4) * 8
    >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
    
    >>> res = c2()
    >>> res.get()
    352
    

    Chaining a group together with another task will automatically upgrade it to be a chord:

    >>> c3 = (group(add.s(i, i) for i in xrange(10)) | xsum.s())
    >>> res = c3()
    >>> res.get()
    90
    

    Groups and chords accept partial arguments too, so in a chain the return value of the previous task is forwarded to all tasks in the group:

    >>> new_user_workflow = (create_user.s() | group(
    ...                      import_contacts.s(),
    ...                      send_welcome_email.s()))
    >>> new_user_workflow.delay(username='artv',
    ...                         first='Art',
    ...                         last='Vandelay',
    ...                         email='art@vandelay.com')
    

    If you don’t want to forward arguments to the group then you can make the signatures in the group immutable:

    >>> res = (add.s(4, 4) | group(add.si(i, i) for i in xrange(10)))()
    >>> res.get()
    <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [
        bc01831b-9486-4e51-b046-480d7c9b78de,
        2650a1b8-32bf-4771-a645-b0a35dcc791b,
        dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf,
        59f92e0a-23ea-41ce-9fad-8645a0e7759c,
        26e1e707-eccf-4bf4-bbd8-1e1729c3cce3,
        2d10a5f4-37f0-41b2-96ac-a973b1df024d,
        e13d3bdb-7ae3-4101-81a4-6f17ee21df2d,
        104b2be0-7b75-44eb-ac8e-f9220bdfa140,
        c5c551a5-0386-4973-aa37-b65cbeb2624b,
        83f72d71-4b71-428e-b604-6f16599a9f37]>
    
    >>> res.parent.get()
    8
    
Chains

New in version 3.0.

Tasks can be linked together, which in practice means adding a callback task:

>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4

The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.

You can also add error callbacks using the link_error argument:

>>> add.apply_async((2, 2), link_error=log_error.s())

>>> add.subtask((2, 2), link_error=log_error.s())

Since exceptions can only be serialized when pickle is used, the error callbacks take the id of the parent task as argument instead:

from __future__ import print_function
import os
from proj.celery import app

@app.task
def log_error(task_id):
    result = app.AsyncResult(task_id)
    result.get(propagate=False)  # make sure result written.
    with open(os.path.join('/var/errors', task_id), 'a') as fh:
        print('--\n\n{0} {1} {2}'.format(
            task_id, result.result, result.traceback), file=fh)

To make linking tasks easier there is a special signature called chain that lets you chain tasks together:

>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8 * 10
>>> chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)

Calling the chain will call the tasks in the current process and return the result of the last task in the chain:

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.get()
640

It also sets parent attributes so that you can work your way up the chain to get intermediate results:

>>> res.parent.get()
64

>>> res.parent.parent.get()
8

>>> res.parent.parent
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>

Chains can also be made using the | (pipe) operator:

>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()

Note

It’s not possible to synchronize on groups, so a group chained to another signature is automatically upgraded to a chord:

# will actually be a chord when finally evaluated
res = (group(add.s(i, i) for i in range(10)) | xsum.s()).delay()
Trails

Tasks will keep track of what subtasks a task calls in the result backend (unless disabled using Task.trail) and this can be accessed from the result instance:

>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]

>>> res.children[0].get()
64

The result instance also has a collect() method that treats the result as a graph, enabling you to iterate over the results:

>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
 (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]

By default collect() will raise an IncompleteStream exception if the graph is not fully formed (one of the tasks has not completed yet), but you can get an intermediate representation of the graph too:

>>> for result, value in res.collect(intermediate=True):
...
Graphs

In addition you can work with the result graph as a DependencyGraph:

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()

>>> res.parent.parent.graph
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
    463afec2-5ed4-4036-b22d-ba067ec64f52(0)
872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
    285fa253-fcf8-42ef-8b95-0078897e83e6(1)
        463afec2-5ed4-4036-b22d-ba067ec64f52(0)

You can even convert these graphs to dot format:

>>> with open('graph.dot', 'w') as fh:
...     res.parent.parent.graph.to_dot(fh)

and create images:

$ dot -Tpng graph.dot -o graph.png
Groups

New in version 3.0.

A group can be used to execute several tasks in parallel.

The group function takes a list of signatures:

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))

If you call the group, the tasks will be applied one after another in the current process, and a GroupResult instance is returned which can be used to keep track of the results, or tell how many tasks are ready, and so on:

>>> g = group(add.s(2, 2), add.s(4, 4))
>>> res = g()
>>> res.get()
[4, 8]

Group also supports iterators:

>>> group(add.s(i, i) for i in xrange(100))()

A group is a signature object, so it can be used in combination with other signatures.

Group Results

The group task returns a special result too; this result works just like normal task results, except that it operates on the group as a whole:

>>> from celery import group
>>> from tasks import add

>>> job = group([
...             add.s(2, 2),
...             add.s(4, 4),
...             add.s(8, 8),
...             add.s(16, 16),
...             add.s(32, 32),
... ])

>>> result = job.apply_async()

>>> result.ready()  # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]

The GroupResult takes a list of AsyncResult instances and operates on them as if it was a single task.

It supports the following operations:

  • successful()

    Return True if all of the subtasks finished successfully (e.g. did not raise an exception).

  • failed()

    Return True if any of the subtasks failed.

  • waiting()

    Return True if any of the subtasks is not ready yet.

  • ready()

    Return True if all of the subtasks are ready.

  • completed_count()

    Return the number of completed subtasks.

  • revoke()

    Revoke all of the subtasks.

  • join()

    Gather the results of all of the subtasks and return them as a list, in the order in which they were called.
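
As a sketch of a few of these operations on the job from the example above (the completed count shown is illustrative):

>>> result = job.apply_async()
>>> result.completed_count()
3
>>> result.join()
[4, 8, 16, 32, 64]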

Chords

New in version 2.3.

Note

Tasks used within a chord must not ignore their results. If the result backend is disabled for any task (header or body) in your chord you should read “Important Notes”.

A chord is a task that only executes after all of the tasks in a group have finished executing.

Let's calculate the sum of the expression 1 + 1 + 2 + 2 + 3 + 3 ... n + n for the first hundred integers.

First you need two tasks, add() and tsum() (sum() is already a standard function):

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

Now you can use a chord to calculate each addition step in parallel, and then get the sum of the resulting numbers:

>>> from celery import chord
>>> from tasks import add, tsum

>>> chord(add.s(i, i)
...       for i in xrange(100))(tsum.s()).get()
9900

This is obviously a very contrived example; the overhead of messaging and synchronization makes this a lot slower than its Python counterpart:

sum(i + i for i in xrange(100))

The synchronization step is costly, so you should avoid using chords as much as possible. Still, the chord is a powerful primitive to have in your toolbox as synchronization is a required step for many parallel algorithms.

Let’s break the chord expression down:

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900

Remember, the callback can only be executed after all of the tasks in the header have returned. Each step in the header is executed as a task, in parallel, possibly on different nodes. The callback is then applied with the return value of each task in the header. The task id returned by chord() is the id of the callback, so you can wait for it to complete and get the final return value (but remember to never have a task wait for other tasks)

Error handling

So what happens if one of the tasks raises an exception?

This was not documented for some time, and before version 3.1 the exception value was simply forwarded to the chord callback.

From version 3.1 errors will instead propagate to the callback: the callback will not be executed; it changes to the failure state, and the error is set to the ChordError exception:

>>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
>>> result = c()
>>> result.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "*/celery/result.py", line 120, in get
    interval=interval)
  File "*/celery/backends/amqp.py", line 150, in wait_for
    raise meta['result']
celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
    raised ValueError('something something',)

If you’re running 3.0.14 or later you can enable the new behavior via the CELERY_CHORD_PROPAGATES setting:

CELERY_CHORD_PROPAGATES = True

While the traceback may be different depending on which result backend is being used, you can see the error description includes the id of the task that failed and a string representation of the original exception. You can also find the original traceback in result.traceback.

Note that the rest of the tasks will still execute, so the third task (add.s(8, 8)) is still executed even though the middle task failed. Also the ChordError only shows the task that failed first (in time): it does not respect the ordering of the header group.

Important Notes

Tasks used within a chord must not ignore their results. In practice this means that you must enable a CELERY_RESULT_BACKEND in order to use chords. Additionally, if CELERY_IGNORE_RESULT is set to True in your configuration, be sure that the individual tasks to be used within the chord are defined with ignore_result=False. This applies to both Task subclasses and decorated tasks.

Example Task subclass:

from celery import Task

class MyTask(Task):
    abstract = True
    ignore_result = False

Example decorated task:

@app.task(ignore_result=False)
def another_task(project):
    do_something()

By default the synchronization step is implemented by having a recurring task poll the completion of the group every second, calling the signature when ready.

Example implementation:

from celery import maybe_signature

@app.task(bind=True)
def unlock_chord(self, group, callback, interval=1, max_retries=None):
    if group.ready():
        return maybe_signature(callback).delay(group.join())
    raise self.retry(countdown=interval, max_retries=max_retries)

This is used by all result backends except Redis and Memcached: these increment a counter after each task in the header, and then apply the callback when the counter exceeds the number of tasks in the set. Note: chords do not work properly with Redis before version 2.2; you will need to upgrade to at least redis-server 2.2 to use them.

The Redis and Memcached approach is a much better solution, but not easily implemented in other backends (suggestions welcome!).

Note

If you are using chords with the Redis result backend and also overriding the Task.after_return() method, you need to make sure to call the super method or else the chord callback will not be applied.

class MyTask(Task):

    def after_return(self, *args, **kwargs):
        do_something()
        super(MyTask, self).after_return(*args, **kwargs)
Map & Starmap

map and starmap are built-in tasks that call the task for every element in a sequence.

They differ from group in that

  • only one task message is sent
  • the operation is sequential.

For example using map:

>>> from proj.tasks import add, xsum

>>> ~xsum.map([range(10), range(100)])
[45, 4950]

is the same as having a task doing:

@app.task
def temp():
    return [xsum(range(10)), xsum(range(100))]

and using starmap:

>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

is the same as having a task doing:

@app.task
def temp():
    return [add(i, i) for i in range(10)]

Both map and starmap are signature objects, so they can be used like other signatures and combined in groups etc., for example to call the starmap after 10 seconds:

>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)
Chunks

Chunking lets you divide an iterable of work into pieces, so that if you have one million objects, you can create 10 tasks with a hundred thousand objects each.

Some may worry that chunking your tasks results in a degradation of parallelism, but this is rarely true for a busy cluster; in practice, since you are avoiding the overhead of messaging, it may considerably increase performance.

To create a chunks signature you can use app.Task.chunks():

>>> add.chunks(zip(range(100), range(100)), 10)

As with group the act of sending the messages for the chunks will happen in the current process when called:

>>> from proj.tasks import add

>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

while calling .apply_async will create a dedicated task so that the individual tasks are applied in a worker instead:

>>> add.chunks(zip(range(100), range(100)), 10).apply_async()

You can also convert chunks to a group:

>>> group = add.chunks(zip(range(100), range(100)), 10).group()

and with the group skew the countdown of each task by increments of one:

>>> group.skew(start=1, stop=10)()

which means that the first task will have a countdown of 1, the second a countdown of 2 and so on.

Workers Guide

Starting the worker

You can start the worker in the foreground by executing the command:

$ celery -A proj worker -l info

For a full list of available command-line options see worker, or simply do:

$ celery worker --help

You can also start multiple workers on the same machine. If you do so be sure to give a unique name to each individual worker by specifying a host name with the --hostname|-n argument:

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3.%h

The hostname argument can expand the following variables:

  • %h: Hostname including domain name.
  • %n: Hostname only.
  • %d: Domain name only.

E.g. if the current hostname is george.example.com then these will expand to:

  • worker1.%h -> worker1.george.example.com
  • worker1.%n -> worker1.george
  • worker1.%d -> worker1.example.com

Note for supervisord users.

The % sign must be escaped by adding a second one: %%h.

Stopping the worker

Shutdown should be accomplished using the TERM signal.

When shutdown is initiated the worker will finish all currently executing tasks before it actually terminates, so if these tasks are important you should wait for it to finish before doing anything drastic (like sending the KILL signal).

If the worker won't shut down after a considerable time, for example because of tasks stuck in an infinite loop, you can use the KILL signal to force-terminate the worker; but be aware that currently executing tasks will be lost (unless the tasks have the acks_late option set).

Also, as processes can't catch the KILL signal, the worker will not be able to reap its children, so make sure to do so manually. This command usually does the trick:

$ ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
Restarting the worker

To restart the worker you should send the TERM signal and start a new instance. The easiest way to manage workers for development is by using celery multi:

$ celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid

For production deployments you should be using init scripts or other process supervision systems (see Running the worker as a daemon).

Other than stopping then starting the worker to restart, you can also restart the worker using the HUP signal, but note that the worker will be responsible for restarting itself so this is prone to problems and is not recommended in production:

$ kill -HUP $pid

Note

Restarting by HUP only works if the worker is running in the background as a daemon (it does not have a controlling terminal).

HUP is disabled on OS X because of a limitation on that platform.

Process Signals

The worker’s main process overrides the following signals:

  • TERM: Warm shutdown, wait for tasks to complete.
  • QUIT: Cold shutdown, terminate ASAP.
  • USR1: Dump traceback for all active threads.
  • USR2: Remote debug, see celery.contrib.rdb.
Variables in file paths

The file path arguments for --logfile, --pidfile and --statedb can contain variables that the worker will expand:

Node name replacements
  • %h: Hostname including domain name.
  • %n: Hostname only.
  • %d: Domain name only.
  • %i: Prefork pool process index or 0 if MainProcess.
  • %I: Prefork pool process index with separator.

E.g. if the current hostname is george.example.com then these will expand to:

  • --logfile=%h.log -> george.example.com.log
  • --logfile=%n.log -> george.log
  • --logfile=%d.log -> example.com.log
Prefork pool process index

The prefork pool process index specifiers will expand into a different filename depending on the process that will eventually need to open the file.

This can be used to specify one log file per child process.

Note that the numbers will stay within the process limit even if processes exit or if autoscale/maxtasksperchild/time limits are used. I.e. the number is the process index not the process count or pid.

  • %i - Pool process index or 0 if MainProcess.

    Where -n worker1@example.com -c2 -f %n-%i.log will result in three log files:

    • worker1-0.log (main process)
    • worker1-1.log (pool process 1)
    • worker1-2.log (pool process 2)
  • %I - Pool process index with separator.

    Where -n worker1@example.com -c2 -f %n%I.log will result in three log files:

    • worker1.log (main process)
    • worker1-1.log (pool process 1)
    • worker1-2.log (pool process 2)
Concurrency

By default multiprocessing is used to perform concurrent execution of tasks, but you can also use Eventlet. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of CPUs available on the machine.

Number of processes (multiprocessing/prefork pool)

More pool processes are usually better, but there's a cut-off point where adding more pool processes affects performance in negative ways. There is even some evidence that running multiple worker instances may perform better than a single worker, for example 3 workers with 10 pool processes each. You need to experiment to find the numbers that work best for you, as this varies based on application, work load, task run times and other factors.
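
For example, a sketch of starting three such workers with ten pool processes each using celery multi:

$ celery multi start 3 -A proj -l info -c10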

Remote control

New in version 2.0.

pool support: prefork, eventlet, gevent, blocking:threads/solo (see note)
broker support: amqp, redis

Workers have the ability to be remote controlled using a high-priority broadcast message queue. The commands can be directed to all, or a specific list of workers.

Commands can also have replies. The client can then wait for and collect those replies. Since there’s no central authority to know how many workers are available in the cluster, there is also no way to estimate how many workers may send a reply, so the client has a configurable timeout — the deadline in seconds for replies to arrive in. This timeout defaults to one second. If the worker doesn’t reply within the deadline it doesn’t necessarily mean the worker didn’t reply, or worse is dead, but may simply be caused by network latency or the worker being slow at processing commands, so adjust the timeout accordingly.

In addition to timeouts, the client can specify the maximum number of replies to wait for. If a destination is specified, this limit is set to the number of destination hosts.

Note

The solo and threads pools support remote control commands, but any executing task will block any waiting control command, so they are of limited use if the worker is very busy. In that case you must increase the timeout waiting for replies in the client.

The broadcast() function.

This is the client function used to send commands to the workers. Some remote control commands also have higher-level interfaces using broadcast() in the background, like rate_limit() and ping().

Sending the rate_limit command and keyword arguments:

>>> app.control.broadcast('rate_limit',
...                          arguments={'task_name': 'myapp.mytask',
...                                     'rate_limit': '200/m'})

This will send the command asynchronously, without waiting for a reply. To request a reply you have to use the reply argument:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)
[{'worker1.example.com': 'New rate limit set successfully'},
 {'worker2.example.com': 'New rate limit set successfully'},
 {'worker3.example.com': 'New rate limit set successfully'}]

Using the destination argument you can specify a list of workers to receive the command:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...                             destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]

Of course, using the higher-level interface to set rate limits is much more convenient, but there are commands that can only be requested using broadcast().

Commands
revoke: Revoking tasks

pool support: all
broker support: amqp, redis
command: celery -A proj control revoke <task_id>

All worker nodes keep a memory of revoked task ids, either in-memory or persistent on disk (see Persistent revokes).

When a worker receives a revoke request it will skip executing the task, but it won’t terminate an already executing task unless the terminate option is set.

Note

The terminate option is a last resort for administrators when a task is stuck. It's not for terminating the task, it's for terminating the process that is executing the task, and that process may have already started processing another task at the point when the signal is sent, so for this reason you must never call this programmatically.

If terminate is set the worker child process processing the task will be terminated. The default signal sent is TERM, but you can specify this using the signal argument. Signal can be the uppercase name of any signal defined in the signal module in the Python Standard Library.

Terminating a task also revokes it.

Example

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')
Revoking multiple tasks

New in version 3.1.

The revoke method also accepts a list argument, where it will revoke several tasks at once.

Example

>>> app.control.revoke([
...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
... ])

The GroupResult.revoke method takes advantage of this since version 3.1.

Persistent revokes

Revoking tasks works by sending a broadcast message to all the workers, the workers then keep a list of revoked tasks in memory. When a worker starts up it will synchronize revoked tasks with other workers in the cluster.

The list of revoked tasks is in-memory, so if all workers restart the list of revoked ids will also vanish. If you want to preserve this list between restarts you need to specify a file for these to be stored in by using the --statedb argument to celery worker:

celery -A proj worker -l info --statedb=/var/run/celery/worker.state

or, if you use celery multi, you will want to create one file per worker instance, so you can use the %n format to expand the current node name:

celery multi start 2 -l info --statedb=/var/run/celery/%n.state

See also Variables in file paths

Note that remote control commands must be working for revokes to work. Remote control commands are only supported by the RabbitMQ (amqp) and Redis transports at this point.

Time Limits

New in version 2.0.

pool support: prefork/gevent

A single task can potentially run forever; if you have lots of tasks waiting for some event that will never happen you will block the worker from processing new tasks indefinitely. The best way to defend against this scenario is enabling time limits.

The time limit (--time-limit) is the maximum number of seconds a task may run before the process executing it is terminated and replaced by a new process. You can also enable a soft time limit (--soft-time-limit); this raises an exception the task can catch to clean up before the hard time limit kills it:

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

Time limits can also be set using the CELERYD_TASK_TIME_LIMIT / CELERYD_TASK_SOFT_TIME_LIMIT settings.
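
For example, in your configuration module (the values are illustrative):

CELERYD_TASK_TIME_LIMIT = 120       # hard limit: kill the process after 2 minutes
CELERYD_TASK_SOFT_TIME_LIMIT = 60   # soft limit: raise SoftTimeLimitExceeded after 1 minute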

Note

Time limits do not currently work on Windows and other platforms that do not support the SIGUSR1 signal.

Changing time limits at runtime

New in version 2.3.

broker support: amqp, redis

There is a remote control command that enables you to change both soft and hard time limits for a task — named time_limit.

Example changing the time limit for the tasks.crawl_the_web task to have a soft time limit of one minute, and a hard time limit of two minutes:

>>> app.control.time_limit('tasks.crawl_the_web',
                           soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

Only tasks that start executing after the time limit change will be affected.

Rate Limits
Changing rate-limits at runtime

Example changing the rate limit for the myapp.mytask task to execute at most 200 tasks of that type every minute:

>>> app.control.rate_limit('myapp.mytask', '200/m')

The above does not specify a destination, so the change request will affect all worker instances in the cluster. If you only want to affect a specific list of workers you can include the destination argument:

>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['celery@worker1.example.com'])

Warning

This won’t affect workers with the CELERY_DISABLE_RATE_LIMITS setting enabled.

Max tasks per child setting

New in version 2.0.

pool support: prefork

With this option you can configure the maximum number of tasks a worker can execute before it’s replaced by a new process.

This is useful if you have memory leaks you have no control over, for example from closed-source C extensions.

The option can be set using the worker's --maxtasksperchild argument or using the CELERYD_MAX_TASKS_PER_CHILD setting.
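
For example, in your configuration module (the number is illustrative):

CELERYD_MAX_TASKS_PER_CHILD = 100  # replace each pool process after 100 tasks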

Autoscaling

New in version 2.2.

pool support: prefork, gevent

The autoscaler component is used to dynamically resize the pool based on load:

  • The autoscaler adds more pool processes when there is work to do,
    and starts removing processes when the workload is low.

It’s enabled by the --autoscale option, which needs two numbers: the maximum and minimum number of pool processes:

--autoscale=AUTOSCALE
     Enable autoscaling by providing
     max_concurrency,min_concurrency.  Example:
       --autoscale=10,3 (always keep 3 processes, but grow to
      10 if necessary).

You can also define your own rules for the autoscaler by subclassing Autoscaler. Some ideas for metrics include load average or the amount of memory available. You can specify a custom autoscaler with the CELERYD_AUTOSCALER setting.
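
As a sketch of wiring in such a custom autoscaler (the module path and class name are illustrative; the base class lives in celery.worker.autoscale):

# myapp/autoscalers.py
from celery.worker.autoscale import Autoscaler

class LoadAwareAutoscaler(Autoscaler):
    """Sketch of a subclass that could scale on load average
    instead of queue depth; override the scaling logic here."""

# celeryconfig.py
CELERYD_AUTOSCALER = 'myapp.autoscalers:LoadAwareAutoscaler'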

Queues

A worker instance can consume from any number of queues. By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).

You can specify what queues to consume from at startup, by giving a comma separated list of queues to the -Q option:

$ celery -A proj worker -l info -Q foo,bar,baz

If the queue name is defined in CELERY_QUEUES it will use that configuration, but if it’s not defined in the list of queues Celery will automatically generate a new queue for you (depending on the CELERY_CREATE_MISSING_QUEUES option).
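
A minimal CELERY_QUEUES sketch declaring the default queue plus the queues used in the example above (the names are illustrative):

from kombu import Queue

CELERY_QUEUES = (
    Queue('celery'),   # the default queue
    Queue('foo'),
    Queue('bar'),
    Queue('baz'),
)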

You can also tell the worker to start and stop consuming from a queue at runtime using the remote control commands add_consumer and cancel_consumer.

Queues: Adding consumers

The add_consumer control command will tell one or more workers to start consuming from a queue. This operation is idempotent.

To tell all workers in the cluster to start consuming from a queue named “foo” you can use the celery control program:

$ celery -A proj control add_consumer foo
-> worker1.local: OK
    started consuming from u'foo'

If you want to specify a specific worker you can use the --destination argument:

$ celery -A proj control add_consumer foo -d worker1.local

The same can be accomplished dynamically using the app.control.add_consumer() method:

>>> app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

>>> app.control.add_consumer('foo', reply=True,
...                          destination=['worker1@example.com'])
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

So far I have only shown examples using automatic queues. If you need more control you can also specify the exchange, routing_key and even other options:

>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['w1@example.com', 'w2@example.com'])
Queues: Cancelling consumers

You can cancel a consumer by queue name using the cancel_consumer control command.

To force all workers in the cluster to cancel consuming from a queue you can use the celery control program:

$ celery -A proj control cancel_consumer foo

The --destination argument can be used to specify a worker, or a list of workers, to act on the command:

$ celery -A proj control cancel_consumer foo -d worker1.local

You can also cancel consumers programmatically using the app.control.cancel_consumer() method:

>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]
Queues: List of active queues

You can get a list of queues that a worker consumes from by using the active_queues control command:

$ celery -A proj inspect active_queues
[...]

Like all other remote control commands this also supports the --destination argument used to specify which workers should reply to the request:

$ celery -A proj inspect active_queues -d worker1.local
[...]

This can also be done programmatically by using the app.control.inspect.active_queues() method:

>>> app.control.inspect().active_queues()
[...]

>>> app.control.inspect(['worker1.local']).active_queues()
[...]
Autoreloading

New in version 2.5.

pool support: prefork, eventlet, gevent, threads, solo

Starting celery worker with the --autoreload option will enable the worker to watch for file system changes to all imported task modules (and also any non-task modules added to the CELERY_IMPORTS setting or the -I|--include option).

This is an experimental feature intended for use in development only; using auto-reload in production is discouraged as the behavior of reloading a module in Python is undefined, and may cause hard-to-diagnose bugs and crashes. Celery uses the same approach as the auto-reloader found in e.g. the Django runserver command.

When auto-reload is enabled the worker starts an additional thread that watches for changes in the file system. New modules are imported, and already imported modules are reloaded whenever a change is detected, and if the prefork pool is used the child processes will finish the work they are doing and exit, so that they can be replaced by fresh processes effectively reloading the code.

File system notification backends are pluggable, and Celery comes with three implementations:

  • inotify (Linux)

    Used if the pyinotify library is installed. If you are running on Linux this is the recommended implementation. To install the pyinotify library run the following command:

    $ pip install pyinotify
    
  • kqueue (OS X/BSD)

  • stat

    The fallback implementation simply polls the files using stat and is very expensive.

You can force an implementation by setting the CELERYD_FSNOTIFY environment variable:

$ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
Pool Restart Command

New in version 2.5.

Requires the CELERYD_POOL_RESTARTS setting to be enabled.

The remote control command pool_restart sends restart requests to the worker's child processes. It is particularly useful for forcing the worker to import new modules, or for reloading already imported modules. This command does not interrupt executing tasks.

Example

Running the following command will result in the foo and bar modules being imported by the worker processes:

>>> app.control.broadcast('pool_restart',
...                       arguments={'modules': ['foo', 'bar']})

Use the reload argument to reload modules it has already imported:

>>> app.control.broadcast('pool_restart',
...                       arguments={'modules': ['foo'],
...                                  'reload': True})

If you don't specify any modules then all known task modules will be imported/reloaded:

>>> app.control.broadcast('pool_restart', arguments={'reload': True})

The modules argument is a list of modules to modify. reload specifies whether to reload modules if they have previously been imported. By default reload is disabled. The pool_restart command uses the Python reload() function to reload modules, or you can provide your own custom reloader by passing the reloader argument.

Note

Module reloading comes with caveats that are documented in reload(). Please read this documentation and make sure your modules are suitable for reloading.

Inspecting workers

app.control.inspect lets you inspect running workers. It uses remote control commands under the hood.

You can also use the celery command to inspect workers, and it supports the same commands as the app.control interface.

# Inspect all nodes.
>>> i = app.control.inspect()

# Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
                            'worker2.example.com'])

# Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')
Dump of registered tasks

You can get a list of tasks registered in the worker using registered():

>>> i.registered()
[{'worker1.example.com': ['tasks.add',
                          'tasks.sleeptask']}]
Dump of currently executing tasks

You can get a list of active tasks using active():

>>> i.active()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]
Dump of scheduled (ETA) tasks

You can get a list of tasks waiting to be scheduled by using scheduled():

>>> i.scheduled()
[{'worker1.example.com':
    [{'eta': '2010-06-07 09:07:52', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d',
        'args': '[1]',
        'kwargs': '{}'}},
     {'eta': '2010-06-07 09:07:53', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '49661b9a-aa22-4120-94b7-9ee8031d219d',
        'args': '[2]',
        'kwargs': '{}'}}]}]

Note

These are tasks with an eta/countdown argument, not periodic tasks.
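
For example, a task invoked with a countdown will show up in this list until its ETA passes. A minimal sketch, assuming the tasks.add task from the earlier examples:

>>> from tasks import add
>>> add.apply_async((4, 4), countdown=60)  # scheduled to run 60 seconds from now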

Dump of reserved tasks

Reserved tasks are tasks that have been received, but are still waiting to be executed.

You can get a list of these using reserved():

>>> i.reserved()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]
Statistics

The remote control command inspect stats (or stats()) will give you a long list of useful (or not so useful) statistics about the worker:

$ celery -A proj inspect stats

The output will include the following fields:

  • broker

    Section for broker information.

    • connect_timeout

      Timeout in seconds (int/float) for establishing a new connection.

    • heartbeat

      Current heartbeat value (set by client).

    • hostname

      Hostname of the remote broker.

    • insist

      No longer used.

    • login_method

      Login method used to connect to the broker.

    • port

      Port of the remote broker.

    • ssl

      SSL enabled/disabled.

    • transport

      Name of transport used (e.g. amqp or redis)

    • transport_options

      Options passed to transport.

    • uri_prefix

      Some transports expect the host name to be a URL. This applies for example to SQLAlchemy, where the host name part is the connection URI:

      redis+socket:///tmp/redis.sock

      In this example the URI prefix will be redis.

    • userid

      User id used to connect to the broker with.

    • virtual_host

      Virtual host used.

  • clock

    Value of the workers logical clock. This is a positive integer and should be increasing every time you receive statistics.

  • pid

    Process id of the worker instance (Main process).

  • pool

    Pool-specific section.

    • max-concurrency

      Max number of processes/threads/green threads.

    • max-tasks-per-child

      Max number of tasks a thread may execute before being recycled.

    • processes

      List of pids (or thread-id’s).

    • put-guarded-by-semaphore

      Internal

    • timeouts

      Default values for time limits.

    • writes

      Specific to the prefork pool, this shows the distribution of writes to each process in the pool when using async I/O.

  • prefetch_count

    Current prefetch count value for the task consumer.

  • rusage

    System usage statistics. The fields available may be different on your platform.

    From getrusage(2):

    • stime

      Time spent in operating system code on behalf of this process.

    • utime

      Time spent executing user instructions.

    • maxrss

      The maximum resident size used by this process (in kilobytes).

    • idrss

      Amount of unshared memory used for data (in kilobytes times ticks of execution)

    • isrss

      Amount of unshared memory used for stack space (in kilobytes times ticks of execution)

    • ixrss

      Amount of memory shared with other processes (in kilobytes times ticks of execution).

    • inblock

      Number of times the file system had to read from the disk on behalf of this process.

    • oublock

      Number of times the file system had to write to disk on behalf of this process.

    • majflt

      Number of page faults which were serviced by doing I/O.

    • minflt

      Number of page faults which were serviced without doing I/O.

    • msgrcv

      Number of IPC messages received.

    • msgsnd

      Number of IPC messages sent.

    • nvcsw

      Number of times this process voluntarily invoked a context switch.

    • nivcsw

      Number of times an involuntary context switch took place.

    • nsignals

      Number of signals received.

    • nswap

      The number of times this process was swapped entirely out of memory.

  • total

    List of task names and the total number of times each task has been executed since the worker started.
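
The same statistics are available programmatically through stats(). A minimal sketch, assuming the reply maps worker names to the nested fields listed above (the exact reply shape may vary between versions):

>>> stats = app.control.inspect().stats()
>>> for worker, info in stats.items():
...     print('{0}: processes={1}'.format(worker, info['pool']['processes']))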

Additional Commands
Remote shutdown

This command will gracefully shut down the worker remotely:

>>> app.control.broadcast('shutdown') # shutdown all workers
>>> app.control.broadcast('shutdown', destination='worker1@example.com')
Ping

This command requests a ping from alive workers. The workers reply with the string ‘pong’, and that’s just about it. It will use the default one second timeout for replies unless you specify a custom timeout:

>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
 {'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

ping() also supports the destination argument, so you can specify which workers to ping:

>>> app.control.ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]
Enable/disable events

You can enable/disable events by using the enable_events and disable_events commands. This is useful to temporarily monitor a worker using celery events/celerymon.

>>> app.control.enable_events()
>>> app.control.disable_events()
Writing your own remote control commands

Remote control commands are registered in the control panel and they take a single argument: the current ControlDispatch instance. From there you have access to the active Consumer if needed.

Here’s an example control command that increments the task prefetch count:

from celery.worker.control import Panel

@Panel.register
def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}
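
Once the module defining the command has been imported by the worker, you can call it like any other remote control command, optionally collecting replies:

>>> app.control.broadcast('increase_prefetch_count',
...                       arguments={'n': 10}, reply=True)
[{'worker1.example.com': {'ok': 'prefetch count incremented'}}]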

Periodic Tasks

Introduction

celery beat is a scheduler. It kicks off tasks at regular intervals, which are then executed by the worker nodes available in the cluster.

By default the entries are taken from the CELERYBEAT_SCHEDULE setting, but custom stores can also be used, like storing the entries in an SQL database.

You have to ensure only a single scheduler is running for a schedule at a time, otherwise you would end up with duplicate tasks. Using a centralized approach means the schedule does not have to be synchronized, and the service can operate without using locks.

Time Zones

Periodic task schedules use the UTC time zone by default, but you can change the time zone with the CELERY_TIMEZONE setting.

An example time zone could be Europe/London:

CELERY_TIMEZONE = 'Europe/London'

This setting must be added to your app, either by configuring it directly (app.conf.CELERY_TIMEZONE = 'Europe/London'), or by adding it to your configuration module if you have set one up using app.config_from_object. See Configuration for more information about configuration options.

The default scheduler (storing the schedule in the celerybeat-schedule file) will automatically detect that the time zone has changed, and so will reset the schedule itself, but other schedulers may not be so smart (e.g. the Django database scheduler, see below) and in that case you will have to reset the schedule manually.

Django Users

Celery recommends and is compatible with the new USE_TZ setting introduced in Django 1.4.

For Django users the time zone specified in the TIME_ZONE setting will be used, or you can specify a custom time zone for Celery alone by using the CELERY_TIMEZONE setting.

The database scheduler will not reset when timezone related settings change, so you must do this manually:

$ python manage.py shell
>>> from djcelery.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)
Entries

To schedule a task periodically you have to add an entry to the CELERYBEAT_SCHEDULE setting.

Example: Run the tasks.add task every 30 seconds.

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16)
    },
}

CELERY_TIMEZONE = 'UTC'

Note

If you are wondering where these settings should go then please see Configuration. You can either set these options on your app directly or you can keep a separate module for configuration.

If you want to use a single item tuple for args, don’t forget that the constructor is a comma and not a pair of parentheses, e.g. (16,) and not (16).

Using a timedelta for the schedule means the task will be sent in 30 second intervals (the first task will be sent 30 seconds after celery beat starts, and then every 30 seconds after the last run).

A crontab like schedule also exists, see the section on Crontab schedules.

Like with cron, the tasks may overlap if the first task does not complete before the next. If that is a concern you should use a locking strategy to ensure only one instance can run at a time (see for example Ensuring a task is only executed one at a time).
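
Here’s a minimal sketch of such a locking strategy using Redis; the redis client, the lock key and the timeout are illustrative assumptions and not part of Celery:

from redis import StrictRedis

locker = StrictRedis()  # assumes a local Redis server

@app.task(bind=True)
def import_feed(self, url):
    # Atomically take the lock, with an expiry so a crashed worker
    # cannot hold it forever (300 seconds is an arbitrary choice).
    if not locker.set('lock:import_feed', self.request.id, nx=True, ex=300):
        return  # another instance is already running
    try:
        pass  # do the actual work here
    finally:
        locker.delete('lock:import_feed')

Note that this sketch may delete a lock that has expired and been taken over by another worker; a production implementation should verify ownership before deleting.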

Available Fields
  • task

    The name of the task to execute.

  • schedule

    The frequency of execution.

    This can be the number of seconds as an integer, a timedelta, or a crontab. You can also define your own custom schedule types by extending the interface of schedule (see the sketch after this list).

  • args

    Positional arguments (list or tuple).

  • kwargs

    Keyword arguments (dict).

  • options

    Execution options (dict).

    This can be any argument supported by apply_async(), e.g. exchange, routing_key, expires, and so on.

  • relative

    By default timedelta schedules are scheduled “by the clock”. This means the frequency is rounded to the nearest second, minute, hour or day depending on the period of the timedelta.

    If relative is true the frequency is not rounded and will be relative to the time when celery beat was started.
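
Here’s the sketch of a custom schedule type referenced in the schedule field above. The class name and the office-hours rule are illustrative; is_due() must return a tuple of (is due now, number of seconds until beat should check again):

from datetime import datetime

from celery.schedules import schedule

class office_hours(schedule):
    """An interval schedule that only fires between 9am and 5pm UTC."""

    def is_due(self, last_run_at):
        due, next_check = super(office_hours, self).is_due(last_run_at)
        # Suppress runs that fall outside of office hours.
        if due and not (9 <= datetime.utcnow().hour < 17):
            return (False, next_check)
        return (due, next_check)

An entry can then use it like any other schedule, e.g. 'schedule': office_hours(run_every=30).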

Crontab schedules

If you want more control over when the task is executed, for example, a particular time of day or day of the week, you can use the crontab schedule type:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

The syntax of these crontab expressions is very flexible. Some examples:

crontab()
    Execute every minute.

crontab(minute=0, hour=0)
    Execute daily at midnight.

crontab(minute=0, hour='*/3')
    Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.

crontab(minute=0, hour='0,3,6,9,12,15,18,21')
    Same as previous.

crontab(minute='*/15')
    Execute every 15 minutes.

crontab(day_of_week='sunday')
    Execute every minute (!) on Sundays.

crontab(minute='*', hour='*', day_of_week='sun')
    Same as previous.

crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri')
    Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays.

crontab(minute=0, hour='*/2,*/3')
    Execute every even hour, and every hour divisible by three. This means: at every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm and 11pm.

crontab(minute=0, hour='*/5')
    Execute every hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).

crontab(minute=0, hour='*/3,8-17')
    Execute every hour divisible by 3, and every hour during office hours (8am-5pm).

crontab(0, 0, day_of_month='2')
    Execute on the second day of every month.

crontab(0, 0, day_of_month='2-30/3')
    Execute every third day of the month, starting from the second day.

crontab(0, 0, day_of_month='1-7,15-21')
    Execute on the first and third weeks of the month.

crontab(0, 0, day_of_month='11', month_of_year='5')
    Execute on the 11th of May every year.

crontab(0, 0, month_of_year='*/3')
    Execute on the first month of every quarter.

See celery.schedules.crontab for more documentation.

Starting the Scheduler

To start the celery beat service:

$ celery -A proj beat

You can also embed beat inside the worker by enabling the worker’s -B option. This is convenient if you will never run more than one worker node, but it’s not commonly used and for that reason is not recommended for production use:

$ celery -A proj worker -B

Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs write access to the current directory, or alternatively you can specify a custom location for this file:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

Note

To daemonize beat see Running the worker as a daemon.

Using custom scheduler classes

Custom scheduler classes can be specified on the command-line (the -S argument). The default scheduler is celery.beat.PersistentScheduler, which simply keeps track of the last run times in a local database file (a shelve).

django-celery also ships with a scheduler that stores the schedule in the Django database:

$ celery -A proj beat -S djcelery.schedulers.DatabaseScheduler

Using django-celery‘s scheduler you can add, modify and remove periodic tasks from the Django Admin.

HTTP Callback Tasks (Webhooks)

Basics

If you need to call into another language, framework or similar, you can do so by using HTTP callback tasks.

The HTTP callback tasks use GET/POST data to pass arguments and return the result as a JSON response. The scheme to call a task is:

GET http://example.com/mytask/?arg1=a&arg2=b&arg3=c

or using POST:

POST http://example.com/mytask

Note

POST data needs to be form encoded.

Whether to use GET or POST is up to you and your requirements.

The web page should then return a response in the following format if the execution was successful:

{'status': 'success', 'retval': …}

or if there was an error:

{'status': 'failure', 'reason': 'Invalid moon alignment.'}
Enabling the HTTP task

To enable the HTTP dispatch task you have to add celery.task.http to CELERY_IMPORTS, or start the worker with -I celery.task.http.
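
For example, in your configuration module:

CELERY_IMPORTS = ('celery.task.http', )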

Django webhook example

With this information you could define a simple task in Django:

from django.http import HttpResponse
from anyjson import serialize


def multiply(request):
    x = int(request.GET['x'])
    y = int(request.GET['y'])
    result = x * y
    response = {'status': 'success', 'retval': result}
    return HttpResponse(serialize(response), mimetype='application/json')
Ruby on Rails webhook example

or in Ruby on Rails:

def multiply
    @x = params[:x].to_i
    @y = params[:y].to_i

    @status = {:status => 'success', :retval => @x * @y}

    render :json => @status
end

You can easily port this scheme to any language/framework; new examples and libraries are very welcome.

Calling webhook tasks

To call a task you can use the URL class:

>>> from celery.task.http import URL
>>> res = URL('http://example.com/multiply').get_async(x=10, y=10)

URL is a shortcut to the HttpDispatchTask. You can subclass this to extend the functionality.

>>> from celery.task.http import HttpDispatchTask
>>> res = HttpDispatchTask.delay(
...     url='http://example.com/multiply',
...     method='GET', x=10, y=10)
>>> res.get()
100

The output of celery worker (or the log file if enabled) should show the task being executed:

[INFO/MainProcess] Task celery.task.http.HttpDispatchTask
        [f2cc8efc-2a14-40cd-85ad-f1c77c94beeb] processed: 100

Since calling tasks can be done via HTTP using the djcelery.views.apply() view, calling tasks from other languages is easy. For an example service exposing tasks via HTTP you should have a look at examples/celery_http_gateway in the Celery distribution: http://github.com/celery/celery/tree/master/examples/celery_http_gateway/

Routing Tasks

Note

Alternate routing concepts like topic and fanout may not be available for all transports, please consult the transport comparison table.

Basics
Automatic routing

The simplest way to do routing is to use the CELERY_CREATE_MISSING_QUEUES setting (on by default).

With this setting on, a named queue that is not already defined in CELERY_QUEUES will be created automatically. This makes it easy to perform simple routing tasks.

Say you have two servers, x and y, that handle regular tasks, and one server z, that only handles feed related tasks. You can use this configuration:

CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}

With this route enabled import feed tasks will be routed to the “feeds” queue, while all other tasks will be routed to the default queue (named “celery” for historical reasons).

Now you can start server z to only process the feeds queue like this:

user@z:/$ celery -A proj worker -Q feeds

You can specify as many queues as you want, so you can make this server process the default queue as well:

user@z:/$ celery -A proj worker -Q feeds,celery
Changing the name of the default queue

You can change the name of the default queue by using the following configuration:

from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
)
How the queues are defined

The point of this feature is to hide the complex AMQP protocol from users with only basic needs. However, you may still be interested in how these queues are declared.

A queue named “video” will be created with the following settings:

{'exchange': 'video',
 'exchange_type': 'direct',
 'routing_key': 'video'}

Non-AMQP backends like ghettoq do not support exchanges, so they require the exchange to have the same name as the queue. Using this design ensures it will work for them as well.

Manual routing

Say you have two servers, x and y, that handle regular tasks, and one server z, that only handles feed related tasks; you can use this configuration:

from kombu import Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'

CELERY_QUEUES is a list of Queue instances. If you don’t set the exchange or exchange type values for a key, these will be taken from the CELERY_DEFAULT_EXCHANGE and CELERY_DEFAULT_EXCHANGE_TYPE settings.

To route a task to the feed_tasks queue, you can add an entry in the CELERY_ROUTES setting:

CELERY_ROUTES = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}

You can also override this using the routing_key argument to Task.apply_async(), or send_task():

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

To make server z consume from the feed queue exclusively you can start it with the -Q option:

user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h

Servers x and y must be configured to consume from the default queue:

user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

If you want, you can even have your feed processing worker handle regular tasks as well, maybe in times when there’s a lot of work to do:

user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h

If you have another queue but on another exchange you want to add, just specify a custom exchange and exchange type:

from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                           routing_key='image.compress'),
)

If you’re confused about these terms, you should read up on AMQP.

See also

In addition to the AMQP Primer below, there’s Rabbits and Warrens, an excellent blog post describing queues and exchanges. There’s also AMQP in 10 minutes: Flexible Routing Model, and Standard Exchange Types. For users of RabbitMQ the RabbitMQ FAQ could be useful as a source of information.

AMQP Primer
Messages

A message consists of headers and a body. Celery uses headers to store the content type of the message and its content encoding. The content type is usually the serialization format used to serialize the message. The body contains the name of the task to execute, the task id (UUID), the arguments to apply it with and some additional metadata – like the number of retries or an ETA.

This is an example task message represented as a Python dictionary:

{'task': 'myapp.tasks.add',
 'id': '54086c5e-6193-4575-8308-dbab76798756',
 'args': [4, 4],
 'kwargs': {}}
Producers, consumers and brokers

The client sending messages is typically called a publisher, or a producer, while the entity receiving messages is called a consumer.

The broker is the message server, routing messages from producers to consumers.

You are likely to see these terms used a lot in AMQP related material.

Exchanges, queues and routing keys
  1. Messages are sent to exchanges.
  2. An exchange routes messages to one or more queues. Several exchange types exist, providing different ways to do routing, or implementing different messaging scenarios.
  3. The message waits in the queue until someone consumes it.
  4. The message is deleted from the queue when it has been acknowledged.

The steps required to send and receive messages are:

  1. Create an exchange
  2. Create a queue
  3. Bind the queue to the exchange.

Celery automatically creates the entities necessary for the queues in CELERY_QUEUES to work (except if the queue’s auto_declare setting is set to False).

Here’s an example queue configuration with three queues: one for video, one for images, and one default queue for everything else:

from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
Exchange types

The exchange type defines how the messages are routed through the exchange. The exchange types defined in the standard are direct, topic, fanout and headers. Non-standard exchange types are also available as plug-ins to RabbitMQ, like the last-value-cache plug-in by Michael Bridgen.

Direct exchanges

Direct exchanges match by exact routing keys, so a queue bound by the routing key video only receives messages with that routing key.

Topic exchanges

Topic exchanges match routing keys using dot-separated words and the wildcard characters: * (matches a single word) and # (matches zero or more words).

With routing keys like usa.news, usa.weather, norway.news and norway.weather, bindings could be *.news (all news), usa.# (all items in the USA) or usa.weather (all USA weather items).
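
Expressed as a Celery queue configuration, such bindings could look like this (the exchange and queue names are illustrative):

from kombu import Exchange, Queue

news_exchange = Exchange('news', type='topic')

CELERY_QUEUES = (
    Queue('usa_news', news_exchange, routing_key='usa.#'),
    Queue('all_news', news_exchange, routing_key='*.news'),
)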

Hands-on with the API

Celery comes with a tool called celery amqp that is used for command line access to the AMQP API, enabling access to administration tasks like creating/deleting queues and exchanges, purging queues or sending messages. It can also be used for non-AMQP brokers, but different implementations may not support all commands.

You can write commands directly in the arguments to celery amqp, or just start with no arguments to start it in shell-mode:

$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>

Here 1> is the prompt. The number 1 is the number of commands you have executed so far. Type help for a list of commands available. It also supports auto-completion, so you can start typing a command and then hit the tab key to show a list of possible matches.

Let’s create a queue you can send messages to:

$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.

This created the direct exchange testexchange, and a queue named testqueue. The queue is bound to the exchange using the routing key testkey.

From now on all messages sent to the exchange testexchange with routing key testkey will be moved to this queue. You can send a message by using the basic.publish command:

4> basic.publish 'This is a message!' testexchange testkey
ok.

Now that the message is sent you can retrieve it again. You can use the basic.get command here, which polls for new messages on the queue (this is alright for maintenance tasks; for services you’d want to use basic.consume instead).

Pop a message off the queue:

5> basic.get testqueue
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}

AMQP uses acknowledgment to signify that a message has been received and processed successfully. If the message has not been acknowledged and the consumer channel is closed, the message will be delivered to another consumer.

Note the delivery tag listed in the structure above: within a connection channel, every received message has a unique delivery tag. This tag is used to acknowledge the message. Also note that delivery tags are not unique across connections, so in another client the delivery tag 1 might point to a different message than in this channel.

You can acknowledge the message you received using basic.ack:

6> basic.ack 1
ok.

To clean up after our test session you should delete the entities you created:

7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.
Routing Tasks
Defining queues

In Celery available queues are defined by the CELERY_QUEUES setting.

Here’s an example queue configuration with three queues: one for video, one for images, and one default queue for everything else:

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

Here, the CELERY_DEFAULT_QUEUE will be used to route tasks that don’t have an explicit route.

The default exchange, exchange type and routing key will be used as the default routing values for tasks, and as the default values for entries in CELERY_QUEUES.

Specifying task destination

The destination for a task is decided by the following (in order):

  1. The Routers defined in CELERY_ROUTES.
  2. The routing arguments to Task.apply_async().
  3. Routing related attributes defined on the Task itself.

It is considered best practice to not hard-code these settings, but rather leave them as configuration options by using Routers; this is the most flexible approach, but sensible defaults can still be set as task attributes.

Routers

A router is a class that decides the routing options for a task.

All you need to define a new router is to create a class with a route_for_task method:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}
        return None

If you return the queue key, it will expand with the defined settings of that queue in CELERY_QUEUES:

{'queue': 'video', 'routing_key': 'video.compress'}

becomes:

{'queue': 'video',
 'exchange': 'video',
 'exchange_type': 'topic',
 'routing_key': 'video.compress'}

You install router classes by adding them to the CELERY_ROUTES setting:

CELERY_ROUTES = (MyRouter(), )

Router classes can also be added by name:

CELERY_ROUTES = ('myapp.routers.MyRouter', )

For simple task name -> route mappings like the router example above, you can simply drop a dict into CELERY_ROUTES to get the same behavior:

CELERY_ROUTES = ({'myapp.tasks.compress_video': {
                        'queue': 'video',
                        'routing_key': 'video.compress'
                 }}, )

The routers will then be traversed in order; traversal stops at the first router returning a true value, which is then used as the final route for the task.

Broadcast

Celery can also support broadcast routing. Here is an example exchange broadcast_tasks that delivers copies of tasks to all workers connected to it:

from kombu.common import Broadcast

CELERY_QUEUES = (Broadcast('broadcast_tasks'), )

CELERY_ROUTES = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}}

Now the tasks.reload_cache task will be sent to every worker consuming from this queue.

Broadcast & Results

Note that the Celery result backend does not define what happens if two tasks have the same task_id. If the same task is distributed to more than one worker, then the state history may not be preserved.

It is a good idea to set the task.ignore_result attribute in this case.
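
For example, a broadcast-friendly task that never stores results (the task body is illustrative):

@app.task(ignore_result=True)
def reload_cache():
    pass  # refresh the cache here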

Monitoring and Management Guide

Introduction

There are several tools available to monitor and inspect Celery clusters.

This document describes some of these, as well as features related to monitoring, like events and broadcast commands.

Workers
Management Command-line Utilities (inspect/control)

celery can also be used to inspect and manage worker nodes (and to some degree tasks).

To list all the commands available do:

$ celery help

or to get help for a specific command do:

$ celery <command> --help
Commands
  • shell: Drop into a Python shell.

    The locals will include the celery variable, which is the current app. Also all known tasks will be automatically added to locals (unless the --without-tasks flag is set).

    Uses IPython, bpython, or regular python in that order if installed. You can force an implementation using --force-ipython|-I, --force-bpython|-B, or --force-python|-P.

  • status: List active nodes in this cluster

    $ celery -A proj status
    
  • result: Show the result of a task

    $ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
    

    Note that you can omit the name of the task as long as the task doesn’t use a custom result backend.

  • purge: Purge messages from all configured task queues.

    Warning

    There is no undo for this operation, and messages will be permanently deleted!

    $ celery -A proj purge
    
  • inspect active: List active tasks

    $ celery -A proj inspect active
    

    These are all the tasks that are currently being executed.

  • inspect scheduled: List scheduled ETA tasks

    $ celery -A proj inspect scheduled
    

    These are tasks reserved by the worker because they have the eta or countdown argument set.

  • inspect reserved: List reserved tasks

    $ celery -A proj inspect reserved
    

    This will list all tasks that have been prefetched by the worker and are currently waiting to be executed (does not include tasks with an ETA).

  • inspect revoked: List history of revoked tasks

    $ celery -A proj inspect revoked
    
  • inspect registered: List registered tasks

    $ celery -A proj inspect registered
    
  • inspect stats: Show worker statistics (see Statistics)

    $ celery -A proj inspect stats
    
  • control enable_events: Enable events

    $ celery -A proj control enable_events
    
  • control disable_events: Disable events

    $ celery -A proj control disable_events
    
  • migrate: Migrate tasks from one broker to another (EXPERIMENTAL).

    $ celery -A proj migrate redis://localhost amqp://localhost
    

    This command will migrate all the tasks on one broker to another. As this command is new and experimental you should be sure to have a backup of the data before proceeding.

Note

All inspect and control commands support a --timeout argument. This is the number of seconds to wait for responses. You may have to increase this timeout if you’re not getting a response due to latency.

Specifying destination nodes

By default the inspect and control commands operate on all workers. You can specify a single worker, or a list of workers, by using the --destination argument:

$ celery -A proj inspect -d w1,w2 reserved

$ celery -A proj control -d w1,w2 enable_events
Flower: Real-time Celery web-monitor

Flower is a real-time web based monitor and administration tool for Celery. It is under active development, but is already an essential tool. Being the recommended monitor for Celery, it obsoletes the Django-Admin monitor, celerymon and the ncurses based monitor.

Flower is pronounced like “flow”, but you can also use the botanical version if you prefer.

Features
  • Real-time monitoring using Celery Events

    • Task progress and history
    • Ability to show task details (arguments, start time, runtime, and more)
    • Graphs and statistics
  • Remote Control

    • View worker status and statistics
    • Shutdown and restart worker instances
    • Control worker pool size and autoscale settings
    • View and modify the queues a worker instance consumes from
    • View currently running tasks
    • View scheduled tasks (ETA/countdown)
    • View reserved and revoked tasks
    • Apply time and rate limits
    • Configuration viewer
    • Revoke or terminate tasks
  • HTTP API

  • OpenID authentication

Screenshots

[Images: Flower dashboard and task monitor screenshots]

Usage

You can use pip to install Flower:

$ pip install flower

Running the flower command will start a web-server that you can visit:

$ celery -A proj flower

The default port is http://localhost:5555, but you can change this using the --port argument:

$ celery -A proj flower --port=5555

The broker URL can also be passed through the --broker argument:

$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0

Then, you can visit Flower in your web browser:

$ open http://localhost:5555

Flower has many more features than are detailed here, including authorization options. Check out the official documentation for more information.

celery events: Curses Monitor

New in version 2.0.

celery events is a simple curses monitor displaying task and worker history. You can inspect the result and traceback of tasks, and it also supports some management commands like rate limiting and shutting down workers. This monitor was started as a proof of concept, and you probably want to use Flower instead.

Starting:

$ celery -A proj events

You should see a screen like:

_images/celeryevshotsm1.jpg

celery events is also used to start snapshot cameras (see Snapshots):

$ celery -A proj events --camera=<camera-class> --frequency=1.0

and it includes a tool to dump events to stdout:

$ celery -A proj events --dump

For a complete list of options use --help:

$ celery events --help
RabbitMQ

To manage a Celery cluster it is important to know how RabbitMQ can be monitored.

RabbitMQ ships with the rabbitmqctl(1) command; with this you can list queues, exchanges, bindings, queue lengths and the memory usage of each queue, as well as manage users, virtual hosts and their permissions.

Note

The default virtual host ("/") is used in these examples. If you use a custom virtual host you have to add the -p argument to the command, e.g.: rabbitmqctl list_queues -p my_vhost

Inspecting queues

Finding the number of tasks in a queue:

$ rabbitmqctl list_queues name messages messages_ready \
                          messages_unacknowledged

Here messages_ready is the number of messages ready for delivery (sent but not received), and messages_unacknowledged is the number of messages that have been received by a worker but not acknowledged yet (meaning the task is in progress, or has been reserved). messages is the sum of ready and unacknowledged messages.

Finding the number of workers currently consuming from a queue:

$ rabbitmqctl list_queues name consumers

Finding the amount of memory allocated to a queue:

$ rabbitmqctl list_queues name memory
Tip: Adding the -q option to rabbitmqctl(1) makes the output easier to parse.
Redis

If you’re using Redis as the broker, you can monitor the Celery cluster using the redis-cli(1) command to list lengths of queues.

Inspecting queues

Finding the number of tasks in a queue:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

The default queue is named celery. To get all available queues, invoke:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*

Note

Queue keys only exist when there are tasks in them, so if a key does not exist it simply means there are no messages in that queue. This is because in Redis a list with no elements in it is automatically removed, and hence it won’t show up in the keys command output, and llen for that list returns 0.

Also, if you’re using Redis for other purposes, the output of the keys command will include unrelated values stored in the database. The recommended way around this is to use a dedicated DATABASE_NUMBER for Celery. You can also use database numbers to separate Celery applications from each other (virtual hosts), but this will not affect the monitoring events used by e.g. Flower, as Redis pub/sub commands are global rather than database based.

Munin

This is a list of known Munin plug-ins that can be useful when maintaining a Celery cluster.

Events

The worker has the ability to send a message whenever some event happens. These events are then captured by tools like Flower and celery events to monitor the cluster.

Snapshots

New in version 2.1.

Even a single worker can produce a huge amount of events, so storing the history of all events on disk may be very expensive.

A sequence of events describes the cluster state in that time period; by taking periodic snapshots of this state you can keep all history, but still only periodically write it to disk.

To take snapshots you need a Camera class; with this you can define what should happen every time the state is captured. You can write it to a database, send it by email or something else entirely.

celery events is then used to take snapshots with the camera. For example, if you want to capture state every 2 seconds using the camera myapp.Camera you run celery events with the following arguments:

$ celery -A proj events -c myapp.Camera --frequency=2.0
Custom Camera

Cameras can be useful if you need to capture events and do something with those events at an interval. For real-time event processing you should use app.events.Receiver directly, like in Real-time processing.

Here is an example camera, dumping the snapshot to screen:

from pprint import pformat

from celery.events.snapshot import Polaroid

class DumpCam(Polaroid):

    def on_shutter(self, state):
        if not state.event_count:
            # No new events since last snapshot.
            return
        print('Workers: {0}'.format(pformat(state.workers, indent=4)))
        print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
        print('Total: {0.event_count} events, {0.task_count} tasks'.format(
            state))

See the API reference for celery.events.state to read more about state objects.

Now you can use this cam with celery events by specifying it with the -c option:

$ celery -A proj events -c myapp.DumpCam --frequency=2.0

Or you can use it programmatically like this:

from celery import Celery
from myapp import DumpCam

def main(app, freq=1.0):
    state = app.events.State()
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': state.event})
        with DumpCam(state, freq=freq):
            recv.capture(limit=None, timeout=None)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    main(app)
Real-time processing

To process events in real-time you need the following:

  • An event consumer (this is the Receiver)

  • A set of handlers called when events come in.

    You can have different handlers for each event type, or a catch-all handler can be used (‘*’)

  • State (optional)

    app.events.State is a convenient in-memory representation of tasks and workers in the cluster that is updated as events come in.

    It encapsulates solutions for many common things, like checking if a worker is still alive (by verifying heartbeats), merging event fields together as events come in, making sure timestamps are in sync, and so on.

Combining these you can easily process events in real-time:

from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(), ))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

Note

The wakeup argument to capture sends a signal to all workers to force them to send a heartbeat. This way you can immediately see workers when the monitor starts.

You can listen to specific events by specifying the handlers:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(), ))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)
Event Reference

This list contains the events sent by the worker, and their arguments.

Task Events
task-sent
signature: task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key)

Sent when a task message is published and the CELERY_SEND_TASK_SENT_EVENT setting is enabled.

task-received
signature: task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp)

Sent when the worker receives a task.

task-started
signature: task-started(uuid, hostname, timestamp, pid)

Sent just before the worker executes the task.

task-succeeded
signature: task-succeeded(uuid, result, runtime, hostname, timestamp)

Sent if the task executed successfully.

Runtime is the time it took to execute the task using the pool (starting from when the task is sent to the worker pool, and ending when the pool result handler callback is called).

task-failed
signature: task-failed(uuid, exception, traceback, hostname, timestamp)

Sent if the execution of the task failed.

task-revoked
signature: task-revoked(uuid, terminated, signum, expired)

Sent if the task has been revoked (Note that this is likely to be sent by more than one worker).

  • terminated is set to true if the task process was terminated, and the signum field is set to the signal used.

  • expired is set to true if the task expired.

task-retried
signature: task-retried(uuid, exception, traceback, hostname, timestamp)

Sent if the task failed, but will be retried in the future.

Worker Events
worker-online
signature: worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

The worker has connected to the broker and is online.

  • hostname: Hostname of the worker.
  • timestamp: Event timestamp.
  • freq: Heartbeat frequency in seconds (float).
  • sw_ident: Name of worker software (e.g. py-celery).
  • sw_ver: Software version (e.g. 2.2.0).
  • sw_sys: Operating System (e.g. Linux, Windows, Darwin).
worker-heartbeat
signature: worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)

Sent every minute. If the worker has not sent a heartbeat in 2 minutes, it is considered to be offline.

  • hostname: Hostname of the worker.
  • timestamp: Event timestamp.
  • freq: Heartbeat frequency in seconds (float).
  • sw_ident: Name of worker software (e.g. py-celery).
  • sw_ver: Software version (e.g. 2.2.0).
  • sw_sys: Operating System (e.g. Linux, Windows, Darwin).
  • active: Number of currently executing tasks.
  • processed: Total number of tasks processed by this worker.
worker-offline
signature: worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

The worker has disconnected from the broker.

Security

Introduction

While Celery is written with security in mind, it should be treated as an unsafe component.

Depending on your Security Policy, there are various steps you can take to make your Celery installation more secure.

Areas of Concern
Broker

It is imperative that the broker is guarded from unwanted access, especially if accessible to the public. By default, workers trust that the data they get from the broker has not been tampered with. See Message Signing for information on how to make the broker connection more trustworthy.

The first line of defence should be to put a firewall in front of the broker, allowing only white-listed machines to access it.

Keep in mind that both firewall misconfiguration and temporarily disabling the firewall are common in the real world. A solid security policy includes monitoring of firewall equipment to detect if it has been disabled, be it accidentally or on purpose.

In other words, one should not blindly trust the firewall either.

If your broker supports fine-grained access control, like RabbitMQ, this is something you should look at enabling. See for example http://www.rabbitmq.com/access-control.html.

If supported by your broker backend, you can enable end-to-end SSL encryption and authentication using BROKER_USE_SSL.

Client

In Celery, “client” refers to anything that sends messages to the broker, e.g. web-servers that apply tasks.

Having the broker properly secured doesn’t matter if arbitrary messages can be sent through a client.

[Need more text here]

Worker

The default permissions of tasks running inside a worker are the same as the privileges of the worker itself. This applies to resources such as memory, file-systems and devices.

An exception to this rule is when using the multiprocessing based task pool, which is currently the default. In this case, the task will have access to any memory copied as a result of the fork() call (does not apply under MS Windows), and access to memory contents written by parent tasks in the same worker child process.

Limiting access to memory contents can be done by launching every task in a subprocess (fork() + execve()).

Limiting file-system and device access can be accomplished by using chroot, jail, sandboxing, virtual machines or other mechanisms as enabled by the platform or additional software.

Note also that any task executed in the worker will have the same network access as the machine on which it’s running. If the worker is located on an internal network it’s recommended to add firewall rules for outbound traffic.

Serializers

The default pickle serializer is convenient because it supports arbitrary Python objects, whereas other serializers only work with a restricted set of types.

But for the same reasons the pickle serializer is inherently insecure [*], and should be avoided whenever clients are untrusted or unauthenticated.

[*]http://nadiana.com/python-pickle-insecure

You can disable untrusted content by specifying a white-list of accepted content-types in the CELERY_ACCEPT_CONTENT setting:

New in version 3.0.18.

Note

This setting was first supported in version 3.0.18. If you’re running an earlier version it will simply be ignored, so make sure you’re running a version that supports it.

CELERY_ACCEPT_CONTENT = ['json']

This accepts a list of serializer names and content-types, so you could also specify the content type for json:

CELERY_ACCEPT_CONTENT = ['application/json']

Celery also comes with a special auth serializer that validates communication between Celery clients and workers, making sure that messages originate from trusted sources. Using public-key cryptography the auth serializer can verify the authenticity of senders; to enable this, read Message Signing for more information.

Message Signing

Celery can use the pyOpenSSL library to sign messages using public-key cryptography, where messages sent by clients are signed using a private key and then later verified by the worker using a public certificate.

Optimally certificates should be signed by an official Certificate Authority, but they can also be self-signed.

To enable this you should configure the CELERY_TASK_SERIALIZER setting to use the auth serializer. Also required is configuring the paths used to locate private keys and certificates on the file-system: the CELERY_SECURITY_KEY, CELERY_SECURITY_CERTIFICATE and CELERY_SECURITY_CERT_STORE settings respectively. With these configured it is also necessary to call the celery.setup_security() function. Note that this will also disable all insecure serializers so that the worker won’t accept messages with untrusted content types.

This is an example configuration using the auth serializer, with the private key and certificate files located in /etc/ssl.

CELERY_SECURITY_KEY = '/etc/ssl/private/worker.key'
CELERY_SECURITY_CERTIFICATE = '/etc/ssl/certs/worker.pem'
CELERY_SECURITY_CERT_STORE = '/etc/ssl/certs/*.pem'
from celery.security import setup_security
setup_security()

Note

While relative paths are not disallowed, using absolute paths is recommended for these files.

Also note that the auth serializer won’t encrypt the contents of a message, so if needed this will have to be enabled separately.

Intrusion Detection

The most important part when defending your systems against intruders is being able to detect if the system has been compromised.

Logs

Logs are usually the first place to look for evidence of security breaches, but they are useless if they can be tampered with.

A good solution is to set up centralized logging with a dedicated logging server. Access to it should be restricted. In addition to having all of the logs in a single place, if configured correctly, it can make it harder for intruders to tamper with your logs.

This should be fairly easy to set up using syslog (see also syslog-ng and rsyslog). Celery uses the logging library, and already has support for using syslog.
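
As a minimal sketch, here using the after_setup_logger signal to attach a handler once Celery has configured logging (the log server address is an assumption):

import logging
from logging.handlers import SysLogHandler

from celery.signals import after_setup_logger

@after_setup_logger.connect
def add_syslog_handler(logger=None, **kwargs):
    # Point this at your central logging server (the address is illustrative).
    handler = SysLogHandler(address=('logs.example.com', 514))
    handler.setFormatter(logging.Formatter('%(name)s: %(levelname)s %(message)s'))
    logger.addHandler(handler)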

A tip for the paranoid is to send logs using UDP and cut the transmit part of the logging server’s network cable :-)

Tripwire

Tripwire is a (now commercial) data integrity tool, with several open source implementations, used to keep cryptographic hashes of files in the file-system, so that administrators can be alerted when they change. This way when the damage is done and your system has been compromised you can tell exactly what files intruders have changed (password files, logs, backdoors, rootkits and so on). Often this is the only way you will be able to detect an intrusion.

Some open source implementations include:

Also, the ZFS file-system comes with built-in integrity checks that can be used.

Optimizing

Introduction

The default configuration makes a lot of compromises. It’s not optimal for any single case, but works well enough for most situations.

There are optimizations that can be applied based on specific use cases.

Optimizations can apply to different properties of the running environment, be it the time tasks take to execute, the amount of memory used, or responsiveness at times of high load.

Ensuring Operations

In the book Programming Pearls, Jon Bentley presents the concept of back-of-the-envelope calculations by asking the question:

❝ How much water flows out of the Mississippi River in a day? ❞

The point of this exercise [*] is to show that there is a limit to how much data a system can process in a timely manner. Back of the envelope calculations can be used as a means to plan for this ahead of time.

In Celery, if a task takes 10 minutes to complete and there are 10 new tasks coming in every minute, the queue will never be empty. This is why it’s very important that you monitor queue lengths!

A way to do this is by using Munin. You should set up alerts that will notify you as soon as any queue has reached an unacceptable size. This way you can take appropriate action, like adding new worker nodes or revoking unnecessary tasks.

[*]The chapter is available to read for free here: The back of the envelope. The book is a classic text. Highly recommended.
General Settings
librabbitmq (Python 2 only)

If you’re using RabbitMQ (AMQP) as the broker then you can install the librabbitmq module to use an optimized client written in C:

$ pip install librabbitmq

The ‘amqp’ transport will automatically use the librabbitmq module if it’s installed, or you can specify the transport you want directly by using the pyamqp:// or librabbitmq:// prefixes.
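
For example (the broker URL is illustrative):

BROKER_URL = 'librabbitmq://guest@localhost//'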

Broker Connection Pools

The broker connection pool is enabled by default since version 2.5.

You can tweak the BROKER_POOL_LIMIT setting to minimize contention, and the value should be based on the number of active threads/greenthreads using broker connections.

Using Transient Queues

Queues created by Celery are persistent by default. This means that the broker will write messages to disk to ensure that the tasks will be executed even if the broker is restarted.

But in some cases it’s fine that the message is lost, so not all tasks require durability. You can create a transient queue for these tasks to improve performance:

from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('celery', routing_key='celery'),
    Queue('transient', routing_key='transient',
          delivery_mode=1),
)

The delivery_mode changes how the messages to this queue are delivered. A value of 1 means that the message will not be written to disk, and a value of 2 (default) means that the message can be written to disk.

To direct a task to your new transient queue you can specify the queue argument (or use the CELERY_ROUTES setting):

task.apply_async(args, queue='transient')

For more information see the routing guide.

Worker Settings
Prefetch Limits

Prefetch is a term inherited from AMQP that is often misunderstood by users.

The prefetch limit is a limit for the number of tasks (messages) a worker can reserve for itself. If it is zero, the worker will keep consuming messages, not respecting that there may be other available worker nodes that may be able to process them sooner [†], or that the messages may not even fit in memory.

The workers’ default prefetch count is the CELERYD_PREFETCH_MULTIPLIER setting multiplied by the number of concurrency slots [‡] (processes/threads/green threads).

If you have many tasks with a long duration you want the multiplier value to be 1, which means it will only reserve one task per worker process at a time.

However, if you have many short-running tasks, and throughput/round trip latency is important to you, this number should be large. The worker is able to process more tasks per second if the messages have already been prefetched and are available in memory. You may have to experiment to find the best value that works for you; values like 64 or 128 might make sense in these circumstances.

If you have a combination of long- and short-running tasks, the best option is to use two worker nodes that are configured separately, and route the tasks according to the run-time. (see Routing Tasks).

[†]RabbitMQ and other brokers deliver messages round-robin, so this doesn’t apply to an active system. If there is no prefetch limit and you restart the cluster, there will be timing delays between nodes starting. If there are 3 offline nodes and one active node, all messages will be delivered to the active node.
[‡]This is the concurrency setting; CELERYD_CONCURRENCY or the -c option to the celery worker program.
Reserve one task at a time

When using early acknowledgement (default), a prefetch multiplier of 1 means the worker will reserve at most one extra task for every active worker process.

When users ask if it’s possible to disable “prefetching of tasks”, often what they really want is to have a worker only reserve as many tasks as there are child processes.

But this is not possible without enabling late acknowledgements; a task that has been started will be retried if the worker crashes mid-execution, so the task must be idempotent (see also notes at Should I use retry or acks_late?).

You can enable this behavior by using the following configuration options:

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
Prefork pool prefetch settings

The prefork pool will asynchronously send as many tasks to the processes as it can and this means that the processes are, in effect, prefetching tasks.

This benefits performance but it also means that tasks may be stuck waiting for long running tasks to complete:

-> send T1 to Process A
# A executes T1
-> send T2 to Process B
# B executes T2
<- T2 complete

-> send T3 to Process A
# A still executing T1, T3 stuck in local buffer and will not start until
# T1 returns, and other queued tasks will not be sent to idle processes

The worker will send tasks to the process as long as the pipe buffer is writable. The pipe buffer size varies based on the operating system: some may have a buffer as small as 64KB, but on recent Linux versions the buffer size is 1MB (and can only be changed system-wide).

You can disable this prefetching behavior by enabling the -Ofair worker option:

$ celery -A proj worker -l info -Ofair

With this option enabled the worker will only write to processes that are available for work, disabling the prefetch behavior.

Concurrency

Release:3.1
Date:January 23, 2016
Concurrency with Eventlet
Introduction

The Eventlet homepage describes it as “a concurrent networking library for Python that allows you to change how you run your code, not how you write it.”

  • It uses epoll(4) or libevent for highly scalable non-blocking I/O.
  • Coroutines ensure that the developer uses a blocking style of programming that is similar to threading, but provide the benefits of non-blocking I/O.
  • The event dispatch is implicit, which means you can easily use Eventlet from the Python interpreter, or as a small part of a larger application.

Celery supports Eventlet as an alternative execution pool implementation. It is in some cases superior to prefork, but you need to ensure your tasks do not perform blocking calls, as this will halt all other operations in the worker until the blocking call returns.

The prefork pool can make use of multiple processes, but how many is often limited to a few processes per CPU. With Eventlet you can efficiently spawn hundreds, or thousands, of green threads. In an informal test with a feed hub system the Eventlet pool could fetch and process hundreds of feeds every second, while the prefork pool spent 14 seconds processing 100 feeds. Note that this is one of the applications evented I/O is especially good at (asynchronous HTTP requests). You may want a mix of both Eventlet and prefork workers, and route tasks according to compatibility or what works best.
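
One possible sketch (queue and task names are hypothetical) routes I/O-bound tasks to a queue consumed by an eventlet worker, and CPU-bound tasks to a queue consumed by a prefork worker:

CELERY_ROUTES = {
    'proj.tasks.fetch_feed': {'queue': 'io'},     # celery worker -Q io -P eventlet
    'proj.tasks.resize_image': {'queue': 'cpu'},  # celery worker -Q cpu
}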

Enabling Eventlet

You can enable the Eventlet pool by using the -P option to celery worker:

$ celery -A proj worker -P eventlet -c 1000
Examples

See the Eventlet examples directory in the Celery distribution for some examples making use of Eventlet support.

Signals

Signals allow decoupled applications to receive notifications when certain actions occur elsewhere in the application.

Celery ships with many signals that your application can hook into to augment behavior of certain actions.

Basics

Several kinds of events trigger signals; you can connect to these signals to perform actions as they trigger.

Example connecting to the after_task_publish signal:

from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))

Some signals also have a sender which you can filter by. For example the after_task_publish signal uses the task name as a sender, so by providing the sender argument to connect you can connect your handler to be called every time a task with name “proj.tasks.add” is published:

@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))

Signals use the same implementation as django.core.dispatch. As a result other keyword parameters (e.g. signal) are passed to all signal handlers by default.

The best practice for signal handlers is to accept arbitrary keyword arguments (i.e. **kwargs). That way new celery versions can add additional arguments without breaking user code.

Signals
Task Signals
before_task_publish

New in version 3.1.

Dispatched before a task is published. Note that this is executed in the process sending the task.

Sender is the name of the task being sent.

Provides arguments:

  • body

    Task message body.

    This is a mapping containing the task message fields (see Task Messages).

  • exchange

    Name of the exchange to send to or an Exchange object.

  • routing_key

    Routing key to use when sending the message.

  • headers

    Application headers mapping (can be modified).

  • properties

    Message properties (can be modified).

  • declare

    List of entities (Exchange, Queue or kombu.binding) to declare before publishing the message. Can be modified.

  • retry_policy

    Mapping of retry options. Can be any argument to kombu.Connection.ensure() and can be modified.

after_task_publish

Dispatched when a task has been sent to the broker. Note that this is executed in the process that sent the task.

Sender is the name of the task being sent.

Provides arguments:

  • body

    The task message body, see Task Messages for a reference of possible fields that can be defined.

  • exchange

    Name of the exchange or Exchange object used.

  • routing_key

    Routing key used.

task_prerun

Dispatched before a task is executed.

Sender is the task object being executed.

Provides arguments:

  • task_id

    Id of the task to be executed.

  • task

    The task being executed.

  • args

    The task’s positional arguments.

  • kwargs

    The task’s keyword arguments.

task_postrun

Dispatched after a task has been executed.

Sender is the task object executed.

Provides arguments:

  • task_id

    Id of the executed task.

  • task

    The task that was executed.

  • args

    The task’s positional arguments.

  • kwargs

    The task’s keyword arguments.

  • retval

    The return value of the task.

  • state

    Name of the resulting state.
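
A minimal handler sketch, following the same **kwargs convention described in Basics:

from celery.signals import task_postrun

@task_postrun.connect
def postrun_handler(sender=None, task_id=None, task=None, retval=None,
                    state=None, **kwargs):
    print('task {0} finished in state {1} with {2!r}'.format(
        task_id, state, retval))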

task_retry

Dispatched when a task will be retried.

Sender is the task object.

Provides arguments:

  • request

    The current task request.

  • reason

    Reason for retry (usually an exception instance, but can always be coerced to str).

  • einfo

    Detailed exception information, including traceback (a billiard.einfo.ExceptionInfo object).

task_success

Dispatched when a task succeeds.

Sender is the task object executed.

Provides arguments

  • result

    Return value of the task.

task_failure

Dispatched when a task fails.

Sender is the task object executed.

Provides arguments:

  • task_id

    Id of the task.

  • exception

    Exception instance raised.

  • args

    Positional arguments the task was called with.

  • kwargs

    Keyword arguments the task was called with.

  • traceback

    Stack trace object.

  • einfo

    The celery.datastructures.ExceptionInfo instance.
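
A handler sketch that could, for example, forward failures to an external error tracker (the print is just a placeholder):

from celery.signals import task_failure

@task_failure.connect
def failure_handler(sender=None, task_id=None, exception=None,
                    traceback=None, einfo=None, **kwargs):
    print('task {0} raised {1!r}'.format(task_id, exception))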

task_revoked

Dispatched when a task is revoked/terminated by the worker.

Sender is the task object revoked/terminated.

Provides arguments:

  • request

    This is a Request instance, and not task.request. When using the prefork pool this signal is dispatched in the parent process, so task.request is not available and should not be used. Use this object instead, which should have many of the same fields.

  • terminated

    Set to True if the task was terminated.

  • signum

    Signal number used to terminate the task. If this is None and terminated is True then TERM should be assumed.

  • expired

    Set to True if the task expired.

App Signals
import_modules

This signal is sent when a program (worker, beat, shell, etc.) asks for modules in the CELERY_INCLUDE and CELERY_IMPORTS settings to be imported.

Sender is the app instance.

Worker Signals
celeryd_after_setup

This signal is sent after the worker instance is set up, but before it calls run. This means that any queues from the -Q option are enabled, logging has been set up and so on.

It can be used to e.g. add custom queues that should always be consumed from, disregarding the -Q option. Here’s an example that sets up a direct queue for each worker; these queues can then be used to route a task to any specific worker:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    instance.app.amqp.queues.select_add(queue_name)

Provides arguments:

  • sender

    Hostname of the worker.

  • instance

    This is the celery.apps.worker.Worker instance to be initialized. Note that only the app and hostname (nodename) attributes have been set so far, and the rest of __init__ has not been executed.

  • conf

    The configuration of the current app.

celeryd_init

This is the first signal sent when celery worker starts up. The sender is the host name of the worker, so this signal can be used to set up worker-specific configuration:

from celery.signals import celeryd_init

@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
    conf.CELERY_DEFAULT_RATE_LIMIT = '10/m'

or to set up configuration for multiple workers you can omit specifying a sender when you connect:

from celery.signals import celeryd_init

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    if sender in ('worker1@example.com', 'worker2@example.com'):
        conf.CELERY_DEFAULT_RATE_LIMIT = '10/m'
    if sender == 'worker3@example.com':
        conf.CELERYD_PREFETCH_MULTIPLIER = 0

Provides arguments:

  • sender

    Nodename of the worker.

  • instance

    This is the celery.apps.worker.Worker instance to be initialized. Note that only the app and hostname (nodename) attributes have been set so far, and the rest of __init__ has not been executed.

  • conf

    The configuration of the current app.

  • options

    Options passed to the worker from command-line arguments (including defaults).

worker_init

Dispatched before the worker is started.

worker_ready

Dispatched when the worker is ready to accept work.

worker_process_init

Dispatched in all pool child processes when they start.

Note that handlers attached to this signal must not block for more than 4 seconds, or the process will be killed, assuming it failed to start.
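
A common use is creating per-child resources, such as a database connection that must not be shared with the parent process. This is a sketch; connect_to_database is a hypothetical helper:

from celery.signals import worker_process_init

@worker_process_init.connect
def init_child(**kwargs):
    # must complete quickly (see the 4 second limit above)
    global db_connection
    db_connection = connect_to_database()  # hypothetical helper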

worker_process_shutdown

Dispatched in all pool child processes just before they exit.

Note: There is no guarantee that this signal will be dispatched; similarly to finally blocks, it’s impossible to guarantee that handlers will be called at shutdown, and if called they may be interrupted mid-execution.

Provides arguments:

  • pid

    The pid of the child process that is about to shut down.

  • exitcode

    The exitcode that will be used when the child process exits.

worker_shutdown

Dispatched when the worker is about to shut down.

Beat Signals
beat_init

Dispatched when celery beat starts (either standalone or embedded). Sender is the celery.beat.Service instance.

beat_embedded_init

Dispatched in addition to the beat_init signal when celery beat is started as an embedded process. Sender is the celery.beat.Service instance.

Eventlet Signals
eventlet_pool_started

Sent when the eventlet pool has been started.

Sender is the celery.concurrency.eventlet.TaskPool instance.

eventlet_pool_preshutdown

Sent at worker shutdown, just before the eventlet pool is requested to wait for remaining workers.

Sender is the celery.concurrency.eventlet.TaskPool instance.

eventlet_pool_postshutdown

Sent when the pool has been joined and the worker is ready to shutdown.

Sender is the celery.concurrency.eventlet.TaskPool instance.

eventlet_pool_apply

Sent whenever a task is applied to the pool.

Sender is the celery.concurrency.eventlet.TaskPool instance.

Provides arguments:

  • target

    The target function.

  • args

    Positional arguments.

  • kwargs

    Keyword arguments.

Logging Signals
setup_logging

Celery won’t configure the loggers if this signal is connected, so you can use this to completely override the logging configuration with your own.

If you would like to augment the logging configuration setup by Celery then you can use the after_setup_logger and after_setup_task_logger signals.

Provides arguments:

  • loglevel

    The level of the logging object.

  • logfile

    The name of the logfile.

  • format

    The log format string.

  • colorize

    Specify if log messages are colored or not.
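
A minimal sketch that takes over logging configuration entirely (the LOGGING mapping is an example; adapt it to your needs):

import logging.config

from celery.signals import setup_logging

LOGGING = {
    'version': 1,
    'handlers': {'console': {'class': 'logging.StreamHandler'}},
    'root': {'handlers': ['console'], 'level': 'INFO'},
}

@setup_logging.connect
def configure_logging(loglevel=None, logfile=None, format=None,
                      colorize=None, **kwargs):
    # Celery will skip its own logger setup because this handler is connected.
    logging.config.dictConfig(LOGGING)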

after_setup_logger

Sent after the setup of every global logger (not task loggers). Used to augment logging configuration.

Provides arguments:

  • logger

    The logger object.

  • loglevel

    The level of the logging object.

  • logfile

    The name of the logfile.

  • format

    The log format string.

  • colorize

    Specify if log messages are colored or not.

after_setup_task_logger

Sent after the setup of every single task logger. Used to augment logging configuration.

Provides arguments:

  • logger

    The logger object.

  • loglevel

    The level of the logging object.

  • logfile

    The name of the logfile.

  • format

    The log format string.

  • colorize

    Specify if log messages are colored or not.

Command signals
user_preload_options

This signal is sent after any of the Celery command line programs are finished parsing the user preload options.

It can be used to add additional command-line arguments to the celery umbrella command:

from celery import Celery
from celery import signals
from celery.bin.base import Option

app = Celery()
app.user_options['preload'].add(Option(
    '--monitoring', action='store_true',
    help='Enable our external monitoring utility, blahblah',
))

@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
    if options['monitoring']:
        enable_monitoring()

Sender is the Command instance, which depends on what program was called (e.g. for the umbrella command it will be a CeleryCommand object).

Provides arguments:

  • app

    The app instance.

  • options

    Mapping of the parsed user preload options (with default values).

Deprecated Signals
task_sent

This signal is deprecated, please use after_task_publish instead.

Extensions and Bootsteps

Custom Message Consumers

You may want to embed custom Kombu consumers to manually process your messages.

For that purpose a special ConsumerStep bootstep class exists, where you only need to define the get_consumers method, which must return a list of kombu.Consumer objects to start whenever the connection is established:

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()

app.steps['consumer'].add(MyConsumerStep)

def send_me_a_message(who='world!', producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )

if __name__ == '__main__':
    send_me_a_message('celery')

Note

Kombu Consumers can make use of two different message callback dispatching mechanisms. The first one is the callbacks argument, which accepts a list of callbacks with a (body, message) signature; the second one is the on_message argument, which takes a single callback with a (message, ) signature. The latter will not automatically decode and deserialize the payload, which is useful in many cases:

def get_consumers(self, channel):
    return [Consumer(channel, queues=[my_queue],
                     on_message=self.on_message)]


def on_message(self, message):
    payload = message.decode()
    print('Received message: {0!r} {props!r} rawlen={s}'.format(
        payload, props=message.properties, s=len(message.body)))
    message.ack()
Blueprints

Bootsteps is a technique to add functionality to the workers. A bootstep is a custom class that defines hooks to do custom actions at different stages in the worker. Every bootstep belongs to a blueprint, and the worker currently defines two blueprints: Worker and Consumer.


Figure A: Bootsteps in the Worker and Consumer blueprints. Starting from the bottom up, the first step in the worker blueprint is the Timer, and the last step is to start the Consumer blueprint, which then establishes the broker connection and starts consuming messages.

Worker

The Worker is the first blueprint to start, and with it starts major components like the event loop, processing pool, the timer, and also optional components like the autoscaler. When the worker is fully started it will continue to the Consumer blueprint.

The WorkController is the core worker implementation, and contains several methods and attributes that you can use in your bootstep.

Attributes
app

The current app instance.

hostname

The worker’s node name (e.g. worker1@example.com).

blueprint

This is the worker Blueprint.

hub

Event loop object (Hub). You can use this to register callbacks in the event loop.

This is only supported by async I/O enabled transports (amqp, redis), in which case the worker.use_eventloop attribute should be set.

Your worker bootstep must require the Hub bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.components:Hub', )
pool

The current process/eventlet/gevent/thread pool. See celery.concurrency.base.BasePool.

Your worker bootstep must require the Pool bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.components:Pool', )
timer

Timer used to schedule functions.

Your worker bootstep must require the Timer bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.components:Timer', )
statedb

Database (celery.worker.state.Persistent) used to persist state between worker restarts.

This is only defined if the statedb argument is enabled.

Your worker bootstep must require the Statedb bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.components:Statedb', )
autoscaler

Autoscaler used to automatically grow and shrink the number of processes in the pool.

This is only defined if the autoscale argument is enabled.

Your worker bootstep must require the Autoscaler bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoscaler:Autoscaler', )
autoreloader

Autoreloader used to automatically reload user code when the filesystem changes.

This is only defined if the autoreload argument is enabled. Your worker bootstep must require the Autoreloader bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoreloader:Autoreloader', )
Example worker bootstep

An example Worker bootstep could be:

from celery import bootsteps

class ExampleWorkerStep(bootsteps.StartStopStep):
    requires = ('Pool', )

    def __init__(self, worker, **kwargs):
        print('Called when the WorkController instance is constructed')
        print('Arguments to WorkController: {0!r}'.format(kwargs))

    def create(self, worker):
        # this method can be used to delegate the action methods
        # to another object that implements ``start`` and ``stop``.
        return self

    def start(self, worker):
        print('Called when the worker is started.')

    def stop(self, worker):
        print("Called when the worker shuts down.")

    def terminate(self, worker):
        print("Called when the worker terminates")

Every method is passed the current WorkController instance as the first argument.

Another example could use the timer to wake up at regular intervals:

from time import time

from celery import bootsteps


class DeadlockDetection(bootsteps.StartStopStep):
    requires = ('Timer', )

    def __init__(self, worker, deadlock_timeout=3600):
        self.timeout = deadlock_timeout
        self.requests = []
        self.tref = None

    def start(self, worker):
        # run every 30 seconds.
        self.tref = worker.timer.call_repeatedly(
            30.0, self.detect, (worker, ), priority=10,
        )

    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None

    def detect(self, worker):
        # update active requests
        for req in worker.active_requests:
            if req.time_start and time() - req.time_start > self.timeout:
                raise SystemExit()
Consumer

The Consumer blueprint establishes a connection to the broker, and is restarted every time this connection is lost. Consumer bootsteps include the worker heartbeat, the remote control command consumer, and importantly, the task consumer.

When you create consumer bootsteps you must take into account that it must be possible to restart your blueprint. An additional ‘shutdown’ method is defined for consumer bootsteps; this method is called when the worker shuts down.
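
A sketch of a restart-aware consumer bootstep:

from celery import bootsteps

class ConnectionAwareStep(bootsteps.StartStopStep):
    requires = ('celery.worker.consumer:Connection', )

    def start(self, c):
        # called every time the broker connection is (re)established
        print('connection is up')

    def stop(self, c):
        # called on every connection loss, and again at shutdown
        print('connection is going down')

    def shutdown(self, c):
        # called only once, when the worker shuts down
        print('worker is shutting down')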

Attributes
app

The current app instance.

controller

The parent WorkController object that created this consumer.

hostname

The worker’s node name (e.g. worker1@example.com).

blueprint

This is the worker Blueprint.

hub

Event loop object (Hub). You can use this to register callbacks in the event loop.

This is only supported by async I/O enabled transports (amqp, redis), in which case the worker.use_eventloop attribute should be set.

Your worker bootstep must require the Hub bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.components:Hub', )
connection

The current broker connection (kombu.Connection).

A consumer bootstep must require the ‘Connection’ bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = ('celery.worker.consumer:Connection', )
event_dispatcher

A app.events.Dispatcher object that can be used to send events.

A consumer bootstep must require the Events bootstep to use this.

class Step(bootsteps.StartStopStep):
    requires = ('celery.worker.consumer:Events', )
gossip

Worker to worker broadcast communication (celery.worker.consumer.Gossip).

A consumer bootstep must require the Gossip bootstep to use this.

class RatelimitStep(bootsteps.StartStopStep):
    """Rate limit tasks based on the number of workers in the
    cluster."""
    requires = ('celery.worker.consumer:Gossip',)

    def start(self, c):
        self.c = c
        self.c.gossip.on.node_join.add(self.on_cluster_size_change)
        self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
        self.c.gossip.on.node_lost.add(self.on_node_lost)
        self.tasks = [
            self.app.tasks['proj.tasks.add'],
            self.app.tasks['proj.tasks.mul'],
        ]
        self.last_size = None

    def on_cluster_size_change(self, worker=None):
        cluster_size = len(self.c.gossip.state.alive_workers())
        if cluster_size != self.last_size:
            for task in self.tasks:
                task.rate_limit = 1.0 / cluster_size
            self.c.reset_rate_limits()
            self.last_size = cluster_size

    def on_node_lost(self, worker):
        # may have processed heartbeat too late, so wake up in a while
        # to see if the worker recovered
        self.c.timer.call_after(10.0, self.on_cluster_size_change)

Callbacks

  • gossip.on.node_join(worker)

    Called whenever a new node joins the cluster, providing a Worker instance.

  • gossip.on.node_leave(worker)

    Called whenever a node leaves the cluster (shuts down), providing a Worker instance.

  • gossip.on.node_lost(worker)

    Called whenever a heartbeat was missed for a worker instance in the cluster (heartbeat not received or processed in time), providing a Worker instance.

    This does not necessarily mean the worker is actually offline, so use a timeout mechanism if the default heartbeat timeout is not sufficient.

pool

The current process/eventlet/gevent/thread pool. See celery.concurrency.base.BasePool.

timer

Timer (celery.utils.timer2.Schedule) used to schedule functions.

heart

Responsible for sending worker event heartbeats (Heart).

Your consumer bootstep must require the Heart bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = ('celery.worker.consumer:Heart', )
task_consumer

The kombu.Consumer object used to consume task messages.

Your consumer bootstep must require the Tasks bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = ('celery.worker.consumer:Tasks', )
strategies

Every registered task type has an entry in this mapping, where the value is used to execute an incoming message of this task type (the task execution strategy). This mapping is generated by the Tasks bootstep when the consumer starts:

for name, task in app.tasks.items():
    strategies[name] = task.start_strategy(app, consumer)
    task.__trace__ = celery.app.trace.build_tracer(
        name, task, loader, hostname
    )

Your consumer bootstep must require the Tasks bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = ('celery.worker.consumer:Tasks', )
task_buckets

A defaultdict used to lookup the rate limit for a task by type. Entries in this dict may be None (for no limit) or a TokenBucket instance implementing consume(tokens) and expected_time(tokens).

TokenBucket implements the token bucket algorithm, but any algorithm may be used as long as it conforms to the same interface and defines the two methods above.
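
As a sketch, assuming kombu’s TokenBucket and a Consumer instance c obtained from a bootstep, you could install a custom bucket for one (hypothetical) task type:

from kombu.utils.limits import TokenBucket

# allow at most two 'proj.tasks.add' messages per second
c.task_buckets['proj.tasks.add'] = TokenBucket(fill_rate=2, capacity=1)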

qos

The QoS object can be used to change the task channel’s current prefetch_count value, e.g.:

# increment at next cycle
consumer.qos.increment_eventually(1)
# decrement at next cycle
consumer.qos.decrement_eventually(1)
consumer.qos.set(10)
Methods
consumer.reset_rate_limits()

Updates the task_buckets mapping for all registered task types.

consumer.bucket_for_task(type, Bucket=TokenBucket)

Creates rate limit bucket for a task using its task.rate_limit attribute.

consumer.add_task_queue(name, exchange=None, exchange_type=None, routing_key=None, **options)

Adds a new queue to consume from. This will persist on connection restart.

consumer.cancel_task_queue(name)

Stop consuming from queue by name. This will persist on connection restart.

apply_eta_task(request)

Schedules an ETA task to execute based on the request.eta attribute (Request).

Installing Bootsteps

app.steps['worker'] and app.steps['consumer'] can be modified to add new bootsteps:

>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep)  # < add class, do not instantiate
>>> app.steps['consumer'].add(MyConsumerStep)

>>> app.steps['consumer'].update([StepA, StepB])

>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}}

The order of steps is not important here as the order is decided by the resulting dependency graph (Step.requires).

To illustrate how you can install bootsteps and how they work, this is an example step that prints some useless debugging information. It can be added both as a worker and consumer bootstep:

from celery import Celery
from celery import bootsteps

class InfoStep(bootsteps.Step):

    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults and so on.
        print('{0!r} is in init'.format(parent))

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is restarted
        # (i.e. connection is lost) and also at shutdown.  The Worker
        # will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))

app = Celery(broker='amqp://')
app.steps['worker'].add(InfoStep)
app.steps['consumer'].add(InfoStep)

Starting the worker with this step installed will give us the following logs:

<Worker: w@example.com (initializing)> is in init
<Consumer: w@example.com (initializing)> is in init
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <Worker: w@example.com (running)> is starting
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <Consumer: w@example.com (running)> is starting
<Consumer: w@example.com (closing)> is stopping
<Worker: w@example.com (closing)> is stopping
<Consumer: w@example.com (terminating)> is shutting down

The print statements will be redirected to the logging subsystem after the worker has been initialized, so the “is starting” lines are timestamped. You may notice that this no longer happens at shutdown; this is because the stop and shutdown methods are called inside a signal handler, and it’s not safe to use logging inside such a handler. Logging with the Python logging module is not reentrant, which means that you cannot interrupt the function and call it again later. It’s important that the stop and shutdown methods you write are also reentrant.

Starting the worker with --loglevel=debug will show us more information about the boot process:

[2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at 0x101ad8410> is in init
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
    {Hub, Queues (intra), Pool, Autoreloader, Timer, StateDB,
     Autoscaler, InfoStep, Beat, Consumer}
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
[2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
    {Connection, Mingle, Events, Gossip, InfoStep, Agent,
     Heart, Control, Tasks, event loop}
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
[2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
[2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <celery.apps.worker.Worker object at 0x101ad8410> is starting
[2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
[2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
[2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
[2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
[2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
[2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
[2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
[2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
[2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
[2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
[2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
[2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
[2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
[2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.
Command-line programs
Adding new command-line options
Command-specific options

You can add additional command-line options to the worker, beat and events commands by modifying the user_options attribute of the application instance.

Celery commands use the optparse module to parse command-line arguments, and so you have to use optparse-specific option instances created using optparse.make_option(). Please see the optparse documentation to read about the fields supported.

Example adding a custom option to the celery worker command:

from celery import Celery
from celery.bin import Option  # <-- alias to optparse.make_option

app = Celery(broker='amqp://')

app.user_options['worker'].add(
    Option('--enable-my-option', action='store_true', default=False,
           help='Enable custom option.'),
)

All bootsteps will now receive this argument as a keyword argument to Bootstep.__init__:

from celery import bootsteps

class MyBootstep(bootsteps.Step):

    def __init__(self, worker, enable_my_option=False, **options):
        if enable_my_option:
            party()

app.steps['worker'].add(MyBootstep)
Preload options

The celery umbrella command supports the concept of ‘preload options’, which are special options passed to all subcommands and parsed outside of the main parsing step.

The list of default preload options can be found in the API reference: celery.bin.base.

You can add new preload options too, e.g. to specify a configuration template:

from celery import Celery
from celery import signals
from celery.bin import Option

app = Celery()
app.user_options['preload'].add(
    Option('-Z', '--template', default='default',
           help='Configuration template to use.'),
)

@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    use_template(options['template'])
Adding new celery sub-commands

New commands can be added to the celery umbrella command by using setuptools entry-points.

Entry-points are special metadata that can be added to your package’s setup.py program, and then, after installation, read from the system using the pkg_resources module.

Celery recognizes celery.commands entry-points to install additional subcommands, where the value of the entry-point must point to a valid subclass of celery.bin.base.Command. There is limited documentation, unfortunately, but you can find inspiration from the various commands in the celery.bin package.

This is how the Flower monitoring extension adds the celery flower command, by adding an entry-point in setup.py:

setup(
    name='flower',
    entry_points={
        'celery.commands': [
           'flower = flower.command:FlowerCommand',
        ],
    }
)

The command definition is in two parts separated by the equal sign: the first part is the name of the subcommand (flower), and the second part is the fully qualified symbol path to the class that implements the command (flower.command:FlowerCommand), with the module path and the attribute name separated by a colon.

In the module flower/command.py, the command class is defined something like this:

from celery.bin.base import Command, Option


class FlowerCommand(Command):

    def get_options(self):
        return (
            Option('--port', default=8888, type='int',
                help='Webserver port',
            ),
            Option('--debug', action='store_true'),
        )

    def run(self, port=None, debug=False, **kwargs):
        print('Running our command')
Worker API
Hub - The workers async event loop.
supported transports:
 amqp, redis

New in version 3.0.

The worker uses asynchronous I/O when the amqp or redis broker transports are used. The eventual goal is for all transports to use the eventloop, but that will take some time so other transports still use a threading-based solution.

hub.add(fd, callback, flags)
hub.add_reader(fd, callback, *args)

Add callback to be called when fd is readable.

The callback will stay registered until explicitly removed using hub.remove(fd), or the fd is automatically discarded because it’s no longer valid.

Note that only one callback can be registered for any given fd at a time, so calling add a second time will remove any callback that was previously registered for that fd.

A file descriptor is any file-like object that supports the fileno method, or it can be the file descriptor number (int).

hub.add_writer(fd, callback, *args)

Add callback to be called when fd is writable. See also notes for hub.add_reader() above.

hub.remove(fd)

Remove all callbacks for fd from the loop.
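
A sketch of a worker bootstep registering a read callback for a side-channel socket (the address is hypothetical, and the socket is assumed to be non-blocking):

import socket

from celery import bootsteps

class SocketMonitorStep(bootsteps.StartStopStep):
    requires = ('celery.worker.components:Hub', )

    def __init__(self, worker, **kwargs):
        self.sock = None

    def start(self, worker):
        self.sock = socket.create_connection(('localhost', 9999))
        self.sock.setblocking(False)

        def on_readable(*args):
            data = self.sock.recv(4096)
            print('side-channel says: {0!r}'.format(data))

        worker.hub.add_reader(self.sock.fileno(), on_readable)

    def stop(self, worker):
        if self.sock is not None:
            worker.hub.remove(self.sock.fileno())
            self.sock.close()
            self.sock = None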

Timer - Scheduling events
timer.call_after(secs, callback, args=(), kwargs=(), priority=0)
timer.call_repeatedly(secs, callback, args=(), kwargs=(), priority=0)
timer.call_at(eta, callback, args=(), kwargs=(), priority=0)
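
For example, from a bootstep that requires the Timer bootstep (a sketch; collect_stats is a hypothetical function):

# run collect_stats(worker) every 60 seconds
tref = worker.timer.call_repeatedly(60.0, collect_stats, args=(worker, ))

# run it once, two seconds from now
worker.timer.call_after(2.0, collect_stats, args=(worker, ))

# cancel the repeating entry when it is no longer needed
tref.cancel()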

Configuration and defaults

This document describes the configuration options available.

If you’re using the default loader, you must create the celeryconfig.py module and make sure it is available on the Python path.

Example configuration file

This is an example configuration file to get you started. It should contain all you need to run a basic Celery set-up.

## Broker settings.
BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when celery starts.
CELERY_IMPORTS = ('myapp.tasks', )

## Using the database to store task state and results.
CELERY_RESULT_BACKEND = 'db+sqlite:///results.db'

CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}

Configuration Directives

Time and date settings
CELERY_ENABLE_UTC

New in version 2.5.

If enabled dates and times in messages will be converted to use the UTC timezone.

Note that workers running Celery versions below 2.5 will assume a local timezone for all messages, so only enable if all workers have been upgraded.

Enabled by default since version 3.0.

CELERY_TIMEZONE

Configure Celery to use a custom time zone. The timezone value can be any time zone supported by the pytz library.

If not set, the UTC timezone is used. For backwards compatibility there is also a CELERY_ENABLE_UTC setting; when this is set to false, the system local timezone is used instead.

Task settings
CELERY_ANNOTATIONS

This setting can be used to rewrite any task attribute from the configuration. The setting can be a dict, or a list of annotation objects that filter for tasks and return a map of attributes to change.

This will change the rate_limit attribute for the tasks.add task:

CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}

or change the same for all tasks:

CELERY_ANNOTATIONS = {'*': {'rate_limit': '10/s'}}

You can change methods too, for example the on_failure handler:

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
    print('Oh no! Task failed: {0!r}'.format(exc))

CELERY_ANNOTATIONS = {'*': {'on_failure': my_on_failure}}

If you need more flexibility then you can use objects instead of a dict to choose which tasks to annotate:

class MyAnnotate(object):

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {'rate_limit': '10/s'}

CELERY_ANNOTATIONS = (MyAnnotate(), {…})
Concurrency settings
CELERYD_CONCURRENCY

The number of concurrent worker processes/threads/green threads executing tasks.

If you’re doing mostly I/O you can have more processes, but if mostly CPU-bound, try to keep it close to the number of CPUs on your machine. If not set, the number of CPUs/cores on the host will be used.

CELERYD_PREFETCH_MULTIPLIER

How many messages to prefetch at a time multiplied by the number of concurrent processes. The default is 4 (four messages for each process). The default setting is usually a good choice, however – if you have very long running tasks waiting in the queue and you have to start the workers, note that the first worker to start will receive four times the number of messages initially. Thus the tasks may not be fairly distributed to the workers.

To disable prefetching, set CELERYD_PREFETCH_MULTIPLIER to 1. Setting CELERYD_PREFETCH_MULTIPLIER to 0 will allow the worker to keep consuming as many messages as it wants.

For more on prefetching, read Prefetch Limits

Note

Tasks with ETA/countdown are not affected by prefetch limits.

Task result backend settings
CELERY_RESULT_BACKEND
Deprecated aliases:
 CELERY_BACKEND

The backend used to store task results (tombstones). Disabled by default. Can be one of the backends described in the sections below: database (SQLAlchemy/Django ORM), cache (memcached), redis, mongodb, cassandra, ironcache, couchbase, amqp, or rpc.

CELERY_RESULT_SERIALIZER

Result serialization format. Default is pickle. See Serializers for information about supported serialization formats.

Database backend settings
Database URL Examples

To use the database backend you have to configure the CELERY_RESULT_BACKEND setting with a connection URL and the db+ prefix:

CELERY_RESULT_BACKEND = 'db+scheme://user:password@host:port/dbname'

Examples:

# sqlite (filename)
CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

# mysql
CELERY_RESULT_BACKEND = 'db+mysql://scott:tiger@localhost/foo'

# postgresql
CELERY_RESULT_BACKEND = 'db+postgresql://scott:tiger@localhost/mydatabase'

# oracle
CELERY_RESULT_BACKEND = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'

Please see Supported Databases for a table of supported databases, and Connection String for more information about connection strings (which is the part of the URI that comes after the db+ prefix).

CELERY_RESULT_DBURI

This setting is no longer used as it’s now possible to specify the database URL directly in the CELERY_RESULT_BACKEND setting.

CELERY_RESULT_ENGINE_OPTIONS

To specify additional SQLAlchemy database engine options you can use the CELERY_RESULT_ENGINE_OPTIONS setting:

# echo enables verbose logging from SQLAlchemy.
CELERY_RESULT_ENGINE_OPTIONS = {'echo': True}
Short lived sessions
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True

Short lived sessions are disabled by default. If enabled they can drastically reduce performance, especially on systems processing lots of tasks. This option is useful on low-traffic workers that experience errors as a result of cached database connections going stale through inactivity. For example, intermittent errors like (OperationalError) (2006, ‘MySQL server has gone away’) can be fixed by enabling short lived sessions. This option only affects the database backend.

Specifying Table Names

When SQLAlchemy is configured as the result backend, Celery automatically creates two tables to store result metadata for tasks. This setting allows you to customize the table names:

# use custom table names for the database result backend.
CELERY_RESULT_DB_TABLENAMES = {
    'task': 'myapp_taskmeta',
    'group': 'myapp_groupmeta',
}
RPC backend settings
CELERY_RESULT_PERSISTENT

If set to True, result messages will be persistent. This means the messages will not be lost after a broker restart. The default is for the results to be transient.

Example configuration
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False
Cache backend settings

Note

The cache backend supports the pylibmc and python-memcached libraries. The latter is used only if pylibmc is not installed.

Using a single memcached server:

CELERY_RESULT_BACKEND = 'cache+memcached://127.0.0.1:11211/'

Using multiple memcached servers:

CELERY_RESULT_BACKEND = """
    cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()

The “memory” backend stores the cache in memory only:

CELERY_RESULT_BACKEND = 'cache'
CELERY_CACHE_BACKEND = 'memory'
CELERY_CACHE_BACKEND_OPTIONS

You can set pylibmc options using the CELERY_CACHE_BACKEND_OPTIONS setting:

CELERY_CACHE_BACKEND_OPTIONS = {'binary': True,
                                'behaviors': {'tcp_nodelay': True}}
CELERY_CACHE_BACKEND

This setting is no longer used as it’s now possible to specify the cache backend directly in the CELERY_RESULT_BACKEND setting.

Redis backend settings
Configuring the backend URL

Note

The Redis backend requires the redis library: http://pypi.python.org/pypi/redis/

To install the redis package use pip or easy_install:

$ pip install redis

This backend requires the CELERY_RESULT_BACKEND setting to be set to a Redis URL:

CELERY_RESULT_BACKEND = 'redis://:password@host:port/db'

For example:

CELERY_RESULT_BACKEND = 'redis://localhost/0'

which is the same as:

CELERY_RESULT_BACKEND = 'redis://'

The fields of the URL are defined as follows:

  • host

    Host name or IP address of the Redis server. e.g. localhost.

  • port

    Port of the Redis server. Default is 6379.

  • db

    Database number to use. Default is 0. The db can include an optional leading slash.

  • password

    Password used to connect to the database.

CELERY_REDIS_MAX_CONNECTIONS

Maximum number of connections available in the Redis connection pool used for sending and retrieving results.

MongoDB backend settings

Note

The MongoDB backend requires the pymongo library: http://github.com/mongodb/mongo-python-driver/tree/master

CELERY_MONGODB_BACKEND_SETTINGS

This is a dict supporting the following keys:

  • database

    The database name to connect to. Defaults to celery.

  • taskmeta_collection

    The collection name to store task meta data. Defaults to celery_taskmeta.

  • max_pool_size

    Passed as max_pool_size to PyMongo’s Connection or MongoClient constructor. It is the maximum number of TCP connections to keep open to MongoDB at a given time. If there are more open connections than max_pool_size, sockets will be closed when they are released. Defaults to 10.

  • options

    Additional keyword arguments to pass to the mongodb connection constructor. See the pymongo docs to see a list of arguments supported.

Example configuration
CELERY_RESULT_BACKEND = 'mongodb://192.168.1.100:30000/'
CELERY_MONGODB_BACKEND_SETTINGS = {
    'database': 'mydb',
    'taskmeta_collection': 'my_taskmeta_collection',
}
Cassandra backend settings

Note

The Cassandra backend requires the pycassa library: http://pypi.python.org/pypi/pycassa/

To install the pycassa package use pip or easy_install:

$ pip install pycassa

This backend requires the following configuration directives to be set.

CASSANDRA_SERVERS

List of host:port Cassandra servers. e.g.:

CASSANDRA_SERVERS = ['localhost:9160']
CASSANDRA_KEYSPACE

The keyspace in which to store the results. e.g.:

CASSANDRA_KEYSPACE = 'tasks_keyspace'
CASSANDRA_COLUMN_FAMILY

The column family in which to store the results. e.g.:

CASSANDRA_COLUMN_FAMILY = 'tasks'
CASSANDRA_READ_CONSISTENCY

The read consistency used. Values can be ONE, QUORUM or ALL.

CASSANDRA_WRITE_CONSISTENCY

The write consistency used. Values can be ONE, QUORUM or ALL.

CASSANDRA_DETAILED_MODE

Enable or disable detailed mode. Default is False. This mode allows you to use the power of Cassandra wide columns to store all states for a task as a wide column, instead of only the last one.

To use this mode, you need to configure your ColumnFamily to use the TimeUUID type as a comparator:

create column family task_results with comparator = TimeUUIDType;
CASSANDRA_OPTIONS

Options to be passed to the pycassa connection pool (optional).

Example configuration
CASSANDRA_SERVERS = ['localhost:9160']
CASSANDRA_KEYSPACE = 'celery'
CASSANDRA_COLUMN_FAMILY = 'task_results'
CASSANDRA_READ_CONSISTENCY = 'ONE'
CASSANDRA_WRITE_CONSISTENCY = 'ONE'
CASSANDRA_DETAILED_MODE = True
CASSANDRA_OPTIONS = {
    'timeout': 300,
    'max_retries': 10
}
IronCache backend settings

Note

The IronCache backend requires the iron_celery library: http://pypi.python.org/pypi/iron_celery

To install the iron_celery package use pip or easy_install:

$ pip install iron_celery

IronCache is configured via the URL provided in CELERY_RESULT_BACKEND, for example:

CELERY_RESULT_BACKEND = 'ironcache://project_id:token@'

Or to change the cache name:

ironcache://project_id:token@/awesomecache

For more information, see: https://github.com/iron-io/iron_celery

Couchbase backend settings

Note

The Couchbase backend requires the couchbase library: https://pypi.python.org/pypi/couchbase

To install the couchbase package use pip or easy_install:

$ pip install couchbase

This backend can be configured by setting CELERY_RESULT_BACKEND to a Couchbase URL:

CELERY_RESULT_BACKEND = 'couchbase://username:password@host:port/bucket'
CELERY_COUCHBASE_BACKEND_SETTINGS

This is a dict supporting the following keys:

  • host

    Host name of the Couchbase server. Defaults to localhost.

  • port

    The port the Couchbase server is listening to. Defaults to 8091.

  • bucket

    The default bucket the Couchbase server is writing to. Defaults to default.

  • username

    User name to authenticate to the Couchbase server as (optional).

  • password

    Password to authenticate to the Couchbase server (optional).

AMQP backend settings

Do not use in production.

This is the old AMQP result backend that creates one queue per task. If you want to send results back as messages, please consider using the RPC backend instead, or if you need the results to be persistent, use a result backend designed for that purpose (e.g. Redis, or a database).

Note

The AMQP backend requires RabbitMQ 1.1.0 or higher to automatically expire results. If you are running an older version of RabbitMQ you should disable result expiration like this:

CELERY_TASK_RESULT_EXPIRES = None
CELERY_RESULT_EXCHANGE

Name of the exchange to publish results in. Default is celeryresults.

CELERY_RESULT_EXCHANGE_TYPE

The exchange type of the result exchange. Default is to use a direct exchange.

CELERY_RESULT_PERSISTENT

If set to True, result messages will be persistent. This means the messages will not be lost after a broker restart. The default is for the results to be transient.

Example configuration
CELERY_RESULT_BACKEND = 'amqp'
CELERY_TASK_RESULT_EXPIRES = 18000  # 5 hours.
Message Routing
CELERY_QUEUES

Most users will not want to specify this setting and should rather use the automatic routing facilities.

If you really want to configure advanced routing, this setting should be a list of kombu.Queue objects the worker will consume from.

Note that workers can override this setting via the -Q option, and individual queues from this list (by name) can be excluded using the -X option.

Also see Basics for more information.

The default is a queue/exchange/binding key of celery, with exchange type direct.
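
A sketch of such a list (queue names are examples):

from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('cpubound', Exchange('cpubound'), routing_key='cpubound'),
)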

See also CELERY_ROUTES

CELERY_ROUTES

A list of routers, or a single router used to route tasks to queues. When deciding the final destination of a task the routers are consulted in order.

A router can be specified as either:

  • A router class instance
  • A string which provides the path to a router class
  • A dict containing router specification. It will be converted to a celery.routes.MapRoute instance.

Examples:

CELERY_ROUTES = {"celery.ping": "default",
                 "mytasks.add": "cpu-bound",
                 "video.encode": {
                     "queue": "video",
                     "exchange": "media"
                     "routing_key": "media.video.encode"}}

CELERY_ROUTES = ("myapp.tasks.Router", {"celery.ping": "default})

Where myapp.tasks.Router could be:

class Router(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == "celery.ping":
            return "default"

route_for_task may return a string or a dict. A string means it’s a queue name in CELERY_QUEUES; a dict means it’s a custom route.

When sending tasks, the routers are consulted in order. The first router that doesn’t return None is the route to use. The message options are then merged with the found route settings, where the router’s settings have priority.

For example, if apply_async() has these arguments:

Task.apply_async(immediate=False, exchange="video",
                 routing_key="video.compress")

and a router returns:

{"immediate": True, "exchange": "urgent"}

the final message options will be:

immediate=True, exchange="urgent", routing_key="video.compress"

(and any default message options defined in the Task class)

Values defined in CELERY_ROUTES have precedence over values defined in CELERY_QUEUES when merging the two.

With the following settings:

CELERY_QUEUES = {"cpubound": {"exchange": "cpubound",
                              "routing_key": "cpubound"}}

CELERY_ROUTES = {"tasks.add": {"queue": "cpubound",
                               "routing_key": "tasks.add",
                               "serializer": "json"}}

The final routing options for tasks.add will become:

{"exchange": "cpubound",
 "routing_key": "tasks.add",
 "serializer": "json"}

See Routers for more examples.

CELERY_QUEUE_HA_POLICY
brokers:RabbitMQ

This will set the default HA policy for a queue, and the value can either be a string (usually all):

CELERY_QUEUE_HA_POLICY = 'all'

Using ‘all’ will replicate the queue to all current nodes, or you can give it a list of nodes to replicate to:

CELERY_QUEUE_HA_POLICY = ['rabbit@host1', 'rabbit@host2']

Using a list will implicitly set x-ha-policy to ‘nodes’ and x-ha-policy-params to the given list of nodes.

See http://www.rabbitmq.com/ha.html for more information.

CELERY_WORKER_DIRECT

This option gives every worker a dedicated queue, so that tasks can be routed to specific workers.

The queue name for each worker is automatically generated based on the worker hostname and a .dq suffix, using the C.dq exchange.

For example the queue name for the worker with node name w1@example.com becomes:

w1@example.com.dq

Then you can route a task to a specific worker by specifying the hostname as the routing key and the C.dq exchange:

CELERY_ROUTES = {
    'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'}
}
CELERY_CREATE_MISSING_QUEUES

If enabled (default), any queues specified that are not defined in CELERY_QUEUES will be automatically created. See Automatic routing.

CELERY_DEFAULT_QUEUE

The name of the default queue used by .apply_async if the message has no route or no custom queue has been specified.

This queue must be listed in CELERY_QUEUES. If CELERY_QUEUES is not specified then it is automatically created containing one queue entry, where this name is used as the name of that queue.

The default is: celery.

CELERY_DEFAULT_EXCHANGE

Name of the default exchange to use when no custom exchange is specified for a key in the CELERY_QUEUES setting.

The default is: celery.

CELERY_DEFAULT_EXCHANGE_TYPE

Default exchange type used when no custom exchange type is specified for a key in the CELERY_QUEUES setting. The default is: direct.

CELERY_DEFAULT_ROUTING_KEY

The default routing key used when no custom routing key is specified for a key in the CELERY_QUEUES setting.

The default is: celery.

CELERY_DEFAULT_DELIVERY_MODE

Can be transient or persistent. The default is to send persistent messages.

Broker Settings
CELERY_ACCEPT_CONTENT

A whitelist of content-types/serializers to allow.

If a message is received that is not in this list then the message will be discarded with an error.

By default any content type is enabled (including pickle and yaml) so make sure untrusted parties do not have access to your broker. See Security for more.

Example:

# using serializer name
CELERY_ACCEPT_CONTENT = ['json']

# or the actual content-type (MIME)
CELERY_ACCEPT_CONTENT = ['application/json']
BROKER_FAILOVER_STRATEGY

Default failover strategy for the broker Connection object. If supplied, may map to a key in ‘kombu.connection.failover_strategies’, or be a reference to any method that yields a single item from a supplied list.

Example:

import random
from itertools import repeat

# Random failover strategy
def random_failover_strategy(servers):
    it = list(servers)  # don't modify the caller's list
    shuffle = random.shuffle
    for _ in repeat(None):
        shuffle(it)
        yield it[0]

BROKER_FAILOVER_STRATEGY = random_failover_strategy
BROKER_TRANSPORT
Aliases:BROKER_BACKEND
Deprecated aliases:
 CARROT_BACKEND
BROKER_URL

Default broker URL. This must be a URL in the form of:

transport://userid:password@hostname:port/virtual_host

Only the scheme part (transport://) is required, the rest is optional, and defaults to the specific transport's default values.

The transport part is the broker implementation to use, and the default is amqp, which uses librabbitmq by default or falls back to pyamqp if that is not installed. There are also many other choices, including redis, beanstalk, sqlalchemy, django, mongodb and couchdb. It can also be a fully qualified path to your own transport implementation.

More than one broker URL, of the same transport, can also be specified. The broker URLs can be passed in as a single string that is semicolon delimited:

BROKER_URL = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'

Or as a list:

BROKER_URL = [
    'transport://userid:password@localhost:port//',
    'transport://userid:password@hostname:port//'
]

The brokers will then be used in the BROKER_FAILOVER_STRATEGY.

See URLs in the Kombu documentation for more information.

BROKER_HEARTBEAT
transports supported:
 pyamqp

It’s not always possible to detect connection loss in a timely manner using TCP/IP alone, so AMQP defines something called heartbeats that are used by both the client and the broker to detect if a connection was closed.

Heartbeats are disabled by default.

If the heartbeat value is 10 seconds, then the heartbeat will be monitored at the interval specified by the BROKER_HEARTBEAT_CHECKRATE setting, which by default is double the rate of the heartbeat value (so with a 10 second heartbeat, the connection is checked every 5 seconds).

BROKER_HEARTBEAT_CHECKRATE
transports supported:
 pyamqp

At intervals the worker will monitor that the broker has not missed too many heartbeats. The rate at which this is checked is calculated by dividing the BROKER_HEARTBEAT value with this value, so if the heartbeat is 10.0 and the rate is the default 2.0, the check will be performed every 5 seconds (twice the heartbeat sending rate).
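
For example, a sketch enabling heartbeats on the pyamqp transport (the values are illustrative, not recommendations):

# Send a heartbeat every 10 seconds, and check the connection at
# double that rate (every 5 seconds).
BROKER_HEARTBEAT = 10.0
BROKER_HEARTBEAT_CHECKRATE = 2.0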

BROKER_USE_SSL
transports supported:
 pyamqp

Toggles SSL usage on broker connection and SSL settings.

If True the connection will use SSL with default SSL settings. If set to a dict, it will configure the SSL connection according to the specified policy. The format used is that of the Python ssl.wrap_socket() options.

Default is False (no SSL).

Note that SSL socket is generally served on a separate port by the broker.

Example providing a client cert and validating the server cert against a custom certificate authority:

import ssl

BROKER_USE_SSL = {
  'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}

Warning

Be careful using BROKER_USE_SSL=True. It is possible that your default configuration will not validate the server cert at all. Please read Python ssl module security considerations.

BROKER_POOL_LIMIT

New in version 2.3.

The maximum number of connections that can be open in the connection pool.

The pool is enabled by default since version 2.5, with a default limit of ten connections. This number can be tweaked depending on the number of threads/greenthreads (eventlet/gevent) using a connection. For example, if you're running eventlet with 1000 greenlets that each use a connection to the broker, contention can arise and you should consider increasing the limit.

If set to None or 0 the connection pool will be disabled and connections will be established and closed for every use.

Default (since 2.5) is to use a pool of 10 connections.
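
As a sketch, a deployment where many green threads share broker connections might raise the limit (the number is illustrative):

# Allow up to 100 simultaneous broker connections.
BROKER_POOL_LIMIT = 100

# Or disable pooling entirely, opening a connection per use:
# BROKER_POOL_LIMIT = None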

BROKER_CONNECTION_TIMEOUT

The default timeout in seconds before we give up establishing a connection to the AMQP server. Default is 4 seconds.

BROKER_CONNECTION_RETRY

Automatically try to re-establish the connection to the AMQP broker if lost.

The time between retries is increased for each retry, and is not exhausted before BROKER_CONNECTION_MAX_RETRIES is exceeded.

This behavior is on by default.

BROKER_CONNECTION_MAX_RETRIES

Maximum number of retries before we give up re-establishing a connection to the AMQP broker.

If this is set to 0 or None, we will retry forever.

Default is 100 retries.
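
A sketch combining the connection retry settings above (the values are illustrative):

BROKER_CONNECTION_RETRY = True       # the default
BROKER_CONNECTION_MAX_RETRIES = 0    # 0 or None means retry forever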

BROKER_LOGIN_METHOD

Set a custom AMQP login method. Default is AMQPLAIN.

BROKER_TRANSPORT_OPTIONS

New in version 2.2.

A dict of additional options passed to the underlying transport.

See your transport user manual for supported options (if any).

Example setting the visibility timeout (supported by Redis and SQS transports):

BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000}  # 5 hours
Task execution settings
CELERY_ALWAYS_EAGER

If this is True, all tasks will be executed locally by blocking until the task returns. apply_async() and Task.delay() will return an EagerResult instance, which emulates the API and behavior of AsyncResult, except the result is already evaluated.

That is, tasks will be executed locally instead of being sent to the queue.

CELERY_EAGER_PROPAGATES_EXCEPTIONS

If this is True, eagerly executed tasks (applied by task.apply(), or when the CELERY_ALWAYS_EAGER setting is enabled), will propagate exceptions.

It’s the same as always running apply() with throw=True.
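
For example, a common sketch for running a test suite with eager execution (whether eager mode is appropriate for your tests is a judgment call):

# Execute tasks inline and re-raise any exception they throw.
CELERY_ALWAYS_EAGER = True
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True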

CELERY_IGNORE_RESULT

Whether to store the task return values or not (tombstones). If you still want to store errors, just not successful return values, you can set CELERY_STORE_ERRORS_EVEN_IF_IGNORED.

CELERY_MESSAGE_COMPRESSION

Default compression used for task messages. Can be gzip, bzip2 (if available), or any custom compression schemes registered in the Kombu compression registry.

The default is to send uncompressed messages.

CELERY_TASK_RESULT_EXPIRES

Time (in seconds, or a timedelta object) after which stored task tombstones will be deleted.

A built-in periodic task will delete the results after this time (celery.task.backend_cleanup).

A value of None or 0 means results will never expire (depending on backend specifications).

Default is to expire after 1 day.

Note

For the moment this only works with the amqp, database, cache, redis and MongoDB backends.

When using the database or MongoDB backends, celery beat must be running for the results to be expired.
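
For example, a sketch shortening the expiry time to one hour (the value is illustrative):

from datetime import timedelta

# Delete stored results after one hour instead of the default one day.
CELERY_TASK_RESULT_EXPIRES = timedelta(hours=1)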

CELERY_MAX_CACHED_RESULTS

Result backends cache ready results used by the client.

This is the total number of results to cache before older results are evicted. The default is 5000. 0 or None means no limit, and a value of -1 will disable the cache.

CELERY_CHORD_PROPAGATES

New in version 3.0.14.

This setting defines what happens when a task that is part of a chord raises an exception:

  • If propagate is True the chord callback will change state to FAILURE with the exception value set to a ChordError instance containing information about the error and the task that failed.

    This is the default behavior in Celery 3.1+

  • If propagate is False the exception value will instead be forwarded to the chord callback.

    This was the default behavior before version 3.1.

CELERY_TRACK_STARTED

If True the task will report its status as “started” when the task is executed by a worker. The default value is False as the normal behaviour is not to report that level of granularity: tasks are either pending, finished, or waiting to be retried. Having a “started” state can be useful when there are long-running tasks and there is a need to report which task is currently running.

CELERY_TASK_SERIALIZER

A string identifying the default serialization method to use. Can be pickle (default), json, yaml, msgpack or any custom serialization methods that have been registered with kombu.serialization.registry.

See also

Serializers.

CELERY_TASK_PUBLISH_RETRY

New in version 2.2.

Decides if publishing task messages will be retried in the case of connection loss or other connection errors. See also CELERY_TASK_PUBLISH_RETRY_POLICY.

Enabled by default.

CELERY_TASK_PUBLISH_RETRY_POLICY

New in version 2.2.

Defines the default policy when retrying publishing a task message in the case of connection loss or other connection errors.

See Message Sending Retry for more information.
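
For illustration, a sketch of an explicit retry policy (the keys follow the format described in Message Sending Retry; the values are illustrative):

CELERY_TASK_PUBLISH_RETRY_POLICY = {
    'max_retries': 3,      # give up after three attempts
    'interval_start': 0,   # retry immediately the first time
    'interval_step': 0.2,  # increase the delay by 0.2s per retry
    'interval_max': 0.2,   # but never wait longer than 0.2s
}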

CELERY_DEFAULT_RATE_LIMIT

The global default rate limit for tasks.

This value is used for tasks that do not have a custom rate limit. The default is no rate limit.
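
For example, a sketch limiting tasks without their own rate limit to 100 per minute (the value is illustrative; the format also accepts /s and /h units):

CELERY_DEFAULT_RATE_LIMIT = '100/m'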

CELERY_DISABLE_RATE_LIMITS

Disable all rate limits, even if tasks have explicit rate limits set.

CELERY_ACKS_LATE

Late ack means the task messages will be acknowledged after the task has been executed, not just before, which is the default behavior.

Worker
CELERY_IMPORTS

A sequence of modules to import when the worker starts.

This is used to specify the task modules to import, but also to import signal handlers and additional remote control commands, etc.

The modules will be imported in the original order.

CELERY_INCLUDE

Exact same semantics as CELERY_IMPORTS, but can be used as a means to have different import categories.

The modules in this setting are imported after the modules in CELERY_IMPORTS.
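
A sketch with hypothetical module names:

# Hypothetical project modules, imported in this order at worker startup.
CELERY_IMPORTS = ('proj.tasks', 'proj.signal_handlers')

# Imported after the modules in CELERY_IMPORTS.
CELERY_INCLUDE = ('proj.extra_tasks',)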

CELERYD_WORKER_LOST_WAIT

In some cases a worker may be killed without proper cleanup, and the worker may have published a result before terminating. This value specifies how long we wait for any missing results before raising a WorkerLostError exception.

Default is 10.0 seconds.

CELERYD_MAX_TASKS_PER_CHILD

Maximum number of tasks a pool worker process can execute before it’s replaced with a new one. Default is no limit.

CELERYD_TASK_TIME_LIMIT

Task hard time limit in seconds. The worker processing the task will be killed and replaced with a new one when this is exceeded.

CELERYD_TASK_SOFT_TIME_LIMIT

Task soft time limit in seconds.

The SoftTimeLimitExceeded exception will be raised when this is exceeded. The task can catch this to e.g. clean up before the hard time limit comes.

Example:

from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()
CELERY_STORE_ERRORS_EVEN_IF_IGNORED

If set, the worker stores all task errors in the result store even if Task.ignore_result is on.

CELERYD_STATE_DB

Name of the file used to store persistent worker state (like revoked tasks). Can be a relative or absolute path, but be aware that the suffix .db may be appended to the file name (depending on Python version).

Can also be set via the --statedb argument to worker.

Not enabled by default.

CELERYD_TIMER_PRECISION

Set the maximum time in seconds that the ETA scheduler can sleep between rechecking the schedule. Default is 1 second.

Setting this value to 1 second means the scheduler's precision will be 1 second. If you need near millisecond precision you can set this to 0.1.

CELERY_ENABLE_REMOTE_CONTROL

Specify if remote control of the workers is enabled.

Default is True.

Error E-Mails
CELERY_SEND_TASK_ERROR_EMAILS

The default value for the Task.send_error_emails attribute, which if set to True means errors occurring during task execution will be sent to ADMINS by email.

Disabled by default.

ADMINS

List of (name, email_address) tuples for the administrators that should receive error emails.

SERVER_EMAIL

The email address this worker sends emails from. Default is celery@localhost.

EMAIL_HOST

The mail server to use. Default is localhost.

EMAIL_HOST_USER

User name (if required) to log on to the mail server with.

EMAIL_HOST_PASSWORD

Password (if required) to log on to the mail server with.

EMAIL_PORT

The port the mail server is listening on. Default is 25.

EMAIL_USE_SSL

Use SSL when connecting to the SMTP server. Disabled by default.

EMAIL_USE_TLS

Use TLS when connecting to the SMTP server. Disabled by default.

EMAIL_TIMEOUT

Timeout in seconds for when we give up trying to connect to the SMTP server when sending emails.

The default is 2 seconds.

Example E-Mail configuration

This configuration enables the sending of error emails to george@vandelay.com and kramer@vandelay.com:

# Enables error emails.
CELERY_SEND_TASK_ERROR_EMAILS = True

# Name and email addresses of recipients
ADMINS = (
    ('George Costanza', 'george@vandelay.com'),
    ('Cosmo Kramer', 'kramer@vandelay.com'),
)

# Email address used as sender (From field).
SERVER_EMAIL = 'no-reply@vandelay.com'

# Mailserver configuration
EMAIL_HOST = 'mail.vandelay.com'
EMAIL_PORT = 25
# EMAIL_HOST_USER = 'servers'
# EMAIL_HOST_PASSWORD = 's3cr3t'
Events
CELERY_SEND_EVENTS

Send events so the worker can be monitored by tools like celerymon.

CELERY_SEND_TASK_SENT_EVENT

New in version 2.2.

If enabled, a task-sent event will be sent for every task so tasks can be tracked before they are consumed by a worker.

Disabled by default.

CELERY_EVENT_QUEUE_TTL
transports supported:
 amqp

Message expiry time in seconds (int/float) after which messages sent to a monitor client's event queue are deleted (x-message-ttl).

For example, if this value is set to 10 then a message delivered to this queue will be deleted after 10 seconds.

Disabled by default.

CELERY_EVENT_QUEUE_EXPIRES
transports supported:
 amqp

Expiry time in seconds (int/float) after which a monitor client's event queue will be deleted (x-expires).

Default is never, relying on the queue autodelete setting.

CELERY_EVENT_SERIALIZER

Message serialization format used when sending event messages. Default is json. See Serializers.
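
A sketch tying the event settings together (the TTL value is illustrative):

CELERY_SEND_EVENTS = True           # emit events for monitoring tools
CELERY_SEND_TASK_SENT_EVENT = True  # also track tasks before a worker consumes them
CELERY_EVENT_QUEUE_TTL = 10.0       # expire undelivered event messages after 10s (amqp only)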

Broadcast Commands
CELERY_BROADCAST_QUEUE

Name prefix for the queue used when listening for broadcast messages. The worker's host name will be appended to the prefix to create the final queue name.

Default is celeryctl.

CELERY_BROADCAST_EXCHANGE

Name of the exchange used for broadcast messages.

Default is celeryctl.

CELERY_BROADCAST_EXCHANGE_TYPE

Exchange type used for broadcast messages. Default is fanout.

Logging
CELERYD_HIJACK_ROOT_LOGGER

New in version 2.2.

By default any previously configured handlers on the root logger will be removed. If you want to customize your own logging handlers, then you can disable this behavior by setting CELERYD_HIJACK_ROOT_LOGGER = False.

Note

Logging can also be customized by connecting to the celery.signals.setup_logging signal.

CELERYD_LOG_COLOR

Enables/disables colors in logging output by the Celery apps.

By default colors are enabled if

  1. the app is logging to a real terminal, and not a file.
  2. the app is not running on Windows.
CELERYD_LOG_FORMAT

The format to use for log messages.

Default is [%(asctime)s: %(levelname)s/%(processName)s] %(message)s

See the Python logging module for more information about log formats.

CELERYD_TASK_LOG_FORMAT

The format to use for log messages logged in tasks. Can be overridden using the --loglevel option to worker.

Default is:

[%(asctime)s: %(levelname)s/%(processName)s]
    [%(task_name)s(%(task_id)s)] %(message)s

See the Python logging module for more information about log formats.

CELERY_REDIRECT_STDOUTS

If enabled stdout and stderr will be redirected to the current logger.

Enabled by default. Used by celery worker and celery beat.

CELERY_REDIRECT_STDOUTS_LEVEL

The log level output to stdout and stderr is logged as. Can be one of DEBUG, INFO, WARNING, ERROR or CRITICAL.

Default is WARNING.

Security
CELERY_SECURITY_KEY

New in version 2.5.

The relative or absolute path to a file containing the private key used to sign messages when Message Signing is used.

CELERY_SECURITY_CERTIFICATE

New in version 2.5.

The relative or absolute path to an X.509 certificate file used to sign messages when Message Signing is used.

CELERY_SECURITY_CERT_STORE

New in version 2.5.

The directory containing X.509 certificates used for Message Signing. Can be a glob with wildcards (for example /etc/certs/*.pem).
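
A sketch with hypothetical file paths (message signing must also be enabled; see the Security guide):

# Hypothetical paths; adjust to wherever your key and certificates live.
CELERY_SECURITY_KEY = '/etc/ssl/private/worker.key'
CELERY_SECURITY_CERTIFICATE = '/etc/ssl/certs/worker.pem'
CELERY_SECURITY_CERT_STORE = '/etc/ssl/certs/*.pem'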

Custom Component Classes (advanced)
CELERYD_POOL

Name of the pool class used by the worker.

Eventlet/Gevent

Never use this option to select the eventlet or gevent pool. You must use the -P option instead, otherwise the monkey patching will happen too late and things will break in strange and silent ways.

Default is celery.concurrency.prefork:TaskPool.

CELERYD_POOL_RESTARTS

If enabled the worker pool can be restarted using the pool_restart remote control command.

Disabled by default.

CELERYD_AUTOSCALER

New in version 2.2.

Name of the autoscaler class to use.

Default is celery.worker.autoscale:Autoscaler.

CELERYD_AUTORELOADER

Name of the autoreloader class used by the worker to reload Python modules and files that have changed.

Default is: celery.worker.autoreload:Autoreloader.

CELERYD_CONSUMER

Name of the consumer class used by the worker. Default is celery.worker.consumer.Consumer.

CELERYD_TIMER

Name of the ETA scheduler class used by the worker. Default is celery.utils.timer2.Timer, or one overridden by the pool implementation.

Periodic Task Server: celery beat
CELERYBEAT_SCHEDULE

The periodic task schedule used by beat. See Entries.
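
For example, a minimal sketch of a schedule entry (the task name and interval are illustrative):

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',                # name of a registered task
        'schedule': timedelta(seconds=30),  # how often to send it
        'args': (16, 16),                   # positional arguments
    },
}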

CELERYBEAT_SCHEDULER

The default scheduler class. Default is celery.beat:PersistentScheduler.

Can also be set via the -S argument to beat.

CELERYBEAT_SCHEDULE_FILENAME

Name of the file used by PersistentScheduler to store the last run times of periodic tasks. Can be a relative or absolute path, but be aware that the suffix .db may be appended to the file name (depending on Python version).

Can also be set via the --schedule argument to beat.

CELERYBEAT_SYNC_EVERY

The number of periodic tasks that can be called before another database sync is issued. Defaults to 0 (sync based on timing - default of 3 minutes as determined by scheduler.sync_every). If set to 1, beat will call sync after every task message sent.

CELERYBEAT_MAX_LOOP_INTERVAL

The maximum number of seconds beat can sleep between checking the schedule.

The default for this value is scheduler specific. For the default celery beat scheduler the value is 300 (5 minutes), but for e.g. the django-celery database scheduler it is 5 seconds because the schedule may be changed externally, and so it must take changes to the schedule into account.

Also when running celery beat embedded (-B) on Jython as a thread the max interval is overridden and set to 1 so that it’s possible to shut down in a timely manner.

Monitor Server: celerymon
CELERYMON_LOG_FORMAT

The format to use for log messages.

Default is [%(asctime)s: %(levelname)s/%(processName)s] %(message)s

See the Python logging module for more information about log formats.

Django

Release:3.1
Date:January 23, 2016

First steps with Django

Using Celery with Django

Note

Previous versions of Celery required a separate library to work with Django, but since 3.1 this is no longer the case. Django is supported out of the box now, so this document only contains a basic way to integrate Celery and Django. You will use the same API as non-Django users, so it's recommended that you read the First Steps with Celery tutorial first and come back to this tutorial. When you have a working example you can continue to the Next Steps guide.

To use Celery with your Django project you must first define an instance of the Celery library (called an “app”).

If you have a modern Django project layout like:

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
- manage.py

then the recommended way is to create a new proj/proj/celery.py module that defines the Celery instance:

file:proj/proj/celery.py
from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

from django.conf import settings  # noqa

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Then you need to import this app in your proj/proj/__init__.py module. This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it:

proj/proj/__init__.py:

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa

Note that this example project layout is suitable for larger projects; for simple projects you may use a single contained module that defines both the app and tasks, like in the First Steps with Celery tutorial.

Let’s break down what happens in the first module, first we import absolute imports from the future, so that our celery.py module will not clash with the library:

from __future__ import absolute_import

Then we set the default DJANGO_SETTINGS_MODULE for the celery command-line program:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

Specifying the settings here means the celery command line program will know where your Django project is. This statement must always appear before the app instance is created, which is what we do next:

app = Celery('proj')

This is your instance of the library; you can have many instances, but there's probably no reason for that when using Django.

We also add the Django settings module as a configuration source for Celery. This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.

You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object when using Windows or execv:

app.config_from_object('django.conf:settings')

Next, a common practice for reusable apps is to define all tasks in a separate tasks.py module, and Celery does have a way to autodiscover these modules:

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

With the line above Celery will automatically discover tasks in reusable apps if you follow the tasks.py convention:

- app1/
    - app1/tasks.py
    - app1/models.py
- app2/
    - app2/tasks.py
    - app2/models.py

This way you do not have to manually add the individual modules to the CELERY_IMPORTS setting. The lambda is there so that autodiscovery happens only when needed, and so that importing your module will not evaluate the Django settings object.

Finally, the debug_task example is a task that dumps its own request information. This is using the new bind=True task option introduced in Celery 3.1 to easily refer to the current task instance.

Using the @shared_task decorator

The tasks you write will probably live in reusable apps, and reusable apps cannot depend on the project itself, so you also cannot import your app instance directly.

The @shared_task decorator lets you create tasks without having any concrete app instance:

demoapp/tasks.py:

from __future__ import absolute_import

from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

See also

You can find the full source code for the Django example project at: https://github.com/celery/celery/tree/3.1/examples/django/

Using the Django ORM/Cache as a result backend.

If you want to store task results in the Django database then you still need to install the django-celery library for that (alternatively you can use the SQLAlchemy result backend).

The django-celery library implements result backends using the Django ORM and the Django Cache frameworks.

To use this extension in your project you need to follow these four steps:

  1. Install the django-celery library:

    $ pip install django-celery
    
  2. Add djcelery to INSTALLED_APPS.

  3. Create the celery database tables.

    This step will create the tables used to store results when using the database result backend and the tables used by the database periodic task scheduler. You can skip this step if you don’t use these.

    If you are using south for schema migrations, you’ll want to:

    $ python manage.py migrate djcelery
    

    For those who are not using south, a normal syncdb will work:

    $ python manage.py syncdb
    
  4. Configure celery to use the django-celery backend.

    For the database backend you must use:

    app.conf.update(
        CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
    )
    

    For the cache backend you can use:

    app.conf.update(
        CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend',
    )
    

If you have connected Celery to your Django settings then you can add this directly into your settings module (without the app.conf.update part).

Relative Imports

You have to be consistent in how you import the task module, e.g. if you have project.app in INSTALLED_APPS then you also need to import the tasks from project.app or else the names of the tasks will be different.

See Automatic naming and relative imports
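
A sketch with a hypothetical project.app package:

# With 'project.app' in INSTALLED_APPS, always import tasks the same way:
from project.app.tasks import mytask   # registered as 'project.app.tasks.mytask'

# Importing the same module differently would register a different name:
# from app.tasks import mytask         # would be 'app.tasks.mytask'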

Starting the worker process

In a production environment you will want to run the worker in the background as a daemon - see Running the worker as a daemon - but for testing and development it is useful to be able to start a worker instance by using the celery worker command, much as you would use Django's runserver:

$ celery -A proj worker -l info

For a complete listing of the command-line options available, use the help command:

$ celery help
Where to go from here

If you want to learn more you should continue to the Next Steps tutorial, and after that you can study the User Guide.

Contributing

Welcome!

This document is fairly extensive, and you are not really expected to study it in detail for small contributions.

The most important rule is that contributing must be easy and that the community is friendly and not nitpicking on details such as coding style.

If you’re reporting a bug you should read the Reporting bugs section below to ensure that your bug report contains enough information to successfully diagnose the issue, and if you’re contributing code you should try to mimic the conventions you see surrounding the code you are working on, but in the end all patches will be cleaned up by the person merging the changes so don’t worry too much.

Community Code of Conduct

The goal is to maintain a diverse community that is pleasant for everyone. That is why we would greatly appreciate it if everyone contributing to and interacting with the community also followed this Code of Conduct.

The Code of Conduct covers our behavior as members of the community, in any forum, mailing list, wiki, website, Internet relay chat (IRC), public meeting or private correspondence.

The Code of Conduct is heavily based on the Ubuntu Code of Conduct, and the Pylons Code of Conduct.

Be considerate.

Your work will be used by other people, and you in turn will depend on the work of others. Any decision you take will affect users and colleagues, and we expect you to take those consequences into account when making decisions. Even if it’s not obvious at the time, our contributions to Celery will impact the work of others. For example, changes to code, infrastructure, policy, documentation and translations during a release may negatively impact others' work.

Be respectful.

The Celery community and its members treat one another with respect. Everyone can make a valuable contribution to Celery. We may not always agree, but disagreement is no excuse for poor behavior and poor manners. We might all experience some frustration now and then, but we cannot allow that frustration to turn into a personal attack. It’s important to remember that a community where people feel uncomfortable or threatened is not a productive one. We expect members of the Celery community to be respectful when dealing with other contributors as well as with people outside the Celery project and with users of Celery.

Be collaborative.

Collaboration is central to Celery and to the larger free software community. We should always be open to collaboration. Your work should be done transparently and patches from Celery should be given back to the community when they are made, not just when the distribution releases. If you wish to work on new code for existing upstream projects, at least keep those projects informed of your ideas and progress. It may not be possible to get consensus from upstream, or even from your colleagues about the correct implementation for an idea, so don’t feel obliged to have that agreement before you begin, but at least keep the outside world informed of your work, and publish your work in a way that allows outsiders to test, discuss and contribute to your efforts.

When you disagree, consult others.

Disagreements, both political and technical, happen all the time and the Celery community is no exception. It is important that we resolve disagreements and differing views constructively and with the help of the community and community process. If you really want to go a different way, then we encourage you to make a derivative distribution or alternate set of packages that still build on the work we’ve done to utilize as common of a core as possible.

When you are unsure, ask for help.

Nobody knows everything, and nobody is expected to be perfect. Asking questions avoids many problems down the road, and so questions are encouraged. Those who are asked questions should be responsive and helpful. However, when asking a question, care must be taken to do so in an appropriate forum.

Step down considerately.

Developers on every project come and go and Celery is no different. When you leave or disengage from the project, in whole or in part, we ask that you do so in a way that minimizes disruption to the project. This means you should tell people you are leaving and take the proper steps to ensure that others can pick up where you leave off.

Reporting Bugs

Security

You must never report security related issues, vulnerabilities or bugs including sensitive information to the bug tracker, or elsewhere in public. Instead sensitive bugs must be sent by email to security@celeryproject.org.

If you’d like to submit the information encrypted our PGP key is:

-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1.4.15 (Darwin)

mQENBFJpWDkBCADFIc9/Fpgse4owLNvsTC7GYfnJL19XO0hnL99sPx+DPbfr+cSE
9wiU+Wp2TfUX7pCLEGrODiEP6ZCZbgtiPgId+JYvMxpP6GXbjiIlHRw1EQNH8RlX
cVxy3rQfVv8PGGiJuyBBjxzvETHW25htVAZ5TI1+CkxmuyyEYqgZN2fNd0wEU19D
+c10G1gSECbCQTCbacLSzdpngAt1Gkrc96r7wGHBBSvDaGDD2pFSkVuTLMbIRrVp
lnKOPMsUijiip2EMr2DvfuXiUIUvaqInTPNWkDynLoh69ib5xC19CSVLONjkKBsr
Pe+qAY29liBatatpXsydY7GIUzyBT3MzgMJlABEBAAG0MUNlbGVyeSBTZWN1cml0
eSBUZWFtIDxzZWN1cml0eUBjZWxlcnlwcm9qZWN0Lm9yZz6JATgEEwECACIFAlJp
WDkCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEOArFOUDCicIw1IH/26f
CViDC7/P13jr+srRdjAsWvQztia9HmTlY8cUnbmkR9w6b6j3F2ayw8VhkyFWgYEJ
wtPBv8mHKADiVSFARS+0yGsfCkia5wDSQuIv6XqRlIrXUyqJbmF4NUFTyCZYoh+C
ZiQpN9xGhFPr5QDlMx2izWg1rvWlG1jY2Es1v/xED3AeCOB1eUGvRe/uJHKjGv7J
rj0pFcptZX+WDF22AN235WYwgJM6TrNfSu8sv8vNAQOVnsKcgsqhuwomSGsOfMQj
LFzIn95MKBBU1G5wOs7JtwiV9jefGqJGBO2FAvOVbvPdK/saSnB+7K36dQcIHqms
5hU4Xj0RIJiod5idlRC5AQ0EUmlYOQEIAJs8OwHMkrdcvy9kk2HBVbdqhgAREMKy
gmphDp7prRL9FqSY/dKpCbG0u82zyJypdb7QiaQ5pfPzPpQcd2dIcohkkh7G3E+e
hS2L9AXHpwR26/PzMBXyr2iNnNc4vTksHvGVDxzFnRpka6vbI/hrrZmYNYh9EAiv
uhE54b3/XhXwFgHjZXb9i8hgJ3nsO0pRwvUAM1bRGMbvf8e9F+kqgV0yWYNnh6QL
4Vpl1+epqp2RKPHyNQftbQyrAHXT9kQF9pPlx013MKYaFTADscuAp4T3dy7xmiwS
crqMbZLzfrxfFOsNxTUGE5vmJCcm+mybAtRo4aV6ACohAO9NevMx8pUAEQEAAYkB
HwQYAQIACQUCUmlYOQIbDAAKCRDgKxTlAwonCNFbB/9esir/f7TufE+isNqErzR/
aZKZo2WzZR9c75kbqo6J6DYuUHe6xI0OZ2qZ60iABDEZAiNXGulysFLCiPdatQ8x
8zt3DF9BMkEck54ZvAjpNSern6zfZb1jPYWZq3TKxlTs/GuCgBAuV4i5vDTZ7xK/
aF+OFY5zN7ciZHkqLgMiTZ+RhqRcK6FhVBP/Y7d9NlBOcDBTxxE1ZO1ute6n7guJ
ciw4hfoRk8qNN19szZuq3UU64zpkM2sBsIFM9tGF2FADRxiOaOWZHmIyVZriPFqW
RUwjSjs7jBVNq0Vy4fCu/5+e+XLOUBOoqtM5W7ELt0t1w9tXebtPEetV86in8fU2
=0chn
-----END PGP PUBLIC KEY BLOCK-----
Other bugs

Bugs can always be described to the Mailing list, but the best way to report an issue and to ensure a timely response is to use the issue tracker.

  1. Create a GitHub account.

You need to create a GitHub account to be able to create new issues and participate in the discussion.

  2. Determine if your bug is really a bug.

You should not file a bug if you are requesting support. For that you can use the Mailing list, or IRC.

  3. Make sure your bug hasn’t already been reported.

Search through the appropriate Issue tracker. If a bug like yours was found, check if you have new information that could be reported to help the developers fix the bug.

  4. Check if you’re using the latest version.

A bug could be fixed by some other improvements and fixes - it might not have an existing report in the bug tracker. Make sure you’re using the latest releases of celery, billiard and kombu.

  5. Collect information about the bug.

To have the best chance of having a bug fixed, we need to be able to easily reproduce the conditions that caused it. Most of the time this information will be from a Python traceback message, though some bugs might be in design, spelling or other errors on the website/docs/code.

  1. If the error is from a Python traceback, include it in the bug report.

  2. We also need to know what platform you’re running (Windows, OS X, Linux, etc.), the version of your Python interpreter, and the versions of Celery and related packages that you were running when the bug occurred.

  3. If you are reporting a race condition or a deadlock, tracebacks can be hard to get or might not be that useful. Try to inspect the process to get more diagnostic data. Some ideas:

    • Enable celery’s breakpoint signal and use it to inspect the process’s state. This will allow you to open a pdb session.
    • Collect tracing data using strace (Linux), dtruss (OS X), ktrace (BSD), ltrace and lsof.
  4. Include the output from the celery report command:

    $ celery -A proj report
    

    This will also include your configuration settings, and it will try to remove values for keys known to be sensitive; but make sure you also verify the information before submitting so that it doesn’t contain confidential information like API tokens and authentication credentials.

  6. Submit the bug.

By default GitHub will email you to let you know when new comments have been made on your bug. In the event you’ve turned this feature off, you should check back on occasion to ensure you don’t miss any questions a developer trying to fix the bug might ask.

Issue Trackers

Bugs for a package in the Celery ecosystem should be reported to the relevant issue tracker.

If you are unsure of the origin of the bug you can ask the Mailing list, or just use the Celery issue tracker.

Contributors guide to the codebase

There’s a separate section for internal details, including details about the codebase and a style guide.

Read Contributors Guide to the Code for more!

Versions

Version numbers consist of a major version, minor version and a release number. Since version 2.1.0 we use the versioning semantics described by semver: http://semver.org.

Stable releases are published at PyPI while development releases are only available in the GitHub git repository as tags. All version tags start with “v”, so version 0.8.0 is the tag v0.8.0.

Branches

Current active version branches are master, 3.1 and 3.0.

You can see the state of any branch by looking at its Changelog.

If the branch is in active development the topmost version info should contain metadata like:

2.4.0
======
:release-date: TBA
:status: DEVELOPMENT
:branch: master

The status field can be one of:

  • PLANNING

    The branch is currently experimental and in the planning stage.

  • DEVELOPMENT

    The branch is in active development, but the test suite should be passing and the product should be working and possible for users to test.

  • FROZEN

    The branch is frozen, and no more features will be accepted. When a branch is frozen the focus is on testing the version as much as possible before it is released.

master branch

The master branch is where development of the next version happens.

Maintenance branches

Maintenance branches are named after the version, e.g. the maintenance branch for the 2.2.x series is named 2.2. Previously these were named releaseXX-maint.

The versions we currently maintain are:

  • 3.1

    This is the current series.

  • 3.0

    This is the previous series, and the last version to support Python 2.5.

Archived branches

Archived branches are kept for preserving history only, and theoretically someone could provide patches for these if they depend on a series that is no longer officially supported.

An archived version is named X.Y-archived.

Our currently archived branches are:

  • 2.5-archived
  • 2.4-archived
  • 2.3-archived
  • 2.1-archived
  • 2.0-archived
  • 1.0-archived
Feature branches

Major new features are worked on in dedicated branches. There is no strict naming requirement for these branches.

Feature branches are removed once they have been merged into a release branch.

Tags

Tags are used exclusively for tagging releases. A release tag is named with the format vX.Y.Z, e.g. v2.3.1. Experimental releases contain an additional identifier vX.Y.Z-id, e.g. v3.0.0-rc1. Experimental tags may be removed after the official release.

Working on Features & Patches

Note

Contributing to Celery should be as simple as possible, so none of these steps should be considered mandatory.

You can even send in patches by email if that is your preferred work method. We won’t like you any less, any contribution you make is always appreciated!

However, following these steps may make the maintainers' lives easier, and may mean that your changes will be accepted sooner.

Forking and setting up the repository

First you need to fork the Celery repository, a good introduction to this is in the Github Guide: Fork a Repo.

After you have forked the repository, clone your copy to a directory on your machine:

$ git clone git@github.com:username/celery.git

When the repository is cloned enter the directory to set up easy access to upstream changes:

$ cd celery
$ git remote add upstream git://github.com/celery/celery.git
$ git fetch upstream

If you need to pull in new changes from upstream you should always use the --rebase option to git pull:

git pull --rebase upstream master

With this option you don’t clutter the history with merging commit notes. See Rebasing merge commits in git. If you want to learn more about rebasing see the Rebase section in the Github guides.

If you need to work on a different branch than master you can fetch and checkout a remote branch like this:

git checkout --track -b 3.0-devel origin/3.0-devel
Running the unit test suite

To run the Celery test suite you need to install a few dependencies. A complete list of the dependencies needed is located in requirements/test.txt.

Installing the test requirements:

$ pip install -U -r requirements/test.txt

When installation of dependencies is complete you can execute the test suite by calling nosetests:

$ nosetests

Some useful options to nosetests are:

  • -x

    Stop running the tests at the first test that fails.

  • -s

    Don’t capture output

  • --nologcapture

    Don’t capture log output.

  • -v

    Run with verbose output.

If you want to run the tests for a single test file only you can do so like this:

$ nosetests celery.tests.test_worker.test_worker_job
Creating pull requests

When your feature/bugfix is complete you may want to submit a pull request so that it can be reviewed by the maintainers.

Creating pull requests is easy, and it also lets you track the progress of your contribution. Read the Pull Requests section in the Github Guide to learn how this is done.

You can also attach pull requests to existing issues by following the steps outlined here: http://bit.ly/koJoso

Calculating test coverage

To calculate test coverage you must first install the coverage module.

Installing the coverage module:

$ pip install -U coverage

Code coverage in HTML:

$ nosetests --with-coverage --cover-html

The coverage output will then be located at celery/tests/cover/index.html.

Code coverage in XML (Cobertura-style):

$ nosetests --with-coverage --cover-xml --cover-xml-file=coverage.xml

The coverage XML output will then be located at coverage.xml.

Running the tests on all supported Python versions

There is a tox configuration file in the top directory of the distribution.

To run the tests for all supported Python versions simply execute:

$ tox

If you only want to test specific Python versions use the -e option:

$ tox -e py26
Building the documentation

To build the documentation you need to install the dependencies listed in requirements/docs.txt:

$ pip install -U -r requirements/docs.txt

After these dependencies are installed you should be able to build the docs by running:

$ cd docs
$ rm -rf .build
$ make html

Make sure there are no errors or warnings in the build output. After building succeeds the documentation is available at .build/html.

Verifying your contribution

To use these tools you need to install a few dependencies. These dependencies can be found in requirements/pkgutils.txt.

Installing the dependencies:

$ pip install -U -r requirements/pkgutils.txt
pyflakes & PEP8

To ensure that your changes conform to PEP8 and to run pyflakes execute:

$ paver flake8

To not return a negative exit code when this command fails, use the -E option; this can be convenient while developing:

$ paver flake8 -E
API reference

To make sure that all modules have a corresponding section in the API reference please execute:

$ paver autodoc
$ paver verifyindex

If files are missing you can ad