Creating a Nested (Chained) Queue in Celery

If you’re building any type of application, chances are you have background tasks that need to run periodically. Although there are a lot of task queues out there, my personal favorite is Celery, mainly because it’s simple to configure yet very powerful in its feature set.

One of the challenges we faced when building our queue was getting it to work as efficiently as possible with our MongoDB datastore. The embedded nature of Mongo’s BSON structure allows different documents to have various levels of nesting with any number of child elements.

Take, for example, a collection named users (where each document is one user), in which each user has a list of Twitter accounts along with the latest 20 tweets from each. A simple queue would process one user per task, but that produces very uneven tasks when some users have only 1 or 2 Twitter accounts and others have 20 or 30: a single heavy user can tie up a worker long after the other workers have gone idle. To resolve this, you need to implement a nested task queue (more like a chained one, since the new tasks get added to the end of the queue) that creates one task per Twitter account.
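To make the imbalance concrete, here is a simplified model (not Celery itself): workers pull tasks from a single FIFO queue as soon as they are free, and every Twitter account is assumed to take one unit of time to update. The user names, account counts, and worker count are made up for illustration.

```python
import heapq

def makespan(durations, workers):
    """Time until the last task finishes when `workers` idle workers
    each pull the next task from a FIFO queue as soon as they are free."""
    finish_times = [0] * workers  # every worker starts idle at t=0
    heapq.heapify(finish_times)
    for d in durations:
        # The worker that frees up first takes the next queued task.
        free_at = heapq.heappop(finish_times)
        heapq.heappush(finish_times, free_at + d)
    return max(finish_times)

# Hypothetical users: name -> number of linked Twitter accounts.
accounts_per_user = {"alice": 30, "bob": 2, "carol": 1, "dave": 1}
WORKERS = 4

# One task per user: a task's cost is the user's whole account list.
per_user = list(accounts_per_user.values())
# One task per account: every task costs one unit.
per_account = [1] * sum(accounts_per_user.values())

print(makespan(per_user, WORKERS))     # 30
print(makespan(per_account, WORKERS))  # 9
```

With per-user tasks, three workers are idle by t=2 while one grinds through alice's 30 accounts; splitting the work per account keeps all four busy until the end.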

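Note that this situation mostly comes from embedding child elements inside parent documents, which NoSQL document stores like MongoDB encourage. In MySQL, where the Twitter accounts would typically live in their own table, you could simply update that table without having to reference an individual user.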
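For contrast, a relational layout already stores one row per account, so building a per-account job list is a single query. The sketch below uses an in-memory SQLite database as a stand-in for MySQL, and the twitter_accounts schema is hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for MySQL; the schema is hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE twitter_accounts ("
    "id INTEGER PRIMARY KEY, user_id INTEGER, screen_name TEXT)"
)
conn.executemany(
    "INSERT INTO twitter_accounts (user_id, screen_name) VALUES (?, ?)",
    [(1, "a"), (1, "b"), (2, "c")],
)

# Every account is already its own row, so the per-account job list comes
# from one query; there is no need to walk through the users first.
account_ids = [row[0] for row in conn.execute("SELECT id FROM twitter_accounts ORDER BY id")]
print(account_ids)  # [1, 2, 3]
```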

Install Celery

If you haven’t already, go ahead and install Celery using pip:

$ pip install celery

Set Up Tasks and celeryconfig

Go ahead and create a directory for your Celery files and cd into it:

$ mkdir /home/myapp/celery && cd /home/myapp/celery

Create a tasks.py file:

# Register Celery
from celery import Celery

# I’m using RabbitMQ as a broker, but there are other options: https://docs.celeryq.dev/en/latest/getting-started/first-steps-with-celery.html#choosing-a-broker
celery = Celery('tasks', broker='amqp://guest@localhost//')
# Load the remaining settings (including the celerybeat schedule) from celeryconfig.py
celery.config_from_object('celeryconfig')

# celery.decorators has been removed from recent Celery versions; register tasks on the app instead
@celery.task
def initiateUpdate():
    """
    This is the main function that will be called periodically. It goes through all the users and uses updateTwitterAccount to individually update each Twitter account.
    """
    list_of_users = []  # placeholder: fetch your user documents from MongoDB here
    for user in list_of_users:
        for account in user.twitter_accounts:
            # .delay() adds the task to the Celery queue instead of running it right away;
            # the argument must be serializable (JSON by default in Celery 4.0+)
            updateTwitterAccount.delay(account)

@celery.task
def updateTwitterAccount(account):
    """
    This is the function that is called by initiateUpdate to update individual Twitter accounts.
    """
    # do whatever you need to do to update a Twitter account
    pass
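One detail worth handling is what actually gets passed to .delay(). Since Celery 4.0 the default task serializer is JSON, so rich objects (or BSON ObjectIds) may not survive the trip to the broker. A minimal sketch, assuming user documents shaped like the hypothetical dicts below (the field names _id, twitter_accounts, and screen_name are illustrative), flattens the nesting into small JSON-friendly payloads, one per account, which initiateUpdate could then hand to updateTwitterAccount.delay(job) one by one.

```python
import json

def iter_account_jobs(users):
    """Flatten nested user documents into one small payload per Twitter account.

    Each payload holds only plain strings, so it survives JSON serialization
    (the default task serializer in Celery 4.0+)."""
    for user in users:
        # .get() tolerates users with no linked accounts at all.
        for account in user.get("twitter_accounts", []):
            yield {
                "user_id": str(user["_id"]),  # str() also turns a BSON ObjectId into its hex string
                "screen_name": account["screen_name"],
            }

# Hypothetical documents with very different amounts of nesting.
users = [
    {"_id": "u1", "twitter_accounts": [{"screen_name": "a"}, {"screen_name": "b"}]},
    {"_id": "u2", "twitter_accounts": []},
    {"_id": "u3", "twitter_accounts": [{"screen_name": "c"}]},
]

jobs = list(iter_account_jobs(users))
for job in jobs:
    json.dumps(job)  # raises TypeError if a payload is not JSON-serializable
print(len(jobs))  # 3
```

Passing identifiers rather than whole documents also means each task can re-read the latest state of the account when it finally runs.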

You’ll also need a celeryconfig.py file:

BROKER_URL = 'amqp://guest@localhost//'
# 'rpc://' sends results back over the broker; it replaces the older 'amqp' result backend
CELERY_RESULT_BACKEND = 'rpc://'

# We’re going to use celerybeat to call initiateUpdate every 3 hours
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'update-twitter-accounts': {
        'task': 'tasks.initiateUpdate',
        'schedule': crontab(minute=0, hour='*/3') # customize your crontab: https://docs.celeryq.dev/en/latest/userguide/periodic-tasks.html#crontab-schedules
    },
}

CELERY_IMPORTS = ('tasks',)
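The hour='*/3' argument uses cron step syntax (every third hour, counting from 0), and minute=0 pins each run to the top of the hour. The helper below is a hypothetical, deliberately tiny expander, not part of Celery: it handles only * and */n (no ranges or lists) and lists the times at which the schedule fires.

```python
def expand_step(field, lowest, highest):
    """Expand a cron field of the form '*' or '*/n' into the values it matches."""
    if field == "*":
        return list(range(lowest, highest + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(lowest, highest + 1, step))
    raise ValueError(f"unsupported cron field: {field!r}")

# Hours run from 0 to 23 in cron, so '*/3' fires eight times a day.
hours = expand_step("*/3", 0, 23)
print([f"{h:02d}:00" for h in hours])
# ['00:00', '03:00', '06:00', '09:00', '12:00', '15:00', '18:00', '21:00']
```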

With that, you should have all the files you’ll need to run the Celery worker and celerybeat from the command line. However, it makes much more sense to set up celeryd and celerybeat as daemons.

Just to clarify for those of you not too familiar with Celery: celerybeat is similar to cron, but it only adds tasks to a queue at periodic intervals; you must run celeryd alongside it to actually process the tasks sent by celerybeat.
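The split between the two processes can be pictured with a toy model built on the standard library's queue.Queue. This is only an illustration, not how Celery works internally (there, the message broker holds the queue): the scheduler only enqueues, and nothing gets processed unless a worker drains the queue.

```python
import queue

def run(beat_ticks, worker_running):
    """Toy model: a scheduler enqueues one task per tick; a worker, if running,
    drains the queue. Returns (processed, still_pending)."""
    broker = queue.Queue()  # stands in for RabbitMQ
    for _ in range(beat_ticks):
        # celerybeat's only job: publish the scheduled task name.
        broker.put("tasks.initiateUpdate")

    processed = 0
    if worker_running:
        # celeryd's job: take tasks off the queue and execute them.
        while not broker.empty():
            broker.get()
            processed += 1
    return processed, broker.qsize()

print(run(3, worker_running=False))  # (0, 3): tasks pile up unprocessed
print(run(3, worker_running=True))   # (3, 0)
```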

Daemonize Celery (Optional)

Celery does not daemonize itself by default, so you need to add the init scripts manually.

$ cd /etc/init.d

Create two init scripts called celeryd and celerybeat, and make them executable (chmod +x celeryd celerybeat). The source for these files can be found here and here.

You’ll also need to add config files for the daemons in /etc/default.

/etc/default/celeryd:

# Name of nodes to start (here we have a single node)
CELERYD_NODES="w1"
# or you could have 3 nodes:
# CELERYD_NODES="w1 w2 w3"

# Where to chdir at start.
CELERYD_CHDIR="/home/myapp/celery/"

# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=300 --concurrency=8"

# Name of the celery config module.
CELERY_CONFIG_MODULE="celeryconfig"

# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"

# Workers should run as an unprivileged user.
CELERYD_USER="user"
CELERYD_GROUP="user"
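To make the %n placeholder concrete: with several nodes, each one gets its own log and PID file. The sketch below is hypothetical and not part of Celery or its init script. It reads a shell-style KEY="value" file like the one above and expands %n per node, assuming %n is replaced by the short node name such as w1.

```python
import shlex

CONFIG = '''
# Name of nodes to start
CELERYD_NODES="w1 w2 w3"
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
'''

def parse_env_file(text):
    """Parse simple KEY="value" lines, skipping blank lines and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, raw_value = line.partition("=")
        # shlex.split strips the surrounding quotes the way a shell would.
        settings[key] = " ".join(shlex.split(raw_value))
    return settings

settings = parse_env_file(CONFIG)
for node in settings["CELERYD_NODES"].split():
    log_file = settings["CELERYD_LOG_FILE"].replace("%n", node)
    pid_file = settings["CELERYD_PID_FILE"].replace("%n", node)
    print(node, log_file, pid_file)
# w1 /var/log/celery/w1.log /var/run/celery/w1.pid
# w2 /var/log/celery/w2.log /var/run/celery/w2.pid
# w3 /var/log/celery/w3.log /var/run/celery/w3.pid
```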

/etc/default/celerybeat:

# Where to chdir at start.
CELERYBEAT_CHDIR="/home/myapp/celery/"

# Name of the celery config module.
CELERY_CONFIG_MODULE="celeryconfig"

Now, simply start the celeryd and celerybeat daemons with the following commands:

$ /etc/init.d/celeryd start
$ /etc/init.d/celerybeat start

There you have it: a simple way to run nested background tasks with Celery.