SQS to the rescue

tl;dr Amazon SQS is currently one of the best options for a queuing service. It's an inexpensive (the first 1 million requests per month are free, thereafter $0.50 per 1 million requests), fast, reliable, scalable, fully managed message queuing service. It just works.

For the following examples I'm using aws-sdk-ruby, which is a great SDK because it mirrors the official API documentation: no abstractions, no different method names. What you see in the documentation is what you get in the SDK.

Getting your hands dirty

Let's begin by bootstrapping a queue using the create method:

sqs.queues.create 'myqueue', visibility_timeout: 600, message_retention_period: 345600

Then let's send a message using the send_message method:

sqs.queues.named('myqueue').send_message 'HELLO'

Finally let's consume a message using the receive_message method:

sqs_msg = sqs.queues.named('myqueue').receive_message
do_something(sqs_msg)
sqs_msg.delete

To recap: we created a queue that holds messages for 4 days (message_retention_period: 345600), which means that if we don't consume a message within 4 days, it will be lost.

The visibility_timeout is the time in seconds we have to consume a message after receiving it. When we send a message to SQS (send_message), it is listed under "Messages Available". When we receive it (receive_message), it is moved to "Messages in Flight". If we don't delete the message within 600 seconds (the visibility timeout we configured above), it is moved back to "Messages Available" and becomes ready to be consumed again. Read more about the SQS Message Lifecycle.

Be generous when configuring the default visibility_timeout for a queue. If your worker takes 2 minutes to consume a message in the worst case, set the visibility_timeout to at least 4 minutes. It doesn't hurt, and it's better than having the same message consumed more often than expected.
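To make the lifecycle described above concrete, here is a toy in-memory model of the two states. It is only a sketch and not the SQS API: the ToyQueue class, its method names and the injectable clock are all made up for illustration.

```ruby
# Toy model of the SQS message lifecycle (NOT the real SQS API).
# ToyQueue and all of its methods are hypothetical names for illustration.
class ToyQueue
  def initialize(visibility_timeout:, clock: -> { Time.now })
    @visibility_timeout = visibility_timeout
    @clock = clock
    @available = [] # "Messages Available"
    @in_flight = [] # "Messages in Flight": pairs of [body, visible_again_at]
  end

  def send_message(body)
    @available << body
  end

  # Hands out one message and hides it for visibility_timeout seconds.
  # Returns nil when nothing is available.
  def receive_message
    requeue_expired
    body = @available.shift
    @in_flight << [body, @clock.call + @visibility_timeout] if body
    body
  end

  # Deleting is the only way a message leaves the queue for good.
  def delete(body)
    index = @in_flight.index { |in_flight_body, _| in_flight_body == body }
    @in_flight.delete_at(index) if index
  end

  private

  # Messages that were not deleted in time go back to "Messages Available".
  def requeue_expired
    now = @clock.call
    expired, @in_flight = @in_flight.partition { |_, visible_at| visible_at <= now }
    expired.each { |body, _| @available << body }
  end
end
```

With a visibility timeout of 600, a message that was received but never deleted stays hidden from the next receive_message call until 600 seconds have passed, and then shows up again.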

Auto retrying errors

begin
  sqs_msg = sqs.queues.named('myqueue').receive_message
  do_something(sqs_msg)
  sqs_msg.delete
rescue => e
  # The message was not deleted, so SQS will make it visible again after the timeout
  error e
end

In the code above, if do_something raises an exception, the message becomes available again as soon as its visibility_timeout expires, so other workers can retry it.

This behaviour also covers the case where your process or server crashes. Basically, if you don't call sqs_msg.delete, the message will become available again no matter what :)

Dead Letter Queues

Let's retry stuff, but not forever, right? Sometimes, no matter how many times we retry a message, it will keep failing. So that we neither discard these failure-addicted messages nor retry them forever, we can let them rest in peace in a Dead Letter Queue.

The code below creates myqueue_failures and associates it with myqueue as a dead letter queue:

dl_queue = sqs.queues.create('myqueue_failures')

sqs.queues.create('myqueue', redrive_policy: %Q{{"maxReceiveCount":"3", "deadLetterTargetArn":"#{dl_queue.arn}"}})

The redrive_policy above tells SQS to move a message from myqueue to myqueue_failures once it has been received 3 times without being deleted.
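Hand-writing JSON inside a Ruby string makes it easy to slip in a stray quote or brace. One possible way around that is to let the json standard library build the string. This is just a sketch: redrive_policy_for is a hypothetical helper (not part of the SDK) and the ARN is a made-up example value.

```ruby
require 'json'

# Builds the JSON string for the redrive_policy option.
# redrive_policy_for is a hypothetical helper, not an SDK method.
def redrive_policy_for(dead_letter_arn, max_receive_count: 3)
  JSON.generate({
    'maxReceiveCount' => max_receive_count.to_s,
    'deadLetterTargetArn' => dead_letter_arn
  })
end

# Made-up ARN, for illustration only.
policy = redrive_policy_for('arn:aws:sqs:us-east-1:123456789012:myqueue_failures')
puts policy
```

The result could then be passed along as redrive_policy: redrive_policy_for(dl_queue.arn) when creating myqueue.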

Delaying a message perform_in

SQS supports delaying a message for up to 15 minutes before it becomes available to be consumed, using the delay_seconds option:

sqs.queues.named('myqueue').send_message 'HELLO', delay_seconds: 60

But as we are all hackerz and we know that the visibility_timeout supports up to 12 hours, we can use it to extend the delay.

Sending a message with extended delay

perform_at = Time.now + 60 * 60 # one hour from now; max: 12 hours

sqs.queues.named('myqueue').send_message 'H311O',
  message_attributes: {
    'perform_at' => { string_value: perform_at.to_s,
                      data_type: 'String' }
  }

Receiving a message with extended delay

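require 'time' # provides Time.parse

sqs_msg = sqs.queues.named('myqueue').receive_message message_attribute_names: ['perform_at']

# Seconds left until the message is due (negative when it is already overdue)
delay = Time.parse(sqs_msg.message_attributes['perform_at'][:string_value]) - Time.now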

if delay > 0
  sqs_msg.visibility_timeout = delay.to_i
else
  do_something(sqs_msg)
  sqs_msg.delete
end

Be careful with this workaround: every extra receive increases the message's receive count, which also counts toward maxReceiveCount if the queue has a dead letter queue.
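The delay calculation can be pulled into a small pure function that also respects the 12-hour cap mentioned earlier. This is a sketch under that assumption; next_visibility_timeout and MAX_VISIBILITY_TIMEOUT are hypothetical names, not SDK constants.

```ruby
MAX_VISIBILITY_TIMEOUT = 12 * 60 * 60 # 12 hours, in seconds

# Returns how many seconds the message should stay hidden before the next
# attempt, or 0 when it is already due and can be processed right away.
# next_visibility_timeout is a hypothetical helper name.
def next_visibility_timeout(perform_at, now = Time.now)
  remaining = (perform_at - now).ceil # round up so we never wake up early
  return 0 if remaining <= 0

  [remaining, MAX_VISIBILITY_TIMEOUT].min
end
```

A worker could call it with the parsed perform_at, assign the result to sqs_msg.visibility_timeout when it is greater than 0, and process the message otherwise. If perform_at is more than 12 hours away, the message simply comes back after 12 hours and gets hidden again, at the price of one more receive.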

1 million != 1 million

"first 1 million requests per month are free, thereafter $0.50 per 1 million requests"

1 million requests doesn't mean 1 million jobs consumed, as we need at least 3 requests to fully consume a message:

  1. send_message
  2. receive_message
  3. sqs_msg.delete

Although these requests can be executed in batches of up to 10 messages, you still need all three of them to consume a message. And even if you are using the poll method with a high wait_time_seconds, you will probably make some empty requests while looking for new messages.
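To put numbers on the three-requests-per-message point, here is a rough estimator. It is a back-of-the-envelope sketch that uses the prices quoted in this post and assumes every send, receive and delete is batched with the same batch size. estimated_monthly_cost and the constants are hypothetical names.

```ruby
FREE_REQUESTS = 1_000_000 # free requests per month, as quoted in this post
PRICE_PER_MILLION = 0.50  # USD per 1 million requests, as quoted in this post
REQUESTS_PER_MESSAGE = 3  # send_message, receive_message, delete

# Rough monthly cost in USD for a given number of messages.
# empty_receives accounts for polls that came back with nothing.
def estimated_monthly_cost(messages, batch_size: 1, empty_receives: 0)
  batches = (messages.to_f / batch_size).ceil
  requests = batches * REQUESTS_PER_MESSAGE + empty_receives
  billable = [requests - FREE_REQUESTS, 0].max
  billable * PRICE_PER_MILLION / 1_000_000
end
```

For example, 1 million messages handled one by one add up to 3 million requests, 2 million of them billable, so about $1.00. The same million messages in full batches of 10 need only 300,000 requests and stay inside the free tier.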

SNS to SQS

SNS distributes messages across multiple SQS queues and more.

Sending the same msg to my_queue1 and my_queue2 using SQS:

sqs.queues.named('my_queue1').send_message(msg)
sqs.queues.named('my_queue2').send_message(msg)

Using SNS to SQS:

topic.publish(msg)
# sends to my_queue1 and my_queue2

SNS will fan out the message to both queues. Consequently, you pay for and wait on only one request to SNS (SNS to SQS delivery is free) instead of two requests to SQS.

Trouble in paradise

SQS is really good, the Ruby SDK is great, but… how to consume messages continuously?

# myqueue_worker.rb
sqs.queues.named('myqueue').poll { |sqs_msg| do_something(sqs_msg) }

Then:

ruby myqueue_worker.rb

Doesn't seem very reliable, right? No threads, messages are consumed one by one, and if you need to consume from another queue you have to start a new process (ruby myotherqueue_worker.rb), which is a waste of resources when one queue is empty and others have messages available.

Introducing Shoryuken

Shoryuken is a project based on Sidekiq that uses all the cool stuff Sidekiq did with processes, Celluloid, CLI etc., but for SQS.

No more ruby myotherqueue_worker.rb, Shoryuken will handle that for you, including signal handling.

A single Shoryuken process can consume messages from multiple queues, load balancing the consumption.
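To illustrate the general idea of one process serving several queues, here is a toy round-robin loop. It is not Shoryuken's actual implementation, just a sketch of the concept: plain Ruby arrays stand in for SQS queues, and drain_round_robin is a hypothetical name.

```ruby
# Toy round-robin over several queues (NOT Shoryuken's actual implementation).
# Each "queue" is a plain Array standing in for an SQS queue; the arrays are
# emptied as messages are consumed.
def drain_round_robin(queues)
  consumed = []
  until queues.values.all?(&:empty?)
    queues.each do |name, messages|
      msg = messages.shift
      # An empty queue is simply skipped, so it never blocks the others.
      consumed << [name, msg] unless msg.nil?
    end
  end
  consumed
end

queues = { 'myqueue' => %w[a1 a2 a3], 'myotherqueue' => [], 'thirdqueue' => %w[c1] }
order = drain_round_robin(queues)
```

Here the busy queue keeps being served while the empty one costs nothing, which is the waste the one-process-per-queue setup suffers from.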

It's transparent to SQS: it passes the ReceivedMessage to the workers. No abstractions, no Resque compatibility, SQS as is!

It accepts multiple workers per queue.

Check more on the project's README.

Conclusion

SQS is cheap, and you don't need to worry about managing your own queue service, setting up Redis etc.

Keep your focus on your workers/jobs (which should be the most important thing to you) and let Amazon take care of the infrastructure. It works!