FIFO is a Ruby queueing library built on top of Amazon SQS (Simple Queue Service). Like DelayedJob, it encapsulates the common pattern of executing time-consuming tasks in the background, but unlike DelayedJob, it doesn’t rely on a database.
- Built on Amazon’s reliable and scalable queue service.
- Connections to SQS are opened lazily, so creating queues adds no load time at startup.
- Multiple queues can be used simultaneously.
- Doesn’t poll the database.
- Rails ActiveRecord objects maintain state through the queue.
- Built-in retry mechanism.
FIFO is extracted from the Mine (http://getmine.com) codebase. Here are some of the things we use it for:
- Sending emails
- Processing images
- Indexing
- Sharing to social networks
- Cache management
- Launching cron jobs
- Communication between disjoint parts of a complex application (cron, web, processing servers)
sudo gem install fifo
Set up the AWS access key ID and AWS secret key. They can be found here: https://portal.aws.amazon.com/gp/aws/securityCredentials
require 'fifo'
FIFO::QueueManager.setup <aws_access_id>, <aws_secret_key>
Create a queue object. If a queue by this name doesn’t exist on the Amazon SQS account this will create one.
For Rails, this can be added to a file in “config/initializers/”. Additionally, suffixes can be added to queue names for modularity. For example, adding “_staging” to queue names segregates queues for the staging environment, and adding machine IPs to queue names creates queues local to each machine.
Queue creation takes an additional parameter for message visibility, the SQS setting that controls how long a received message stays hidden from other consumers. For more details see: http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/Welcome.html
queue = FIFO::Queue.new <queue_name>
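The queue-name suffixes described above can be sketched with a small helper. This is only an illustration: `suffixed_queue_name` is a hypothetical name, not part of FIFO, and the queue and environment names are made up.

```ruby
# Hypothetical helper (not part of FIFO): appends an environment suffix
# so that each environment ends up with its own SQS queue name.
def suffixed_queue_name(base, env)
  "#{base}_#{env}"
end

staging_name = suffixed_queue_name("images", "staging")
production_name = suffixed_queue_name("images", "production")

puts staging_name      # images_staging
puts production_name   # images_production
```

The resulting string would then be passed to FIFO::Queue.new in place of <queue_name>.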
To prevent delays at the start of the application, SQS connections are opened lazily. This saves time when initializing multiple queues:
queues = []
(1..10).each do |i|
  queues << FIFO::Queue.new <queue_name_i>
end
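Lazy opening generally means the expensive connection step is deferred until the first real operation. The sketch below shows that pattern with memoization. `LazyQueue` and `open_connection` are hypothetical stand-ins, and an in-memory array replaces the network connection, so this is not FIFO's actual implementation.

```ruby
# Illustrative only: LazyQueue is a hypothetical stand-in, not a FIFO class.
class LazyQueue
  attr_reader :name

  def initialize(name)
    @name = name        # cheap: no connection work happens here
    @connection = nil
  end

  def connected?
    !@connection.nil?
  end

  def push(item)
    connection << item
  end

  private

  # Memoized: built on first use and reused afterwards.
  def connection
    @connection ||= open_connection
  end

  # Stand-in for an expensive network handshake; here just an array.
  def open_connection
    []
  end
end

queues = (1..10).map { |i| LazyQueue.new("queue_#{i}") }
puts queues.any?(&:connected?)    # false
queues.first.push("job")
puts queues.count(&:connected?)   # 1
```

Creating ten queues costs nothing up front in this sketch; only the queue that is actually used pays for its connection.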
The most basic case takes a Ruby object, a method to be called and a list of method parameters. Example of a Ruby command and how to run it in the background:
"World".insert 0, "Hello "
queue.push "World", :insert, 0, "Hello "
FIFO is designed to work with ActiveRecord objects. Freshness of ActiveRecord objects is ensured by reloading them from the database when they’re popped from the queue. Consider an instance method to process images:
user.process_profile_image({:max_width => 200, :max_height => 150})
queue.push user, :process_profile_image, {:max_width => 200, :max_height => 150}
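This README does not show how FIFO carries records through the queue. One plausible approach, sketched here purely as an assumption, is to keep only the class name and id and look the record up again when the job runs. `FakeUser` imitates a model with an in-memory table, and `RecordReference` is a hypothetical name.

```ruby
# Illustrative only: FakeUser imitates an ActiveRecord model using an
# in-memory "table"; RecordReference is a hypothetical name.
class FakeUser
  TABLE = {}
  attr_reader :id
  attr_accessor :name

  def initialize(id, name)
    @id = id
    @name = name
  end

  def save
    TABLE[id] = name
    self
  end

  def self.find(id)
    new(id, TABLE.fetch(id))
  end
end

# Keeps only the class name and id, so the record is fetched again later.
class RecordReference
  def initialize(record)
    @class_name = record.class.name
    @id = record.id
  end

  def reload
    Object.const_get(@class_name).find(@id)
  end
end

user = FakeUser.new(1, "old name").save
reference = RecordReference.new(user)   # what would travel through the queue

FakeUser.new(1, "new name").save        # the record changes while the job waits
puts reference.reload.name              # new name
```

Because the lookup happens at processing time, the job sees the latest saved state rather than a stale copy.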
FIFO also works with class methods. For example, sending newsletters:
UserMailer.deliver_newsletter emails
queue.push UserMailer, :deliver_newsletter, emails
Every item pushed into the queue is converted into a FIFO::Payload object which executes the passed method.
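The idea of a payload that remembers an object, a method name and its arguments can be sketched as follows. `SketchPayload` is a hypothetical stand-in written for illustration; the real FIFO::Payload may be implemented differently.

```ruby
# Illustrative only: SketchPayload is a hypothetical stand-in for the idea
# behind FIFO::Payload, not its real implementation.
class SketchPayload
  attr_reader :attempts

  def initialize(object, method_name, *args)
    @object = object
    @method_name = method_name
    @args = args
    @attempts = 0
  end

  # Calls the stored method on the stored object and counts the attempt.
  def process
    @attempts += 1
    @object.public_send(@method_name, *@args)
  end
end

payload = SketchPayload.new("Hello World", :length)
puts payload.process     # 11
puts payload.attempts    # 1

# .dup gives a mutable copy, since insert modifies the string in place.
greeting = SketchPayload.new("World".dup, :insert, 0, "Hello ")
puts greeting.process    # Hello World
```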
Basic flow of processing an item from the queue:
queue.push "Hello World", :length
payload = queue.pop
payload.process
=> 11
Retry functionality takes a failed payload and adds it back into the queue. Payload objects keep track of the total number of processing attempts.
queue.push "Hello World", :length
begin
  payload = queue.pop
  payload.process
rescue => ex
  payload.retry if payload.attempts < 3
end
Process all items in the queue:
while(true)
  payload = queue.pop
  break unless payload
  begin
    payload.process
  rescue => ex
    payload.retry if payload.attempts < 3
  end
end
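To see how the attempt cap behaves, the same drain-and-retry loop can be run against an in-memory array. Everything here is a stand-in: `FlakyJob` is a hypothetical task that fails a fixed number of times, and the array replaces the SQS queue.

```ruby
# Illustrative only: an array stands in for the SQS queue, and FlakyJob is
# a hypothetical task that fails a set number of times before succeeding.
MAX_ATTEMPTS = 3

class FlakyJob
  attr_reader :attempts

  def initialize(name, failures)
    @name = name
    @failures = failures
    @attempts = 0
  end

  def process
    @attempts += 1
    raise "#{@name} failed" if @attempts <= @failures
    "#{@name} done after #{@attempts} attempt(s)"
  end
end

queue = [FlakyJob.new("resize", 2), FlakyJob.new("email", 5)]
results = []
dropped = []

while (job = queue.shift)
  begin
    results << job.process
  rescue => ex
    # Re-queue until the attempt cap is reached, then give up on the job.
    if job.attempts < MAX_ATTEMPTS
      queue.push(job)
    else
      dropped << ex.message
    end
  end
end

puts results.inspect   # ["resize done after 3 attempt(s)"]
puts dropped.inspect   # ["email failed"]
```

The job that recovers within three attempts completes, while the one that keeps failing is dropped instead of cycling through the queue forever.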
Multiple queues can be used to simulate priority:
while(true)
  payload = queue.pop
  payload = queue1.pop unless payload
  payload = queue2.pop unless payload
  # Skip processing when every queue is empty
  if payload
    begin
      payload.process
    rescue => ex
      payload.retry if payload.attempts < 3
    end
  end
  sleep 10
end
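The priority lookup can also be factored into a helper that walks the queues in order and returns the first available item. In this sketch plain arrays stand in for FIFO queues, and `pop_by_priority` is a hypothetical helper rather than part of the library.

```ruby
# Illustrative only: arrays stand in for FIFO queues, and pop_by_priority
# is a hypothetical helper.
def pop_by_priority(queues)
  queues.each do |queue|
    item = queue.shift
    return item if item
  end
  nil   # every queue was empty
end

high = ["urgent email"]
normal = ["resize image", "reindex"]
low = ["warm cache"]

order = []
while (item = pop_by_priority([high, normal, low]))
  order << item
end

puts order.inspect
# ["urgent email", "resize image", "reindex", "warm cache"]
```

Since the higher-priority queues are checked first on every pass, items in a lower-priority queue wait until the queues ahead of it are empty.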
The Payload flow can be entirely skipped by popping the raw response from the queue:
queue.pop :raw => true
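To process a queue from a daemon in a Rails application, install the daemons gem:
sudo gem install daemons
Install daemon_generator from: https://github.com/dougal/daemon_generator
Then generate the daemon:
./script/generate daemon <name>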
require File.dirname(__FILE__) + "/../../config/environment"

$running = true
Signal.trap("TERM") do
  $running = false
end

FIFO::QueueManager.setup <aws_access_id>, <aws_secret_key>
queue = FIFO::Queue.new <queue_name>
logger = Logger.new(File.join(RAILS_ROOT, "log/<name>.rb.log"))

while($running) do
  payload = queue.pop
  if payload
    begin
      start_time = Time.now
      payload.process
      end_time = Time.now
      logger.info "Finished #{payload.to_s} #{end_time - start_time}"
    rescue => ex
      if payload.attempts < 3
        logger.info "Recovering #{payload.to_s}"
        payload.retry
      else
        # Log exception
      end
    end
  end
  sleep 5
end
To control the state of the daemon:
lib/daemon/<name>_ctl <start|stop|run>
Things to know about Amazon’s SQS:
- The order of items isn’t always maintained. SQS does not guarantee strict first-in, first-out delivery, although ordering is usually close.
- There are multiple connection modes to SQS: per_request, per_thread, single
- http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/Welcome.html
FIFO is built by the wonderful people who make Mine: http://getmine.com