Django Task Queue

My django app needs to periodically fetch and process data from a 3rd party api. I need a task queue.

Celery is the big dog. It's well worth a look. It's great, but I'm not going to use it. django-background-tasks is a simpler option. It uses a relational database to store the jobs and cron as the scheduler. It can get the job done, but I'm not going to use it either.

I'm going to use a combination of redis, python-rq, and rq-scheduler. For me python-rq hits the sweet spot between complexity and flexibility.

Redis is a rock solid in-memory data structure store. It has data structures like lists and hashes, and commands like rpush, lpop, and blpop, which make it an excellent choice for implementing queues. Python-rq implements a simple api to queue jobs and run them using workers. A job is a python function call (a reference to the function plus its arguments) that is pickled, given an id, and stored in a redis hash. The id is then added to a redis list. Workers are python processes that run jobs by popping an id from the list, unpickling the job, fork()ing, and then executing the function in the child process.
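To make that cycle concrete, here is a toy, in-process model of it. It is only a sketch under stated assumptions: a dict stands in for the redis hashes and a deque for the redis list, the names `enqueue` and `work_one` are hypothetical (they are not rq's API), and nothing is fork()ed. It just walks through pickle, store, push, pop, unpickle, run.

```python
import pickle
import uuid
from collections import deque

# Toy stand-ins for the redis structures (this is NOT redis or rq):
# a dict plays the role of the per-job hashes, a deque the queue's list.
job_hashes = {}
queue_list = deque()


def fib(n):
    a, b = 1, 1
    for _ in range(n - 1):
        a, b = b, a + b
    return a


def enqueue(func, *args, **kwargs):
    """Pickle the call, store it under a new id, then push the id (like RPUSH)."""
    job_id = str(uuid.uuid4())
    # pickle records a module-level function by reference (module + name),
    # so whoever unpickles it must be able to import the same function.
    job_hashes[job_id] = {"data": pickle.dumps((func, args, kwargs))}
    queue_list.append(job_id)
    return job_id


def work_one():
    """Pop the oldest id (like LPOP), unpickle the call, run it, store the result."""
    job_id = queue_list.popleft()
    func, args, kwargs = pickle.loads(job_hashes[job_id]["data"])
    # A real rq worker would fork() here and run the call in the child process.
    job_hashes[job_id]["result"] = func(*args, **kwargs)
    return job_id


if __name__ == "__main__":
    jid = enqueue(fib, 100)
    work_one()
    print(job_hashes[jid]["result"])  # 354224848179261915075
```

The by-reference pickling is also why, in the real setup, the worker has to be able to import the module that defines the job function.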

The api is small and clean. I create a queue instance with Queue(), passing it the redis connection. I then enqueue a job onto the queue with Queue.enqueue(). Finally, I run the rq worker command-line script, which pops jobs from queues, fork()s, and executes them. I can even create a custom worker class if I need different behavior from rq worker.

It is instructive to get this all going outside of django.

Queue Example (OS X)

  1. Install redis: % brew install redis
  2. Start redis with the default settings in a new terminal window: % redis-server
  3. Create a new pyenv environment for playing with python-rq and redis:
    % mkdir redis
    % cd redis
    % pyenv virtualenv redis
    % pyenv local redis
    % pip install rq
    % pip install rq-scheduler
    
  4. Create a module called work.py containing the function the job will run:
    # Get the n'th number in the Fibonacci sequence
    
    def fib(n):
        a, b = 1, 1
        for _ in range(n - 1):
            a, b = b, a + b
        return a
    
  5. Create a module called jobs.py containing code to enqueue a job:
    import time
    
    from rq import Queue
    from redis import Redis
    
    from work import fib
    
    
    def queue_job():
        # Tell rq to connect to redis on the default port and use the default queue
        redis_conn = Redis()
        q = Queue(connection=redis_conn)
    
        # Enqueue a job to calculate the 100th Fibonacci number
        job = q.enqueue(fib, 100)
    
        # Print job.result immediately after enqueuing. 'None' means the job hasn't finished.
        print(job.result)   # => None
    
        # Now wait a few seconds until the worker has finished
        time.sleep(3)
        print(job.result)   # => 354224848179261915075
    
    
    if __name__ == '__main__':
        queue_job()
    
  6. Start rq's worker process in a new terminal window, from the project directory created in step 3 so the worker can import work.py: % rq worker
  7. Open another terminal window and arrange it so you can see both it and the worker terminal from step 6. Enqueue the job by running jobs.py: % python jobs.py

In the terminal where you ran jobs.py you should see:

% python jobs.py 
None  
354224848179261915075  

In the worker terminal you should see something like:

21:18:14 RQ worker 'rq:worker:Suprise.90196' started, version 0.5.6  
21:18:14  
21:18:14 *** Listening on default...  
23:08:21 default: work.fib(100) (25dddb0b-586f-43e3-8ba1-c107a6eaa9a6)  
23:08:21 Job OK  
23:08:21 Result is kept for 500 seconds  
23:08:21  
23:08:21 *** Listening on default...  
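The fixed time.sleep(3) in jobs.py is fine for a demo, but it races the worker. A minimal alternative, assuming polling is acceptable: the hypothetical helper wait_for below (it is not part of rq) keeps calling a zero-argument function until it returns something other than None or a timeout expires. In jobs.py it could replace the sleep as print(wait_for(lambda: job.result)); newer rq releases recommend job.return_value() over job.result, so check which one your version provides.

```python
import time


def wait_for(fetch, timeout=10.0, interval=0.5):
    """Call fetch() until it returns something other than None.

    Returns that value, or raises TimeoutError once `timeout` seconds pass.
    Hypothetical helper; it is not part of rq.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = fetch()
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no result after {timeout} seconds")
        time.sleep(interval)


if __name__ == "__main__":
    # Stand-in for job.result: None twice (job still running), then the answer.
    answers = iter([None, None, 354224848179261915075])
    print(wait_for(lambda: next(answers), timeout=5, interval=0.01))
```

One caveat of this design: a job that fails, or whose function legitimately returns None, never produces a non-None result, so wait_for simply times out; checking job.get_status() would tell those cases apart.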

The jobs.py module added a job to the default redis queue, and the rq worker process popped the job and executed it. Very cool, but what about scheduling jobs in the future? That's where rq-scheduler comes in. rq-scheduler provides a simple api for scheduling jobs, plus a process that polls redis (every minute by default) and moves scheduled jobs onto their queues when they are due.
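As far as I can tell from its source, rq-scheduler keeps scheduled jobs in a redis sorted set scored by their scheduled timestamp, and each poll moves every job whose score is at or before the current time onto its queue. The sketch below models that idea in memory with a heap. ToyScheduler and its methods are hypothetical and are not rq-scheduler's API; plain numbers stand in for timestamps and a list stands in for the 'default' queue.

```python
import heapq
import itertools


class ToyScheduler:
    """In-memory sketch of a poll-based scheduler (not rq-scheduler itself).

    A heap ordered by due time stands in for the redis sorted set.
    """

    def __init__(self):
        self._heap = []                 # entries: (due_time, seq, job)
        self._seq = itertools.count()   # tie-breaker so jobs are never compared
        self.queue = []                 # stands in for the 'default' rq queue

    def enqueue_at(self, due, job):
        heapq.heappush(self._heap, (due, next(self._seq), job))

    def poll(self, now):
        """Move every job whose due time is <= now onto the queue."""
        moved = []
        while self._heap and self._heap[0][0] <= now:
            _, _, job = heapq.heappop(self._heap)
            self.queue.append(job)
            moved.append(job)
        return moved


if __name__ == "__main__":
    s = ToyScheduler()
    s.enqueue_at(10, "fib(10)")
    s.enqueue_at(20, "fib(20)")
    print(s.poll(5))    # []
    print(s.poll(12))   # ['fib(10)']
    print(s.poll(25))   # ['fib(20)']
```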

Scheduling Example

  1. In a new terminal window, start the rq-scheduler process with the default redis settings but a poll interval of 5 seconds: % rqscheduler -i 5
  2. Create a module called schedule.py containing code that schedules a couple of jobs:
    from datetime import datetime, timedelta, timezone
    
    from redis import Redis
    from rq_scheduler import Scheduler
    
    from work import fib
    
    
    def schedule_job():
        # Get a scheduler for the "default" queue on the default redis server
        scheduler = Scheduler(connection=Redis())
    
        now = time_to_str(datetime.now())
    
        # Schedule a job, to find the 10th Fibonacci number, to be run in 10 seconds' time.
        # The scheduler expects UTC datetimes.
        job1_at = datetime.utcnow() + timedelta(seconds=10)
        scheduler.enqueue_at(job1_at, fib, 10)
        print("scheduling job1 at {} to run at {}".format(now, time_to_str(utc_to_local(job1_at))))
    
        # Schedule a second job, to find the 20th Fibonacci number, to be run in 20 seconds' time
        job2_at = datetime.utcnow() + timedelta(seconds=20)
        scheduler.enqueue_at(job2_at, fib, 20)
        print("scheduling job2 at {} to run at {}".format(now, time_to_str(utc_to_local(job2_at))))
    
    
    def time_to_str(date):
        return date.strftime('%H:%M:%S')
    
    
    def utc_to_local(utc):
        # Mark the naive UTC datetime as UTC, then convert it to local time
        return utc.replace(tzinfo=timezone.utc).astimezone(tz=None)
    
    
    if __name__ == '__main__':
        schedule_job()
    
  3. Schedule the jobs: % python schedule.py

The schedule.py terminal should print when each job was scheduled and when it is due to run:

% python schedule.py 
scheduling job1 at 00:14:50 to run at 00:15:00  
scheduling job2 at 00:14:50 to run at 00:15:10  

The rqscheduler terminal should look something like this:

% rqscheduler -i 5
00:14:46 Running RQ scheduler...  
00:14:46 Checking for scheduled jobs...  
00:14:51 Checking for scheduled jobs...  
00:14:56 Checking for scheduled jobs...  
00:15:01 Checking for scheduled jobs...  
00:15:07 Checking for scheduled jobs...  
00:15:12 Checking for scheduled jobs...  

The rq worker terminal from the queue example should look something like this:

% rq worker
00:14:44 RQ worker 'rq:worker:Suprise.95509' started, version 0.5.6  
00:14:44  
00:14:44 *** Listening on default...  
00:15:02 default: work.fib(10) (9ecdad40-77a6-40ad-a689-36a147a5d465)  
00:15:02 Job OK  
00:15:02 Result is kept for 500 seconds  
00:15:02  
00:15:02 *** Listening on default...  
00:15:12 default: work.fib(20) (a144d888-b630-4d47-93fb-2873ff47841d)  
00:15:12 Job OK  
00:15:12 Result is kept for 500 seconds  

As you can see, rqscheduler polls redis every 5 seconds ('Checking for scheduled jobs...'). When a poll finds a job whose scheduled time has passed, it moves the job onto the 'default' queue. The worker process then pops the job from the queue and executes it.
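Because jobs only move at poll time, a job can start up to roughly one poll interval after it is due. A small worked check against the logs above: the helper below (hypothetical, standard library only) finds the first poll at or after each job's due time, using the poll times copied from the rqscheduler output and the due times printed by schedule.py.

```python
from bisect import bisect_left


def to_seconds(hms):
    """Convert 'HH:MM:SS' to seconds since midnight."""
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s


def first_poll_after(due, polls):
    """Return the first poll time at or after `due` (polls must be sorted)."""
    i = bisect_left(polls, due)
    return polls[i] if i < len(polls) else None


# Poll times copied from the rqscheduler log above.
POLLS = [to_seconds(t) for t in
         ["00:14:46", "00:14:51", "00:14:56", "00:15:01", "00:15:07", "00:15:12"]]

if __name__ == "__main__":
    for name, due in [("job1", "00:15:00"), ("job2", "00:15:10")]:
        moved = first_poll_after(to_seconds(due), POLLS)
        print(name, "moved", moved - to_seconds(due), "s late")
```

It prints "job1 moved 1 s late" and "job2 moved 2 s late", which lines up with the worker picking the jobs up at 00:15:02 and 00:15:12.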

I've only covered the very basics here, but the python-rq and rq-scheduler docs and source code are great.

Integration with django

The whole point of this was to add task queues to my django app. That is where django-rq comes in. It is a django app that makes it easy to set up queues in settings.py. It wraps some of the python-rq and rq-scheduler apis to use the queues defined in settings.py. It also includes a dashboard for monitoring queues, and django management commands that run workers and the scheduler within the django context, so your task code can access your models.

The documentation is the best place to get started, but it boils down to:

  1. Install django-rq: pip install django-rq
  2. Add django_rq to INSTALLED_APPS:
    INSTALLED_APPS = (
        # other apps
        "django_rq",
    )
    
  3. Add the RQ_QUEUES setting to settings.py and define the queues you want to use:
    RQ_QUEUES = {
        'default': {
            'HOST': 'localhost',
            'PORT': 6379,
            'DB': 0,
        },
        'low': {
            'HOST': 'localhost',
            'PORT': 6379,
            'DB': 0,
        }
    }
    
  4. Add the django-rq views to your URLconf: url(r'^rq/', include('django_rq.urls')), (on Django 2.0 and later you can write path('rq/', include('django_rq.urls')) instead; url() was removed in Django 4.0)
  5. Start the workers for your queues: python manage.py rqworker default low
  6. Start the scheduler: python manage.py rqscheduler

You can now add jobs to queues with django_rq.get_queue('low').enqueue() or with the @job decorator (from django_rq import job). You can add jobs to the scheduler with django_rq.get_scheduler('low').enqueue_at(). You can monitor the queues at /rq/.
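The @job decorator (in both rq and django_rq) leaves the function callable as usual and gives it a .delay() method that enqueues the call instead of running it, so with django-rq you would decorate a function with @job('low') and call fib.delay(20) at the call site. The toy sketch below reproduces only that shape: toy_job and QUEUES are hypothetical names, and a dict of plain lists stands in for redis.

```python
# Toy queues: plain lists of (function, args, kwargs), NOT redis.
QUEUES = {"default": [], "low": []}


def toy_job(queue_name="default"):
    """Hypothetical decorator with the same shape as rq/django_rq's @job."""
    def decorator(func):
        def delay(*args, **kwargs):
            # Record the call instead of running it, like enqueueing a job.
            call = (func, args, kwargs)
            QUEUES[queue_name].append(call)
            return call

        func.delay = delay  # the function itself is returned unchanged
        return func
    return decorator


@toy_job("low")
def fib(n):
    a, b = 1, 1
    for _ in range(n - 1):
        a, b = b, a + b
    return a


if __name__ == "__main__":
    fib.delay(20)                      # queued, not run
    print(len(QUEUES["low"]))          # 1
    func, args, kwargs = QUEUES["low"].pop(0)
    print(func(*args, **kwargs))       # 6765 (what a worker would compute)
    print(fib(10))                     # 55: direct calls still run immediately
```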

Tips

  1. Structure job code well. Decompose functionality into helper modules and keep the job code simple and clean.
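  2. Be careful what you put on a queue. Jobs and their arguments are pickled when they are enqueued and unpickled later in a separate worker process, so arguments must be picklable and the job sees a snapshot of them, not live objects.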
  3. If you find that you need multiple instances of redis, remember that a worker started with rqworker can only listen to queues on a single redis instance. The solution is to start additional workers for the other redis instances. For example, if I had started two instances of redis with one queue on each, default on port 6379 and low on 6380, I would need to start two workers: python manage.py rqworker default and python manage.py rqworker low.
  4. Put rqworker and rqscheduler under supervisord or some other process monitoring system.
  5. When python-rq jobs fail they are moved to the failed queue (the FailedJobRegistry in rq 1.0 and later), but you should still configure custom exception handling for your jobs. Django-rq supports configuring exception handlers for queues you define in settings.py.
  6. Log like you mean it. You will need to piece things together after the inevitable failure so prepare for it.
  7. Channel your inner Sting and monitor everything about your queues and logs.
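Tip 2 in practice: because arguments are pickled at enqueue time, the worker sees a copy frozen at that moment, and some objects cannot be pickled at all. A small standard-library demonstration follows; the order dict is hypothetical data standing in for a model instance. This is why passing a primary key and re-fetching the object inside the job is usually safer than passing the instance itself.

```python
import pickle
import threading


def snapshot_demo():
    """Show that a pickled argument is a copy frozen at enqueue time."""
    order = {"id": 42, "status": "pending"}  # hypothetical stand-in for a model row
    payload = pickle.dumps(order)             # roughly what enqueue() stores
    order["status"] = "cancelled"             # live data changes before the job runs
    return pickle.loads(payload)              # what the worker would see


def is_picklable(obj):
    """True if obj survives pickle.dumps(); every job argument must."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


if __name__ == "__main__":
    print(snapshot_demo()["status"])             # pending
    print(is_picklable({"order_id": 42}))        # True
    print(is_picklable(threading.Lock()))        # False
    print(is_picklable(lambda order_id: None))   # False
```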
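For tip 3, a hypothetical helper (it is not part of django-rq) that groups RQ_QUEUES entries by (HOST, PORT, DB) and builds one rqworker command per redis connection. It assumes the HOST/PORT/DB style of configuration used in settings.py above and ignores any other keys; queues in different databases of the same server are treated as separate connections, since a connection selects a single database.

```python
from collections import defaultdict


def worker_commands(rq_queues):
    """Return one 'manage.py rqworker' command per distinct redis connection.

    Hypothetical helper: only HOST, PORT and DB are read, with the usual
    redis defaults filled in when a key is missing.
    """
    by_connection = defaultdict(list)
    for name, conf in rq_queues.items():
        key = (conf.get("HOST", "localhost"), conf.get("PORT", 6379), conf.get("DB", 0))
        by_connection[key].append(name)
    return ["python manage.py rqworker " + " ".join(names)
            for names in by_connection.values()]


if __name__ == "__main__":
    RQ_QUEUES = {
        "default": {"HOST": "localhost", "PORT": 6379, "DB": 0},
        "low": {"HOST": "localhost", "PORT": 6380, "DB": 0},
    }
    for cmd in worker_commands(RQ_QUEUES):
        print(cmd)
    # python manage.py rqworker default
    # python manage.py rqworker low
```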
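For tip 5, as I understand the rq docs, a custom exception handler is a function taking (job, exc_type, exc_value, traceback); returning True (or None) lets rq fall through to the next handler, while returning False stops the chain. The sketch below is one such handler that also serves tip 6 by logging the job id, function name and traceback. With plain rq you can pass it to a Worker via exception_handlers=[...]; recent django-rq versions document an RQ = {'EXCEPTION_HANDLERS': [...]} setting, but check the README for the version you run. The logger name "jobs" is an assumption, and SimpleNamespace stands in for a real rq Job in the demo.

```python
import logging
import sys
from types import SimpleNamespace

logger = logging.getLogger("jobs")  # hypothetical logger name


def log_failed_job(job, exc_type, exc_value, traceback):
    """Custom exception handler using rq's documented handler signature.

    Logs the job id, function name and traceback, then returns True so rq
    falls through to the next handler (by default, recording the job as failed).
    """
    logger.error(
        "job %s (%s) failed: %s", job.id, job.func_name, exc_value,
        exc_info=(exc_type, exc_value, traceback),
    )
    return True


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    try:
        1 / 0
    except ZeroDivisionError:
        # SimpleNamespace stands in for an rq Job; only .id and .func_name are used.
        fake_job = SimpleNamespace(id="abc123", func_name="work.fib")
        log_failed_job(fake_job, *sys.exc_info())
```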

Cover photo of a female Osmia conjuncta bee