Skip to content

Commit

Permalink
Fixed processors to use appropriate config
Browse files Browse the repository at this point in the history
  • Loading branch information
mhuggins committed Apr 20, 2016
1 parent db5a8d1 commit 1e4b602
Show file tree
Hide file tree
Showing 16 changed files with 211 additions and 188 deletions.
5 changes: 2 additions & 3 deletions lib/circuitry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'circuitry/locks/noop'
require 'circuitry/locks/redis'
require 'circuitry/middleware/chain'
require 'circuitry/pool'
require 'circuitry/processor'
require 'circuitry/processors/batcher'
require 'circuitry/processors/forker'
Expand Down Expand Up @@ -46,9 +47,7 @@ def subscribe(options = {}, &block)
end

def flush
Processors.constants.each do |const|
Processors.const_get(const).flush
end
Pool.flush
end
end
end
10 changes: 6 additions & 4 deletions lib/circuitry/concerns/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ def default_async_strategy
end

def process_asynchronously(&block)
send(:"process_via_#{async}", &block)
processor = send(:"process_via_#{async}", &block)
processor.process
Pool << processor
end

def async=(value)
Expand Down Expand Up @@ -58,15 +60,15 @@ def platform_supports_forking?
end

def process_via_fork(&block)
Processors::Forker.process(&block)
Processors::Forker.new(config, &block)
end

def process_via_thread(&block)
Processors::Threader.process(&block)
Processors::Threader.new(config, &block)
end

def process_via_batch(&block)
Processors::Batcher.process(&block)
Processors::Batcher.new(config, &block)
end
end
end
Expand Down
34 changes: 34 additions & 0 deletions lib/circuitry/pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module Circuitry
module Pool
class << self
def <<(processor)
raise ArgumentError, 'processor must be a Circuitry::Processor' unless processor.is_a?(Circuitry::Processor)
pool << processor
end

def flush
while (processor = pool.shift)
processor.wait
end
end

def size
pool.size
end

def empty?
pool.empty?
end

def any?
pool.any?
end

private

def pool
@pool ||= []
end
end
end
end
39 changes: 24 additions & 15 deletions lib/circuitry/processor.rb
Original file line number Diff line number Diff line change
@@ -1,38 +1,47 @@
module Circuitry
module Processor
def process(&_block)
raise NotImplementedError, "#{self} must implement class method `process`"
class Processor
attr_reader :config, :block

def initialize(config, &block)
raise ArgumentError, 'no block given' unless block_given?

self.config = config
self.block = block
end

def flush
raise NotImplementedError, "#{self} must implement class method `flush`"
def process
raise NotImplementedError, "#{self} must implement instance method `process`"
end

def on_exit
Circuitry.subscriber_config.on_async_exit
def wait
raise NotImplementedError, "#{self} must implement instance method `wait`"
end

protected

def safely_process
yield
def safely_process(&block)
block.call
rescue => e
logger.error("Error handling message: #{e}")
error_handler.call(e) if error_handler
end

def pool
@pool ||= []
ensure
on_exit.call if on_exit
end

private

attr_writer :config, :block

def logger
Circuitry.subscriber_config.logger
config.logger
end

def error_handler
Circuitry.subscriber_config.error_handler
config.error_handler
end

def on_exit
config.on_async_exit
end
end
end
19 changes: 6 additions & 13 deletions lib/circuitry/processors/batcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,13 @@

module Circuitry
module Processors
module Batcher
class << self
include Processor

def process(&block)
raise ArgumentError, 'no block given' unless block_given?
pool << block
end
class Batcher < Processor
def process
# noop
end

def flush
while (block = pool.shift)
safely_process(&block)
end
end
def wait
safely_process(&block)
end
end
end
Expand Down
22 changes: 11 additions & 11 deletions lib/circuitry/processors/forker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

module Circuitry
module Processors
module Forker
class << self
include Processor
class Forker < Processor
def process
Process.detach(pid)
end

def process(&block)
pid = fork do
safely_process(&block)
on_exit.call if on_exit
end
def wait
# noop
end

Process.detach(pid)
end
private

def flush
def pid
@pid ||= fork do
safely_process(&block)
end
end
end
Expand Down
25 changes: 11 additions & 14 deletions lib/circuitry/processors/threader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,20 @@

module Circuitry
module Processors
module Threader
class << self
include Processor
class Threader < Processor
def process
thread
end

def process(&block)
raise ArgumentError, 'no block given' unless block_given?
def wait
thread.join
end

pool << Thread.new do
safely_process(&block)
on_exit.call if on_exit
end
end
private

def flush
pool.each(&:join)
ensure
pool.clear
def thread
@thread ||= Thread.new do
safely_process(&block)
end
end
end
Expand Down
12 changes: 8 additions & 4 deletions lib/circuitry/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,26 @@ def publish_message(topic_name, message)
end
end

attr_writer :timeout
def config
Circuitry.publisher_config
end

private

attr_writer :timeout

def logger
Circuitry.publisher_config.logger
config.logger
end

def can_publish?
Circuitry.publisher_config.aws_options.values.all? do |value|
config.aws_options.values.all? do |value|
!value.nil? && !value.empty?
end
end

def middleware
Circuitry.publisher_config.middleware
config.middleware
end
end
end
16 changes: 10 additions & 6 deletions lib/circuitry/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def initialize(options = {})
options = DEFAULT_OPTIONS.merge(options)

self.subscribed = false
self.queue = Queue.find(Circuitry.subscriber_config.queue_name).url
self.queue = Queue.find(config.queue_name).url

%i[lock async timeout wait_time batch_size].each do |sym|
send(:"#{sym}=", options[sym])
Expand Down Expand Up @@ -74,7 +74,7 @@ def self.default_async_strategy

def lock=(value)
value = case value
when true then Circuitry.subscriber_config.lock_strategy
when true then config.lock_strategy
when false then Circuitry::Locks::NOOP.new
when Circuitry::Locks::Base then value
else raise ArgumentError, lock_value_error(value)
Expand All @@ -83,6 +83,10 @@ def lock=(value)
@lock = value
end

def config
Circuitry.subscriber_config
end

private

def lock_value_error(value)
Expand Down Expand Up @@ -183,21 +187,21 @@ def delete_message(message)
end

def logger
Circuitry.subscriber_config.logger
config.logger
end

def error_handler
Circuitry.subscriber_config.error_handler
config.error_handler
end

def can_subscribe?
Circuitry.subscriber_config.aws_options.values.all? do |value|
config.aws_options.values.all? do |value|
!value.nil? && !value.empty?
end
end

def middleware
Circuitry.subscriber_config.middleware
config.middleware
end
end
end
Loading

0 comments on commit 1e4b602

Please sign in to comment.