
Configure Celery with SQS and Django on Elastic Beanstalk

 Introduction

Have your users complained about slow page loads in the web app you developed? That might be because of a long I/O-bound call or a time-consuming process. For example, when a customer signs up to the website, we need to send a confirmation email; in the usual flow, the email is sent first and only then is the 200 OK response returned for the signup POST. However, we could send the email later, after returning the 200 OK response, right? This is not so straightforward when you are working with a framework like Django, which is tightly bound to the MVC paradigm.

So, how do we do it? The first thought that comes to mind is Python's threading module. However, Python threads are implemented as pthreads (kernel threads), and because of the global interpreter lock (GIL), a Python process runs only one thread at a time. On top of that, threads make code harder to manage, maintain, and scale.

Prerequisite

This post assumes the reader is familiar with Django and AWS Elastic Beanstalk.

Celery

Celery comes to the rescue. It helps when you have a time-consuming task (heavy compute or I/O-bound work) inside the request-response cycle. Celery is an open-source asynchronous task queue, or job queue, based on distributed message passing. In this post I will walk you through setting up Celery with Django and SQS on Elastic Beanstalk.

Why Celery?

Celery is very easy to integrate with an existing code base: just add a decorator above a function definition to declare it a Celery task, and then call that function through its .delay method.

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'
# Calling a celery task
hello.delay()
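
Note that hello.delay() only sends a message and returns immediately; the function body runs later in a worker process. Calling hello() directly would execute it synchronously in the current process.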

Broker

To work with Celery, we need a message broker. As of writing this blog, Celery supports RabbitMQ, Redis, and (with some limitations) Amazon SQS as message broker solutions. Unless you want to stick to the AWS ecosystem (as in my case), I recommend going with RabbitMQ or Redis, because SQS does not yet support remote control commands and events. For more info check here. One reason to use SQS is its pricing: every user gets one million free SQS requests per month.

Proceeding with SQS, go to the AWS SQS dashboard and create a new SQS queue by clicking the Create New Queue button.

Depending on your requirements you can select either queue type (standard or FIFO). We will name the queue dev-celery.
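
If you prefer to create the queue from code instead of the console, a minimal boto3 sketch could look like the one below (boto3 itself, the region, and the attribute value are my assumptions here, not part of the original setup).

# Minimal sketch: create the dev-celery queue with boto3.
# Assumes boto3 is installed and AWS credentials are available in the environment.
import boto3

sqs = boto3.client('sqs', region_name='us-west-2')  # region matches the Celery settings used later
response = sqs.create_queue(
    QueueName='dev-celery',
    Attributes={'VisibilityTimeout': '3600'},  # matches the visibility_timeout configured below
)
print(response['QueueUrl'])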

Installation

Celery has very nice documentation; installation and configuration are described here. For convenience, here are the steps.

Activate your virtual environment, if you have configured one, and install Celery:

pip install celery[sqs]

Configuration

Celery has built-in support for Django. It picks up its settings from Django's settings.py; each setting name is prefixed with CELERY_ (the 'CELERY' prefix is passed as the namespace when the Celery app is initialized). So put the settings below into settings.py:

# Amazon credentials will be taken from environment variable.
CELERY_BROKER_URL = 'sqs://'

The AWS login credentials should be present in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

CELERY_BROKER_TRANSPORT_OPTIONS = {'region': 'us-west-2',
                                   'visibility_timeout': 3600,
                                   'polling_interval': 10,
                                   'queue_name_prefix': '%s-' % {True: 'dev',
                                                                 False: 'production'}[DEBUG],
                                   'CELERYD_PREFETCH_MULTIPLIER': 0,
                                  }
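
Note that queue_name_prefix works together with Celery's default queue name, celery: with DEBUG = True the prefix evaluates to 'dev-', so the worker uses the dev-celery queue we created earlier, while with DEBUG = False it becomes production-celery.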


Now let's configure the Celery app within the Django code. Create a celery.py file beside Django's settings.py.

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Now put the code below in the project's __init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
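
With autodiscover_tasks() in place, any tasks.py module inside an installed Django app is picked up automatically. As an illustration (the app name myapp and the task below are hypothetical, not part of the original project), the signup confirmation email from the introduction could be turned into a task like this:

# myapp/tasks.py -- hypothetical example; autodiscover_tasks() finds it automatically
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_confirmation_email(user_email):
    # The slow I/O now happens in the worker, outside the request-response cycle.
    send_mail(
        'Welcome!',
        'Thanks for signing up.',
        'noreply@example.com',
        [user_email],
    )

The signup view would then call send_confirmation_email.delay(user.email) and return the 200 OK response immediately.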

Testing

Now let's test the configuration. Open a terminal and start Celery.

Terminal 1

$ celery worker --app=proj --loglevel=INFO
-------------- celery@lintel v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.15.0-24-generic-x86_64-with-Ubuntu-18.04-bionic 2018-07-04 11:18:57
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         enq_web:0x7f0ba29fa3d0
- ** ---------- .> transport:   sqs://localhost//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
               .> celery           exchange=celery(direct) key=celery
[tasks]
 . enq_web._celery.debug_task

 

All the tasks registered with Celery via its decorators appear here when the worker starts. If your task does not appear in this list, make sure the module containing it is imported at startup.

Now open the Django shell in another terminal.

Terminal 2

$ python manage.py shell

In [1]: from proj import celery
In [2]: celery.debug_task()        # called directly, not through Celery
In [3]: celery.debug_task.delay()  # sent through Celery to a worker

After the task function is executed with the delay method, it should run in the worker process that is listening for tasks in the other terminal. Here Celery sent a message to SQS with the details of the task, and the worker process that was listening to SQS received it and executed the task. Below is what you should see in terminal 1.

Terminal 1

Request: <Context: {'origin': 'gen14099@lintel', u'args': [], 'chain': None, 'root_id': '041be6c3-419d-4aa0-822f-d50da1b340a0', 'expires': None, u'is_eager': False, u'correlation_id': '041be6c3-419d-4aa0-822f-d50da1b340a0', 'chord': None, u'reply_to': 'd2e76b9b-094b-33b4-a873-db5d2ace8881', 'id': '041be6c3-419d-4aa0-822f-d50da1b340a0', 'kwargsrepr': '{}', 'lang': 'py', 'retries': 0, 'task': 'proj.celery.debug_task', 'group': None, 'timelimit': [None, None], u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': 'celery', u'exchange': u''}, u'hostname': u'celery@lintel', 'called_directly': False, 'parent_id': None, 'argsrepr': '()', 'errbacks': None, 'callbacks': None, u'kwargs': {}, 'eta': None, '_protected': 1}>

Deploy the Celery worker process on AWS Elastic Beanstalk

Celery provides the multi sub-command to run processes in daemon mode, but this should not be used in production. Celery instead recommends various daemonization tools: http://docs.celeryproject.org/en/latest/userguide/daemonizing.html

AWS Elastic Beanstalk already uses supervisord for managing the web server process. Celery can also be managed with supervisord, and Celery's official documentation has a nice example supervisord config: https://github.com/celery/celery/tree/master/extra/supervisord. Based on that, we write a few commands under the .ebextensions directory.

Create two files under the .ebextensions directory. The celery-worker.sh file extracts the environment variables and builds the Celery configuration, which is copied to /opt/python/etc/celery.conf, after which supervisord is restarted. The main celery command is:

celery worker -A PROJECT_NAME -P solo --loglevel=INFO -n worker.%%h

At the time of writing this blog, Celery had the issue https://github.com/celery/celery/issues/3759. As a workaround we add -P solo, which runs tasks sequentially in a single worker process.

#!/usr/bin/env bash

# Get django environment variables
celeryenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/export //g' | sed 's/$PATH/%(ENV_PATH)s/g' | sed 's/$PYTHONPATH//g' | sed 's/$LD_LIBRARY_PATH//g'`
celeryenv=${celeryenv%?}

# Create celery configuration script
celeryconf="[program:celeryd-worker]
; Set full path to celery program if using virtualenv
command=/opt/python/run/venv/bin/celery worker -A PROJECT_NAME -P solo --loglevel=INFO -n worker.%%h

directory=/opt/python/current/app/enq_web
user=nobody
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

environment=$celeryenv
"

# Create the celery supervisord conf script
echo "$celeryconf" | tee /opt/python/etc/celery.conf

# Add configuration script to supervisord conf (if not there already)
if ! grep -Fxq "[include]" /opt/python/etc/supervisord.conf
  then
  echo "[include]" | tee -a /opt/python/etc/supervisord.conf
  echo "files: celery.conf" | tee -a /opt/python/etc/supervisord.conf
fi

# Reread the supervisord config
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf reread

# Update supervisord in cache without restarting all services
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf update

# Start/Restart celeryd through supervisord
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf restart celeryd-worker

Now create the Elastic Beanstalk configuration file as below. Make sure you have pycurl and celery in requirements.txt. To install pycurl, libcurl-devel needs to be installed via the yum package manager.

packages:
  yum:
    libcurl-devel: []

container_commands:
    01_mkdir_for_log_and_pid:
        command: "mkdir -p /var/log/celery/ /var/run/celery/"
    02_celery_configure:
        command: "cp .ebextensions/celery-worker.sh /opt/elasticbeanstalk/hooks/appdeploy/post/ && chmod 744 /opt/elasticbeanstalk/hooks/appdeploy/post/celery-worker.sh"
        cwd: "/opt/python/ondeck/app"
    03_celery_run:
        command: "/opt/elasticbeanstalk/hooks/appdeploy/post/celery-worker.sh"

Add these files to git and deploy to Elastic Beanstalk.

Figure: the architecture with Django, Celery, and Elastic Beanstalk.

How to integrate Celery into Django project

What is Celery?

Celery is a distributed task queue that allows us to execute jobs in the background. This article explains how to set up Celery with Django to perform background tasks.

Advantages:

  • Whether your project is large or small, Celery makes scheduling periodic tasks easy.
  • You never want end users to have to wait unnecessarily for pages to load or actions to complete. If a long process is part of your application’s workflow, you can use Celery to execute that process in the background, as resources become available, so that your application can continue to respond to client requests.

Celery uses brokers to pass messages between a Django Project and the Celery workers. We will use Redis as the message broker.

Installation

Before diving into Celery, complete the setup steps below.

Create a new virtualenv 'venv' using the following command:

$ virtualenv venv

To activate the environment, use the command:

$ source ./venv/bin/activate

Install Django and create a Django project 'myproject'. Make sure to activate the virtualenv, create a requirements.txt file, and run the migrations. Then fire up the server and navigate to http://localhost:8000/ in your browser. You should see the familiar "Congratulations on your first Django-powered page" text. When done, kill the server.
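
For reference, a typical sequence of commands (run inside the activated virtualenv; the trailing dot keeps manage.py at the top level, matching the layout shown later) looks roughly like this:

$ pip install django
$ django-admin startproject myproject .
$ pip freeze > requirements.txt
$ python manage.py migrate
$ python manage.py runserver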

Let’s install Celery:

$ pip install celery

$ pip freeze > requirements.txt

Now we will integrate Celery into our Django project with the following steps:

Step 1:

Inside the myproject directory, i.e. beside your settings.py, create a new file called celery.py and add the following code:

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Let's break down what happens in this module. First, we import absolute imports from the future so that our celery.py module will not clash with the library:

from __future__ import absolute_import

Then we set the default DJANGO_SETTINGS_MODULE for the celery command-line program:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

Specifying the settings here means the celery command line program will know where your Django project is. This statement must always appear before the app instance is created, which is what we do next:

app = Celery('myproject')

This is your instance of the library. You can have many instances, but there's probably no reason for that when using Django.

We also add the Django settings module as a configuration source for Celery. This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.

You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object when using Windows or execv:

app.config_from_object('django.conf:settings')

Next, a common practice for reusable apps is to define all tasks in a separate tasks.py module, and Celery does have a way to autodiscover these modules:

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
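
For example, a tasks.py placed inside any app listed in INSTALLED_APPS will be discovered automatically (the app name myapp and the add task are illustrative, not part of the project above):

# myapp/tasks.py -- picked up by app.autodiscover_tasks()
from celery import shared_task

@shared_task
def add(x, y):
    # Runs in a Celery worker when called with add.delay(x, y)
    return x + y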

Step 2:

To ensure that the Celery app is loaded when Django starts, add the following code into the __init__.py file that sits next to your settings.py file:

from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.

from .celery import app as celery_app

The project layout should now look like this:

├── manage.py
├── myproject
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── requirements.txt

Step 3:

Celery uses “brokers” to pass messages between a Django Project and the Celery workers. In this article, we will use Redis as the message broker.

First, install Redis from the official download page. Then, in a new terminal window, fire up the Redis server:

$ redis-server

You can test that Redis is working properly by typing this into your terminal:

$ redis-cli ping

Redis should reply with PONG – try it!

Once Redis is up, add the following code to your settings.py file:

# CELERY STUFF
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Nairobi'

You also need to add Redis as a dependency in the Django Project:

$ pip install redis==2.10.3
$ pip freeze > requirements.txt

Test that the Celery worker is ready to receive tasks:

$ celery -A myproject worker -l info
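
With the worker running, you can queue a task from a Django shell in another terminal and read its result back from the Redis result backend (the add task here is the hypothetical one sketched in Step 1):

$ python manage.py shell
>>> from myapp.tasks import add
>>> result = add.delay(2, 3)
>>> result.status          # 'PENDING' until a worker picks it up, then 'SUCCESS'
>>> result.get(timeout=10)
5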

Kill the process with CTRL-C. Now, test that the Celery task scheduler is ready for action:

$ celery -A myproject beat -l info

That’s it! You can now use Celery with Django. For more information on setting up Celery with Django, please check out the official Celery documentation.