Configure Celery with SQS and Django on Elastic Beanstalk
Have your users complained about slow page loads in the web app you developed? That might be because of a long I/O-bound call or a time-consuming process. For example, when a customer signs up on a website, we need to send a confirmation email; in the usual flow, the email is sent first and only then is the 200 OK response returned for the signup POST. But we could send the email later, after returning the 200 OK response, right? This is not so straightforward when you are working with a framework like Django, which is tightly bound to the MVC paradigm.
So, how do we do it? The first thought that comes to mind is Python's threading module. However, although Python threads are implemented as kernel threads (pthreads), the global interpreter lock (GIL) means a Python process runs only one thread at a time. Threads are also hard to manage, maintain, and scale.
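To see why the threading idea is tempting, here is a minimal sketch of that naive approach (function and variable names are hypothetical): the view responds immediately and pushes the slow email send onto a background thread. It works for simple cases, but gives you no retries, no visibility into failures, and no way to scale beyond one machine.

```python
import threading
import time

def send_confirmation_email(address):
    # Stand-in for a slow SMTP call.
    time.sleep(0.1)
    return 'sent to {0}'.format(address)

def signup_view(address):
    # Respond immediately; the email goes out on a background thread.
    t = threading.Thread(target=send_confirmation_email,
                         args=(address,), daemon=True)
    t.start()
    return '200 OK'
```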
Introduction
Celery to the rescue. It helps when you have a time-consuming task (heavy compute or I/O bound) inside the request-response cycle. Celery is an open-source asynchronous task queue, or job queue, based on distributed message passing. In this post I will walk you through setting up Celery with Django and SQS on Elastic Beanstalk.
Prerequisites
This post assumes familiarity with Django and AWS Elastic Beanstalk.
Why Celery?
Celery is very easy to integrate with an existing code base: decorate a function to declare it a Celery task, then invoke it through that function's .delay method.
Define a Celery task

```python
from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'
```

Call the Celery task in a worker process
```python
# Calling a celery task
hello.delay()
```

Broker
To work with Celery, we need a message broker. As of this writing, Celery supports RabbitMQ, Redis, and (not fully) Amazon SQS as message broker solutions. Unless you want to stick to the AWS ecosystem (as in my case), I recommend going with RabbitMQ or Redis, because SQS does not yet support remote control commands and events. One reason to use SQS is its pricing: every user gets one million free SQS requests per month.
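For comparison, switching brokers is a one-line settings change. The URLs below are illustrative only; the hosts, ports, and credentials are assumptions, not values from this setup:

```python
# settings.py -- pick exactly one broker URL (values are illustrative)
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'   # RabbitMQ
# CELERY_BROKER_URL = 'redis://localhost:6379/0'            # Redis
# CELERY_BROKER_URL = 'sqs://'                              # Amazon SQS
```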
Proceeding with SQS, go to the AWS SQS dashboard and click the Create New Queue button. Depending on the requirements, we can select either type of queue. We will name the queue dev-celery.
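If you prefer the command line, the same queue can be created with the AWS CLI. This is a sketch; it assumes the CLI is installed and your AWS credentials are already configured:

```shell
aws sqs create-queue --queue-name dev-celery --region us-west-2
```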
Installation
Activate your virtual environment, if you have configured one, and install Celery.

```shell
pip install celery[sqs]
```

Configuration
Celery has built-in support for Django. It picks up its settings from Django's settings.py; the relevant keys are prefixed with CELERY_ (the CELERY namespace is declared when initializing the Celery app). So add the settings below to settings.py.
```python
# Amazon credentials will be taken from environment variables.
CELERY_BROKER_URL = 'sqs://'
```

AWS login credentials should be present in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
```python
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-west-2',
    'visibility_timeout': 3600,
    'polling_interval': 10,
}

CELERYD_PREFETCH_MULTIPLIER = 0
```

Now let us configure the Celery app within the Django code. Create a celery.py file beside Django's settings.py.
```python
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
```

Now put the code below in the project's __init__.py.
```python
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when Django starts
# so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
```

Testing
Now let us test the configuration. Open a terminal and start Celery.

```shell
$ celery worker --app=proj --loglevel=INFO

 -------------- celery@lintel v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.15.0-24-generic-x86_64-with-Ubuntu-18.04-bio
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f0ba29fa3d0
- ** ---------- .> transport:   sqs://localhost//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
```

Now open a Django shell in another terminal:
```shell
$ python manage.py shell

In [1]: from proj import celery
In [2]: celery.debug_task()         # Not through celery
In [3]: celery.debug_task.delay()   # This is through celery
```

After calling the task function with the delay method, the task should run in the worker process that is listening for events in the other terminal: Celery sends a message with the task details to SQS, and the worker process receives and executes it.
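Tying this back to the signup example from the introduction, an application task would normally live in a tasks.py module inside a Django app, so that autodiscover_tasks() can find it. This is a sketch only; the accounts app name, the sender address, and the message text are assumptions:

```python
# accounts/tasks.py
from __future__ import absolute_import, unicode_literals

from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_confirmation_email(address):
    # Runs in the worker process, after the signup view has already
    # returned its 200 OK response.
    send_mail('Welcome!', 'Thanks for signing up.',
              'noreply@example.com', [address])
```

The signup view then simply calls send_confirmation_email.delay(user.email) before returning its response.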
Deploy celery worker process on AWS Elastic Beanstalk
Celery provides a multi subcommand to run workers in daemon mode, but this should not be used in production; the Celery documentation instead recommends various daemonization tools. AWS Elastic Beanstalk already uses supervisord to manage the web server process, and Celery can be managed with supervisord as well.
Create two files under the .ebextensions directory. celery-worker.sh extracts the environment variables, builds the Celery supervisord configuration, copies it to /opt/python/etc/celery.conf, and restarts supervisord.
File: celery-worker.sh
```shell
#!/usr/bin/env bash

# Get django environment variables
celeryenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/export //g'`
celeryenv=${celeryenv%?}

# Create celery configuration script
celeryconf="[program:celeryd-worker]
command=/opt/python/run/venv/bin/celery worker -A PROJECT_NAME -P solo --loglevel=INFO
directory=/opt/python/current/app
user=nobody
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600
killasgroup=true
priority=998
environment=$celeryenv"

# Create the celery supervisord conf script
echo "$celeryconf" | tee /opt/python/etc/celery.conf

# Add configuration script to supervisord conf
if ! grep -Fxq "[include]" /opt/python/etc/supervisord.conf; then
    echo "[include]" | tee -a /opt/python/etc/supervisord.conf
    echo "files: celery.conf" | tee -a /opt/python/etc/supervisord.conf
fi

# Restart celery through supervisord
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf reread
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf update
/usr/local/bin/supervisorctl -c /opt/python/etc/supervisord.conf restart celeryd-worker
```

File: celery.config
```yaml
packages:
  yum:
    libcurl-devel: []

container_commands:
  01_mkdir_for_log_and_pid:
    command: "mkdir -p /var/log/celery/ /var/run/celery/"
  02_celery_configure:
    command: "cp .ebextensions/celery-worker.sh /opt/elasticbeanstalk/hooks/appdeploy/post/"
    cwd: "/opt/python/ondeck/app"
  03_celery_run:
    command: "/opt/elasticbeanstalk/hooks/appdeploy/post/celery-worker.sh"
```

Add these files to git and deploy to Elastic Beanstalk. This completes the setup of Celery with SQS on AWS Elastic Beanstalk!
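The deployment itself can be done with git and the EB CLI. This is a sketch; it assumes the eb CLI is installed and already initialized for your environment:

```shell
chmod +x .ebextensions/celery-worker.sh
git add .ebextensions/celery-worker.sh .ebextensions/celery.config
git commit -m "Run celery worker under supervisord"
eb deploy
```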