From 78539b523f6815475db84963787f6bbeacb964c0 Mon Sep 17 00:00:00 2001
From: Paul Cortens
Date: Tue, 5 Aug 2014 01:00:48 -0700
Subject: [PATCH] WIP: [#42] Async batch publishing

---
 lib/firehose/client.rb                |   2 +-
 lib/firehose/client/producer.rb       |  21 +++++-
 lib/firehose/client/producer/async.rb | 103 ++++++++++++++++++++++++++
 3 files changed, 124 insertions(+), 2 deletions(-)
 create mode 100644 lib/firehose/client/producer/async.rb

diff --git a/lib/firehose/client.rb b/lib/firehose/client.rb
index cf270a2..81cdd81 100644
--- a/lib/firehose/client.rb
+++ b/lib/firehose/client.rb
@@ -7,4 +7,4 @@ module Client
     autoload :Consumer, 'firehose/client/consumer'
     autoload :Producer, 'firehose/client/producer'
   end
-end
\ No newline at end of file
+end
diff --git a/lib/firehose/client/producer.rb b/lib/firehose/client/producer.rb
index a469a2d..8ed3015 100644
--- a/lib/firehose/client/producer.rb
+++ b/lib/firehose/client/producer.rb
@@ -6,9 +6,13 @@ module Client
     module Producer
       # Publish messages to Firehose via an HTTP interface.
       class Http
+        autoload :Async, 'firehose/client/producer/async'
+
         # A DSL for publishing requests. This doesn't so much, but lets us call
         # Firehose::Client::Producer::Http#publish('message').to('channel'). Slick eh? If you don't like it,
         # just all Firehose::Client::Producer::Http#put('message', 'channel')
+        # For async sending use Firehose::Client::Producer::Http#publish('message').async.to('channel')
+        # or Firehose::Client::Producer::Http#async_put('message', 'channel')
         class Builder
           def initialize(producer, message)
             @producer, @message = producer, message
@@ -16,7 +20,12 @@ def initialize(producer, message)
          end

          def to(channel, opts={}, &callback)
-            @producer.put(@message, channel, opts, &callback)
+            @producer.send(@method || :put, @message, channel, opts, &callback)
+          end
+
+          def async
+            @method = :async_put
+            self
          end
        end
@@ -26,6 +35,7 @@ def to(channel, opts={}, &callback)
        def initialize(uri = Firehose::URI)
          @uri = ::URI.parse(uri.to_s)
          @uri.scheme ||= 'http'
+          @async = Async.new self
        end

        # A DSL for publishing messages.
@@ -42,6 +52,15 @@ def put(message, channel, opts, &block)
          PutRequest.new(self, message, channel, opts, &block).process
        end

+        # Publish the message in batches via a background thread.
+        # Note that although the interface here looks identical to #put, the
+        # argument passed to the callback block will be slightly different.
+        # Since the messages are published in batches, there is only a single
+        # response object. That is what will be passed back.
+        def async_put(message, channel, opts, &block)
+          @async.enqueue(message, channel, opts, &block)
+        end
+
        # Handle errors that could happen while publishing a message.
        def on_error(&block)
          @error_handler = block
diff --git a/lib/firehose/client/producer/async.rb b/lib/firehose/client/producer/async.rb
new file mode 100644
index 0000000..970317b
--- /dev/null
+++ b/lib/firehose/client/producer/async.rb
@@ -0,0 +1,103 @@
+require "thread"
+
+module Firehose
+  module Client
+    module Producer
+      class Http
+        class Async
+          def initialize(producer)
+            @queue = Queue.new
+            @worker_mutex = Mutex.new
+            @worker = Worker.new @queue, producer
+
+            # Tell the worker thread to exit when the process shuts down.
+            at_exit { @worker_thread && @worker_thread[:should_exit] = true }
+          end
+
+          # Although the options here are specified for each message, they can
+          # actually be more global. Specifically:
+          # * :timeout is global. The most recently specified timeout will be
+          #   used.
+          # * :ttl is per channel. The most recently specified ttl for a given
+          #   channel will be used.
+          def enqueue(message, channel, opts, &block)
+            ensure_worker_running
+            @queue << [message, channel, opts, block]
+          end
+
+          private
+
+          # Spawn the worker thread on demand. The double check around the
+          # mutex keeps concurrent callers from starting more than one worker.
+          def ensure_worker_running
+            return if worker_running?
+            @worker_mutex.synchronize do
+              return if worker_running?
+              @worker_thread = Thread.new do
+                @worker.run
+              end
+            end
+          end
+
+          def worker_running?
+            @worker_thread && @worker_thread.alive?
+          end
+
+          class Worker
+            def initialize(queue, producer)
+              @queue = queue
+              @batch = []
+              @callbacks = []
+              @lock = Mutex.new
+              @producer = producer
+            end
+
+            def run
+              until Thread.current[:should_exit]
+                # Exit once the queue is drained. Async#ensure_worker_running
+                # starts a fresh worker the next time a message is enqueued.
+                return if @queue.empty?
+
+                @lock.synchronize do
+                  until @queue.empty?
+                    @batch << @queue.pop
+                  end
+                end
+
+                # NOTE: the producer is expected to respond to #batch_publish;
+                # that method is not implemented in this (WIP) patch.
+                @producer.batch_publish(batch_data, :timeout => @timeout) do |response|
+                  @callbacks.each do |callback|
+                    callback.call response
+                  end
+
+                  @lock.synchronize do
+                    @batch.clear
+                    @callbacks.clear
+                  end
+                end
+              end
+            end
+
+            def is_requesting?
+              @lock.synchronize { !@batch.empty? }
+            end
+
+            private
+
+            def batch_data
+              hash = {}
+              # We're mutating instance variables, so we need a lock to be safe.
+              @lock.synchronize do
+                @batch.each do |message, channel, opts, block|
+                  @timeout = opts[:timeout] if opts[:timeout] # Most recent overwrites globally.
+                  hash[channel] ||= {:messages => []}
+                  hash[channel][:messages] << message
+                  hash[channel][:ttl] = opts[:ttl] if opts[:ttl] # Most recent overwrites for each channel.
+                  @callbacks << block if block
+                  # TODO: Maybe we don't actually need to support blocks.
+                end
+              end
+              hash
+            end
+          end
+        end
+      end
+    end
+  end
+end
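
Usage sketch (not part of the patch): based on the comments above, the async path would
be exercised roughly like this. The channel name, message, and options are placeholders,
and the callback receives the single batch response described in the #async_put comment.

  producer = Firehose::Client::Producer::Http.new
  producer.publish("hello").async.to("a-channel")
  producer.async_put("hello", "a-channel", :ttl => 60) do |response|
    # One response for the whole batch, not one per message.
  end

Batching trades per-message acknowledgements for a single round trip, which is why the
callback contract differs from the synchronous #put.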