
Configure Celery with SQS and Django on Elastic Beanstalk

Introduction

Have your users complained about slow page loads in the web app you developed? That might be because of a long I/O-bound call or a time-consuming process. For example, when a customer signs up on a website, we need to send a confirmation email; in the normal flow the email is sent first, and only then is the 200 OK response returned for the signup POST. However, we could send the email later, after returning the 200 OK response, right? This is not so straightforward when you are working with a framework like Django, which is tightly bound to the MVC paradigm.

So, how do we do it? The first thought that comes to mind is Python's threading module. However, Python threads are implemented as pthreads (kernel threads), and because of the global interpreter lock (GIL), a Python process runs only one thread at a time. Besides, threaded code is hard to manage, maintain, and scale.

Prerequisites

This post assumes you have working knowledge of Django and AWS Elastic Beanstalk.

Celery

Celery comes to the rescue. It helps when you have a time-consuming task (heavy computation or an I/O-bound job) inside the request-response cycle. Celery is an open source asynchronous task queue, or job queue, based on distributed message passing. In this post I will walk you through setting up Celery with Django and SQS on Elastic Beanstalk.

Why Celery?

Celery is very easy to integrate with an existing code base: write a decorator above a function's definition to declare it a Celery task, then call it through the task's .delay method.

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'
# Calling a celery task
hello.delay()

Broker

To work with Celery, we need a message broker. As of this writing, Celery supports RabbitMQ, Redis, and (not fully) Amazon SQS as message broker solutions. Unless you want to stick to the AWS ecosystem (as in my case), I recommend going with RabbitMQ or Redis, because SQS does not yet support remote control commands and events; for more information, check here. One reason to use SQS is its pricing: the first one million SQS requests per month are free.

Proceeding with SQS, go to the AWS SQS dashboard and create a new queue by clicking the Create New Queue button.

Depending on the requirement, we can select either type of queue (Standard or FIFO). We will name the queue dev-celery.
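
If you prefer to script this step, the same queue can be created with boto3. A minimal sketch, assuming AWS credentials are already available in the environment; the region and queue name match the configuration used below:

import boto3

# Create a standard queue named dev-celery with a one-hour
# visibility timeout, matching the Celery broker settings used later.
sqs = boto3.client('sqs', region_name='us-west-2')
response = sqs.create_queue(
    QueueName='dev-celery',
    Attributes={'VisibilityTimeout': '3600'},
)
print(response['QueueUrl'])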

Installation

Celery has very good documentation; installation and configuration are described here. For convenience, here are the steps.

Activate your virtual environment, if you have configured one, and install Celery.

pip install celery[sqs]

Configuration

Celery has built-in support for Django. It picks up its settings from Django's settings.py, reading the names prefixed with CELERY_ (the 'CELERY' namespace must be passed when initializing the Celery app). So put the settings below in settings.py.

# Amazon credentials will be taken from environment variable.
CELERY_BROKER_URL = 'sqs://'

AWS login credentials should be present in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

CELERY_BROKER_TRANSPORT_OPTIONS = {'region': 'us-west-2',
                                   'visibility_timeout': 3600,
                                   'polling_interval': 10,
                                   'queue_name_prefix': '%s-' % {True: 'dev',
                                                                 False: 'production'}[DEBUG],
                                  }

# Prefetch is a worker setting, not a broker transport option
# (the old name CELERYD_PREFETCH_MULTIPLIER maps to this key).
CELERY_WORKER_PREFETCH_MULTIPLIER = 0

With DEBUG = True, the queue_name_prefix 'dev-' combined with Celery's default queue name celery yields dev-celery, the queue we created above.


Now let's configure the Celery app within the Django code. Create a celery.py file alongside Django's settings.py.

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Now put the code below in the project's __init__.py.

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
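
With this in place, app.autodiscover_tasks() will look for a tasks.py module in each installed Django app. As an illustration (the app and task names here are hypothetical), such a module could contain:

# myapp/tasks.py
from celery import shared_task

@shared_task
def send_confirmation_email(user_id):
    # Placeholder body: look up the user and send the signup
    # confirmation email mentioned in the introduction.
    print('Sending confirmation email to user %s' % user_id)

Calling send_confirmation_email.delay(42) from a view then queues the task through the broker instead of running it inline.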

Testing

Now let's test the configuration. Open a terminal and start a Celery worker.

Terminal 1

$ celery worker --app=proj --loglevel=INFO
-------------- celery@lintel v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.15.0-24-generic-x86_64-with-Ubuntu-18.04-bionic 2018-07-04 11:18:57
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         enq_web:0x7f0ba29fa3d0
- ** ---------- .> transport:   sqs://localhost//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
               .> celery           exchange=celery(direct) key=celery
[tasks]
 . enq_web._celery.debug_task

 

All the tasks registered with Celery through its decorators appear here when the worker starts. If your task does not appear in the list, make sure the module containing the task is imported on startup.

Now open a Django shell in another terminal.

Terminal 2

$ python manage.py shell

In [1]: from proj import celery
In [2]: celery.debug_task()        # called directly, not through Celery
In [3]: celery.debug_task.delay()  # queued through Celery

After you execute the task with the delay method, it should run in the worker process listening in the other terminal: Celery sent a message with the task details to SQS, and the worker process listening to SQS received it and executed the task. Below is what you should see in terminal 1.

Terminal 1

Request: <Context: {'origin': 'gen14099@lintel', u'args': [], 'chain': None, 'root_id': '041be6c3-419d-4aa0-822f-d50da1b340a0', 'expires': None, u'is_eager': False, u'correlation_id': '041be6c3-419d-4aa0-822f-d50da1b340a0', 'chord': None, u'reply_to': 'd2e76b9b-094b-33b4-a873-db5d2ace8881', 'id': '041be6c3-419d-4aa0-822f-d50da1b340a0', 'kwargsrepr': '{}', 'lang': 'py', 'retries': 0, 'task': 'proj.celery.debug_task', 'group': None, 'timelimit': [None, None], u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': 'celery', u'exchange': u''}, u'hostname': u'celery@lintel', 'called_directly': False, 'parent_id': None, 'argsrepr': '()', 'errbacks': None, 'callbacks': None, u'kwargs': {}, 'eta': None, '_protected': 1}>

Deploy the Celery worker process on AWS Elastic Beanstalk

Celery provides a "multi" sub-command to run workers in daemon mode, but this should not be used in production. Celery instead recommends various daemonization tools: http://docs.celeryproject.org/en/latest/userguide/daemonizing.html

AWS Elastic Beanstalk already uses supervisord to manage the web server process, and Celery can be configured through supervisord as well. Celery's official documentation has a nice example of a supervisord config for Celery: https://github.com/celery/celery/tree/master/extra/supervisord. Based on that, we write a few commands under the .ebextensions directory.

Create two files under the .ebextensions directory: the celery-worker.sh script below and the Elastic Beanstalk config file that follows it. The script extracts the environment variables, builds the Celery supervisord configuration, copies it to /opt/python/etc/celery.conf, and restarts supervisord. The main Celery command is:

celery worker -A PROJECT_NAME -P solo --loglevel=INFO -n worker.%%h

At the time of writing this blog, Celery had issue https://github.com/celery/celery/issues/3759. As a workaround, we add "-P solo", which runs tasks sequentially in a single worker process.

#!/usr/bin/env bash

# Get django environment variables
celeryenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/export //g' | sed 's/$PATH/%(ENV_PATH)s/g' | sed 's/$PYTHONPATH//g' | sed 's/$LD_LIBRARY_PATH//g'`
celeryenv=${celeryenv%?}

# Create celery configuration script
celeryconf="[program:celeryd-worker]
; Set full path to celery program if using virtualenv
command=/opt/python/run/venv/bin/celery worker -A PROJECT_NAME -P solo --loglevel=INFO -n worker.%%h

directory=/opt/python/current/app/enq_web
user=nobody
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

environment=$celeryenv
"

# Create the celery supervisord conf script
echo "$celeryconf" | tee /opt/python/etc/celery.conf

# Add configuration script to supervisord conf (if not there already)
if ! grep -Fxq "[include]" /opt/python/etc/supervisord.conf
  then
  echo "[include]" | tee -a /opt/python/etc/supervisord.conf
  echo "files: celery.conf" | tee -a /opt/python/etc/supervisord.conf
fi

# Reread the supervisord config
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf reread

# Update supervisord in cache without restarting all services
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf update

# Start/Restart celeryd through supervisord
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf restart celeryd-worker

Now create the Elastic Beanstalk configuration file as below. Make sure you have pycurl and celery in requirements.txt; to build pycurl, libcurl-devel needs to be installed via the yum package manager.

packages:
  yum:
    libcurl-devel: []

container_commands:
    01_mkdir_for_log_and_pid:
        command: "mkdir -p /var/log/celery/ /var/run/celery/"
    02_celery_configure:
        command: "cp .ebextensions/celery-worker.sh /opt/elasticbeanstalk/hooks/appdeploy/post/ && chmod 744 /opt/elasticbeanstalk/hooks/appdeploy/post/celery-worker.sh"
        cwd: "/opt/python/ondeck/app"
    03_celery_run:
        command: "/opt/elasticbeanstalk/hooks/appdeploy/post/celery-worker.sh"

Add these files to git and deploy to elastic beanstalk.

[Figure: the architecture with Django, Celery, and Elastic Beanstalk]

Implementing a Webhook Handler in Python

What is a Webhook?

A webhook is an asynchronous HTTP callback fired on an event occurrence. It is a simple server-to-server communication mechanism for reporting that a specific event occurred on a server. The server on which the event occurred fires an HTTP POST request to another server, at a URL provided by the receiving server.

For example, whenever your colleague pushes commits to GitHub, an event occurs on GitHub's server. If a webhook URL is provided in the GitHub settings, a webhook is fired to that URL: an HTTP POST request with the commit details inside the body, in a specified format. More details on GitHub webhooks can be found here.

In this post, I will share my experience of implementing a webhook handler in Python. Basic knowledge of building a web application in Python will help.

Webhook Handler

A webhook can be handled by simply providing a URL endpoint in a web application. Following is an example using Django. Add the webhook URL in urls.py:

from django.conf.urls import url
import views

urlpatterns = [
    url(r'^webhook', views.webhook, name='webhook'),
]
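
Note that url() is the pre-2.0 routing API. If you are on Django 2.0 or later, the equivalent routing would use path():

from django.urls import path
from . import views

urlpatterns = [
    path('webhook', views.webhook, name='webhook'),
]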

Now create the view function in views.py that will parse the data and process it. In most cases, webhook data is sent in JSON format, so let's load the webhook data and pass it to a process_webhook function.

Most web applications accept POST requests only after verifying a CSRF token, but here we need to exempt this view from that check, so put the @csrf_exempt decorator above the view function. Also add the @require_POST decorator to ensure the request method is POST.

import json

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST

@require_POST
@csrf_exempt
def webhook(request):

    # Load the event data from JSON
    data = json.loads(request.body)
    # And process it
    process_webhook(data)

    return HttpResponse('Processed.')

The above URL endpoint implementation will differ across other Python web frameworks like Flask, Tornado, or Twisted, but the process_webhook function below will remain the same regardless of framework.

Processing event

There may be different types of events to handle. So, before implementing the process_webhook function, let's create a Python module named webhook_events.py containing one function per event type, each holding the logic for that particular event. In other words, we are going to map each event name to the function that handles that type of webhook event.

def event_one(event):
    # do something for event 'event.one'
    pass


def event_two(event):
    # do something for event 'event.two'
    pass

There are many ways to implement process_webhook and to map a webhook event to its function. We will discuss different implementations of process_webhook in terms of extensibility. The most basic version is below.

import webhook_events

def process_webhook(event):
    event_name = event['name']

    if event_name == 'event.one':
        webhook_events.event_one(event)

    elif event_name == 'event.two':
        webhook_events.event_two(event)

    # and so on

A Better way

Now suppose there are tens of webhook events to serve. We certainly don't want to write repetitive code, so below is a better way of implementing process_webhook. Here we simply replace the dots in the event name with underscores, giving us the name of the corresponding function in webhook_events.py. If no such function is found, the event is not registered (not being served). This way, no matter how many webhook events there are, we only have to write a function in webhook_events.py to handle each one.

import webhook_events

def process_webhook(event):
    event_name = event['name']

    function_name = event_name.replace('.', '_')
    function = getattr(webhook_events, function_name, None)

    if function:
        function(event)
    else:
        print('Event %s is not registered.' % event_name)
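
For example, with the hypothetical payloads below, the dispatch works as follows:

# A payload whose name matches a function in webhook_events.py
process_webhook({'name': 'event.one'})      # calls webhook_events.event_one

# An unregistered event name just logs a message
process_webhook({'name': 'event.unknown'})  # prints 'Event event.unknown is not registered.'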

Decorators

A more robust and Pythonic way of implementing process_webhook is to use decorators. Let's define a decorator in webhook_events.py that maps an event name to its function. Here EVENT_MAP is a dictionary inside the settings module, with event names as keys and event functions as values.

from django.conf import settings

def register(event_name):

    def wrapper(event_function):
        
        # Initializing settings.event_map if not already
        event_map = getattr(settings, 'EVENT_MAP', None)
        if not event_map:
            settings.EVENT_MAP = dict()
        
        # Mapping event name to its function
        settings.EVENT_MAP[event_name] = event_function
    
        return event_function

    return wrapper


@register('event.one')
def event_one(event):
    # do something for event 'event.one'
    pass


@register('event.two')
def event_two(event):
    # do something for event 'event.two'
    pass

In this case, the process_webhook will look like below:

def process_webhook(event):
    event_name = event['name']
    function = settings.EVENT_MAP.get(event_name, None)

    if function:
        function(event)
    else:
        print('Event %s is not registered.' % event_name)

This is the way I prefer to implement webhook handlers in Python. How would you do it? Please feel free to comment below.


How to implement a WebSocket server using Twisted

HTTP is a request-response, one-way protocol. For web applications where data must be sent continuously, WebSocket was introduced. Unlike HTTP, WebSocket provides full-duplex communication. WebSocket, which can be seen as an upgraded version of HTTP, is standardized to be used over TCP like HTTP. In this article I will share my experience implementing WebSocket with Twisted, a Python networking framework. If you are familiar with WebSocket, you can skip ahead to twisted.web; otherwise, below is a short introduction to WebSocket.

WebSocket

To initiate communication over WebSocket, a handshake needs to be performed between client and server. This procedure is backward compatible with HTTP's request-response structure. First the client sends a handshake request to the server, which looks like:

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

Sending the Upgrade header with the value websocket tells the server that the client wants WebSocket communication. If the server supports WebSocket with the specified sub-protocols (Sec-WebSocket-Protocol) and version (Sec-WebSocket-Version), it will send an appropriate response, such as:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

In response, the server sends the 101 Switching Protocols status code and a Sec-WebSocket-Accept header whose value is calculated from the Sec-WebSocket-Key; you can find more information here. After a successful handshake, either peer can send data to the other, encoded in the binary framing format described in the WebSocket RFC. A high-level overview of the framing is given in the following figure.
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+
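
As a quick check of the Sec-WebSocket-Accept computation described above: the key is concatenated with a fixed GUID defined in RFC 6455, SHA-1 hashed, and base64 encoded. A few lines of Python reproduce the value from the sample handshake:

import base64, hashlib

key = 'dGhlIHNhbXBsZSBub25jZQ=='               # Sec-WebSocket-Key from the request
GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'  # fixed GUID from RFC 6455

accept = base64.b64encode(hashlib.sha1(key + GUID).digest())
print accept  # prints: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=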

Twisted.web

[Figure: WebSocket using Twisted, wire diagram]

As in a normal twisted.web server, at the TCP level we have the HTTPChannel class (a child class of twisted.internet.protocol.Protocol) and the server.Site class (a child class of twisted.internet.protocol.ServerFactory). A Resource instance also needs to be passed to the Site class so that it can serve GET requests.

Whenever data is received, the dataReceived method of HTTPChannel is invoked. If the data starts with 'GET', we let HTTPChannel handle it, which will invoke the render method of the root resource provided to the Site class. render will set the 101 response code and compute the WebSocket accept key. During the handshake, do not send any raw data, because after a successful handshake it will be interpreted as framed binary data; even if you want to send something, frame it first.

If the data doesn't start with 'GET', we can assume it is a binary encoded message. This message can be decoded using Frame.py, a very simple data framing module that follows the WebSocket specification. Per the specification, data sent from the server to the client must not be masked.

Below is code example of an echo websocket server.

import base64, hashlib

import frame  # Frame.py, the simple framing module mentioned above
from twisted.internet import reactor
from twisted.web import http, resource
from twisted.web.server import Site

class EchoResource(resource.Resource):
    isLeaf = 1

    def render(self, request):

        # Processing the Key as per RFC 6455
        key = request.getHeader('Sec-WebSocket-Key')
        h = hashlib.sha1(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
        request.setHeader('Sec-WebSocket-Accept', base64.b64encode(h.digest()))

        # setting response headers
        request.setHeader('Upgrade', 'websocket')
        request.setHeader('Connection', 'Upgrade')
        request.setResponseCode(101)
        return ''


class EchoChannel(http.HTTPChannel):

    def dataReceived(self, data):

        if data.startswith('GET'):
            # This will invoke the render method of resource provided
            http.HTTPChannel.dataReceived(self, data)

        else:
            # Decode the data using the Frame module written by Morgan Philips.
            f = frame.Frame(bytearray(data))
            received_message = f.message()
            print received_message

            # Sending back the received message.
            msg = frame.Frame.buildMessage(received_message, mask=False)
            self.transport.write(str(msg))


class EchoSite(Site):
    def buildProtocol(self, addr):
        channel = EchoChannel()
        channel.requestFactory = self.requestFactory
        channel.site = self
        return channel

site = EchoSite(EchoResource())

if __name__ == '__main__':
    reactor.listenTCP(8080, site)
    reactor.run()
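
To try it out, run the script and point any WebSocket client at ws://localhost:8080/. A minimal sketch using the third-party websocket-client package (an assumption; any client that speaks RFC 6455 will do):

from websocket import create_connection

ws = create_connection('ws://localhost:8080/')
ws.send('hello')   # the server decodes the frame and echoes it back
print ws.recv()    # prints: hello
ws.close()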