Python Message Queues with Amazon Simple Queue Service

Tuesday, 16th November 2010 - Michael Halls-Moore - 3 Comments

Message queues are now all the rage. They've gained traction as a good practice in web development over the last few years and many robust backend solutions now exist. It's all very well sudo apt-getting a message broker binary, but how do you actually use one in your code? More importantly, what can brokers and queues be used for, and when?

Message queues are designed to handle tasks that can be performed asynchronously from the process that generated them, by a set of distributed "worker" units. For instance, let's imagine that your users are continuously uploading videos to your content distribution site and the videos require encoding before being displayed. The users shouldn't be waiting for the encodes, as the site will likely time out. Instead, the tasks should be processed in the background and control of the site should be returned to the user immediately. However, encoding is a highly CPU-intensive job and will tie up the resources of the app server. Hence the task can be "farmed out" to other dedicated worker units in order to lighten the load. This is exactly the type of task where message queues shine.

If you search for help on Python and message queues then you are likely to be bombarded by multiple independent options for queue implementation. I'm not going to discuss the pros and cons of each setup. Instead, I'll discuss a particular service that I have found straightforward to use and that can get you going immediately without too much hassle. This service is the Amazon Simple Queue Service (SQS).

SQS allows you to create multiple queues, each holding its own messages, and each message can contain up to 256 KB of text. A standard use case would be to encode a task and its properties into an XML or JSON object as text and then have a worker unit read the queue and decode the message at the other end.
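As a rough illustration of that encode/decode round trip, here is a minimal sketch using Python's standard json module. The field names ("task", "properties") and the example task are assumptions made up for this sketch and are not dictated by SQS, which simply carries the text.

```python
import json

# Hypothetical task description: the field names are illustrative only
task = {
    "task": "encode_video",
    "properties": {"video_id": 42, "output_format": "mp4"},
}

# Producer side: serialise the task to text for the message body
body = json.dumps(task)

# Worker side: parse the message body back into a Python dictionary
decoded = json.loads(body)
task_name = decoded["task"]
properties = decoded["properties"]
```

The string held in body is what would be passed to the message's set_body() method, and the worker would run json.loads() on whatever get_body() returns.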

If you haven't done so already, sign up for Amazon Web Services (AWS) in order to generate your AWS Access Key and AWS Secret Key, which are the security credentials used to communicate with AWS. The next task is to get hold of an open source Python AWS interface called Boto. Boto allows you to script communication with many aspects of AWS, including the Simple Queue Service (SQS), Simple Storage Service (S3) and the Elastic Compute Cloud (EC2) among other services. Note: Make sure to download the latest version of Boto and install via setuptools, as the binaries which ship with Ubuntu are out of date and do not possess certain methods utilised for EBS-booted instances.

Although Django enforces it, it is always good practice in any project (web or otherwise) to have a centralised settings.py file which stores all of your configuration options. In this file you will need to append your Amazon Web Services credentials as follows:

# settings.py
AWS_ACCESS_KEY_ID = '*************'
AWS_SECRET_ACCESS_KEY = '*************'

The next step is to use Boto to create a queue. We need to import the Boto library as well as the SQSConnection and Message objects. We also need to import our settings.py file, which should be in your project path.

A connection object is created by passing the two Amazon identity credentials as arguments to the SQSConnection constructor. The conn object can then be used to create a queue. The two parameters are the name of the queue (you can have multiple queues) and the message read-lock period (in seconds).

The message read-lock period (SQS calls it the visibility timeout) is the length of time for which a message, once read by one worker, is hidden from other readers before it becomes visible on the queue again. If the worker fails to complete the task and never deletes the message, another worker unit can pick it up once the read-lock period has expired. Finally, we create a message object and write some text to the message body.

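# queue_write.py
import boto

from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message

from myproject import settings

# Connect to SQS using the credentials stored in settings.py
conn = SQSConnection(settings.AWS_ACCESS_KEY_ID,
                     settings.AWS_SECRET_ACCESS_KEY)

# Create the queue (or fetch it if it already exists) with a
# message read-lock period of 120 seconds
q = conn.create_queue('TestQueue', 120)

# Build a message and write it to the queue
m = Message()
m.set_body('This is a test message')
q.write(m)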

The above code will create a new queue called 'TestQueue', set its message read-lock period to 120 seconds and then add a test message. It is a good idea to make use of IPython to create a separate connection to the queue. This allows us to see if the message has arrived by running the count() method of the queue class. The queue is not instantaneous. There will be a delay in adding your message to the queue, so be patient. Note that "creating" a queue with the same name will return the current queue if it exists, rather than overwriting it.

In [1]: # IPython console queue debug

In [2]: # --Import preamble as above--

In [3]: q = conn.create_queue('TestQueue', 120)

In [4]: q.count()
Out[4]: 1
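Because the count can lag behind a write, it may take a few checks before the message shows up. The sketch below polls count() a limited number of times. wait_for_messages and LaggingQueue are hypothetical names invented for this illustration. LaggingQueue is only an in-memory stand-in, so the example runs without AWS credentials. The count SQS reports is approximate, so treat the result as a hint and not a guarantee.

```python
import time


def wait_for_messages(queue, minimum=1, attempts=10, delay=1.0, sleep=time.sleep):
    """Poll queue.count() until it reports at least `minimum` messages.

    Returns the last count seen, which may still be below `minimum`
    if every attempt is used up.
    """
    seen = 0
    for attempt in range(attempts):
        seen = queue.count()
        if seen >= minimum:
            break
        if attempt < attempts - 1:
            sleep(delay)  # give the queue time to catch up
    return seen


class LaggingQueue(object):
    """Stand-in whose count() lags behind, for illustration only."""

    def __init__(self, counts):
        self._counts = list(counts)

    def count(self):
        # Report the next scripted value, then keep repeating the last one
        return self._counts.pop(0) if len(self._counts) > 1 else self._counts[0]


# Two empty polls, then the message shows up on the third
found = wait_for_messages(LaggingQueue([0, 0, 1]), sleep=lambda seconds: None)
```

With a real connection the call would simply be wait_for_messages(q), using the queue object created in the session above.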

We now want to connect to the queue with a separate process to read the message. In production code, the contents of the message will be acted upon to perform the task. Once the task is completed the message will be deleted so that another unit does not perform the same task.

We create a result set (rs) object by pulling a message from the queue. It is returned as a list of length 1 if any messages are available, or as an empty list otherwise. We then select the first item of the list, print out its contents and delete the message. I've listed out the imports and definitions for completeness, but they are the same as before.

# queue_read.py
import boto

from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message

from myproject import settings

conn = SQSConnection(settings.AWS_ACCESS_KEY_ID,
                     settings.AWS_SECRET_ACCESS_KEY)
q = conn.create_queue('TestQueue', 120)

# get_messages() returns a list, which is empty if nothing is available
rs = q.get_messages()
if rs:
    m = rs[0]
    print(m.get_body())
    # Delete the message so that no other worker repeats the task
    q.delete_message(m)

You may be wondering why Boto returns the message information as a list, rather than a single object. This is because we can pull multiple messages from the queue in one go, which might provide optimisation benefits depending upon your particular messaging needs. Here I am requesting up to 10 messages from my queue:

rs = q.get_messages(10)
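A batch read can return anything from zero messages up to the number requested, so the result set is usually handled in a loop. The sketch below assumes a queue object exposing the get_messages and delete_message methods used above. process_batch and handler are hypothetical names, and FakeQueue and FakeMessage are in-memory stand-ins so that the example runs without AWS credentials.

```python
def process_batch(queue, handler, batch_size=10):
    """Read up to batch_size messages, handle each one, then delete it.

    Returns the number of messages handled.
    """
    handled = 0
    for message in queue.get_messages(batch_size):
        handler(message.get_body())
        # Only delete once the handler has returned without raising
        queue.delete_message(message)
        handled += 1
    return handled


class FakeMessage(object):
    """Minimal stand-in for a queue message (illustration only)."""

    def __init__(self, body):
        self._body = body

    def get_body(self):
        return self._body


class FakeQueue(object):
    """Minimal in-memory stand-in for a queue (illustration only)."""

    def __init__(self, bodies):
        self._messages = [FakeMessage(body) for body in bodies]

    def get_messages(self, num_messages=1):
        # Return a copy so callers can delete while iterating
        return self._messages[:num_messages]

    def delete_message(self, message):
        self._messages.remove(message)


seen = []
fake = FakeQueue(["task %d" % i for i in range(12)])
first = process_batch(fake, seen.append)   # a full batch
second = process_batch(fake, seen.append)  # whatever is left over
```

Swapping fake for the q object from queue_read.py is the only change the function itself should need, although real batches may come back smaller than requested.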

That is the basic workflow for interacting with the queue. Solid production code would wrap the reading of the message and the task parsing in exception blocks to catch any unanticipated errors. In an implementation that I have created, I wrote a class which handles the task of encoding and decoding the JSON messages being sent to and from the queue.
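One possible shape for such a class, together with the exception handling, is sketched below. This is not the implementation mentioned above. TaskCodec, handle_body, the handlers dictionary and the JSON field names are all hypothetical, chosen only to show the idea.

```python
import json


class TaskCodec(object):
    """Hypothetical helper that turns tasks into JSON message bodies and back."""

    @staticmethod
    def encode(name, **params):
        return json.dumps({"name": name, "params": params})

    @staticmethod
    def decode(body):
        data = json.loads(body)  # raises ValueError on malformed JSON
        return data["name"], data["params"]  # raises KeyError if a field is missing


def handle_body(body, handlers):
    """Decode one message body and run the matching task.

    Returns True if the task completed, meaning the message can be deleted.
    Returns False on any error, in which case the message should be left
    alone so that it becomes readable again after the read-lock period.
    """
    try:
        name, params = TaskCodec.decode(body)
        handlers[name](**params)
    except Exception as exc:
        print("Task failed, leaving message on the queue: %r" % (exc,))
        return False
    return True


# Usage with a made-up task and a deliberately broken message body
results = []
handlers = {"encode_video": lambda video_id: results.append(video_id)}
ok = handle_body(TaskCodec.encode("encode_video", video_id=42), handlers)
bad = handle_body("this is not JSON", handlers)
```

A worker would call q.delete_message(m) only when handle_body(m.get_body(), handlers) returns True. Catching Exception broadly is a deliberate simplification here. Real code would probably log the failure and treat malformed messages differently from tasks that fail part-way through.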

Amazon SQS lends itself very nicely to stage environment testing as well. If you have a separate settings.py file for your stage setup, then you can put the name of your queue(s) in there, prefixed with 'Stage'. This allows you to run all tests against your stage queue, without affecting your production queue.
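A minimal sketch of that naming scheme follows. queue_name and the two constants are hypothetical names. In practice each value would live in the settings file for its own environment.

```python
def queue_name(base_name, prefix=''):
    """Build the queue name for an environment from a shared base name."""
    return prefix + base_name


# Value for the production settings.py
PRODUCTION_QUEUE_NAME = queue_name('TestQueue')

# Value for the stage settings.py
STAGE_QUEUE_NAME = queue_name('TestQueue', prefix='Stage')
```

The rest of the code can then pass the configured name from settings to conn.create_queue() without needing to know which environment it is running in.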

That about wraps up my introduction to Python, Boto and Simple Queue Service. For more information on the topics listed here it is worth visiting the following links:

3 comments
  • 18th November 2010
    3:28 pm

    Have you looked into what you'll do with the messages when you receive them and what you'll do with the tasks generally?

    We used SQS at a previous project but at this point are getting more value out of Celery and consequently can't use SQS (since Celery only uses AMQP backends like RabbitMQ). Any thoughts on that issue?

  • 19th November 2010
    3:58 am

    Yeah, I have a specific class that wraps the message opening and deleting to validate that the task is accurate and that it can be performed. There is definitely scope for a second article which expands on that somewhat.

    I was going to use Celery and Rabbit prior to using SQS. However, I had to turn around the project quickly. The costs of SQS are trivial for my work ($3-4 per month at most) and it was straightforward to set up. It's great being able to interact with it via Boto so easily.

    Which aspects of Celery/Rabbit do you think are better than SQS?

  • 26th September 2011
    12:51 pm

    0MQ is an even more flexible queuing backend -- depending on the project, this is another option you should consider! Thanks for the great overview of SQS/Boto. Cheers,