Skip to content

Commit

Permalink
WIP: [#42] Async batch publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
thoughtless committed Aug 5, 2014
1 parent a09d13f commit 78539b5
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 2 deletions.
2 changes: 1 addition & 1 deletion lib/firehose/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ module Client
autoload :Consumer, 'firehose/client/consumer'
autoload :Producer, 'firehose/client/producer'
end
end
end
21 changes: 20 additions & 1 deletion lib/firehose/client/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ module Client
module Producer
# Publish messages to Firehose via an HTTP interface.
class Http
autoload :Async, 'firehose/client/producer/async'

# A DSL for publishing requests. This doesn't so much, but lets us call
# Firehose::Client::Producer::Http#publish('message').to('channel'). Slick eh? If you don't like it,
# just all Firehose::Client::Producer::Http#put('message', 'channel')
# For async sending use Firehose::Client::Producer::Http#publish('message').async.to('channel')
# or Firehose::Client::Producer::Http#async_put('message', 'channel')
class Builder
def initialize(producer, message)
@producer, @message = producer, message
self
end

def to(channel, opts={}, &callback)
@producer.put(@message, channel, opts, &callback)
@producer.send(@method || :put, @message, channel, opts, &callback)
end

def async
@method = :async_put
self
end
end

Expand All @@ -26,6 +35,7 @@ def to(channel, opts={}, &callback)
def initialize(uri = Firehose::URI)
@uri = ::URI.parse(uri.to_s)
@uri.scheme ||= 'http'
@async = Async.new self
end

# A DSL for publishing messages.
Expand All @@ -42,6 +52,15 @@ def put(message, channel, opts, &block)
PutRequest.new(self, message, channel, opts, &block).process
end

# Publish the message in batches via a background process.
# Note that although the interface here looks identical to #put, the argument
# passed to the callback block will be slightly different. Since the
# messages are published in blocks, there is only a single response
# object. That is what will be passed back.
def async_put(message, channel, opts, &block)
@async.enqueue(message, channel, opts, &block)
end

# Handle errors that could happen while publishing a message.
def on_error(&block)
@error_handler = block
Expand Down
103 changes: 103 additions & 0 deletions lib/firehose/client/producer/async.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
require "thread"

module Firehose
module Client
module Producer
class Http
class Async
def initialize(producer)
@queue = Queue.new
@worker_mutex = Mutex.new
@worker = Worker.new @queue, producer

at_exit { @worker_thread && @worker_thread[:should_exit] = true }
end

# Although the options here are specified for each message, they can
# actually be more global. Specifically:
# * :timeout is global. The most recently specified timeout will be
# used.
# * :ttl is per channel. The most recently specified ttl for a given
# channel will be used.
def enqueue(message, channel, opts, &block)
ensure_worker_running
@queue << [message, channel, opts, block]
end

private

def ensure_worker_running
return if worker_running?
@worker_mutex.synchronize do
return if worker_running?
@worker_thread = Thread.new do
@worker.run
end
end
end

def worker_running?
@worker_thread && @worker_thread.alive?
end


class Worker
def initialize(queue, producer)
@queue = queue
@batch = []
@callbacks = []
@lock = Mutex.new
@producer = producer
end

def run
until Thread.current[:should_exit]
return if @queue.empty?

@lock.synchronize do
until @queue.empty?
@batch << @queue.pop
end
end


@producer.batch_publish(batch_data, :timeout => @timeout) do |response|
@callbacks.each do |callback|
callback.call response
end

@lock.synchronize do
@batch.clear
@callbacks.clear
end
end
end
end

def is_requesting?
@lock.synchronize { !@batch.empty? }
end

private

def batch_data
hash = {}
# We're mutating instance variables, so we need a lock to be safe.
@lock.synchronize do
@batch.each do |message, channel, opts, block|
@timeout = opts[:timeout] if opts[:timeout] # Most recent overwrites globally.
hash[channel] ||= {:messages => []}
hash[channel][:messages] << message
hash[channel][:ttl] = opts[:ttl] if opts[:ttl] # Most recent overwrites for each channel.
@callbacks << block if block
# TODO: Maybe we don't actually need to support blocks.
end
end
hash
end
end
end
end
end
end
end

0 comments on commit 78539b5

Please sign in to comment.