From 1e4b602038ba016030630ee13e4bf9acaf5e2208 Mon Sep 17 00:00:00 2001 From: mhuggins Date: Tue, 12 Apr 2016 22:42:39 -0600 Subject: [PATCH] Fixed processors to use appropriate config --- lib/circuitry.rb | 5 +- lib/circuitry/concerns/async.rb | 10 ++-- lib/circuitry/pool.rb | 34 +++++++++++ lib/circuitry/processor.rb | 39 +++++++----- lib/circuitry/processors/batcher.rb | 19 ++---- lib/circuitry/processors/forker.rb | 22 +++---- lib/circuitry/processors/threader.rb | 25 ++++---- lib/circuitry/publisher.rb | 12 ++-- lib/circuitry/subscriber.rb | 16 +++-- spec/circuitry/concerns/async_spec.rb | 45 ++++++++++---- spec/circuitry/processor_spec.rb | 59 +++++++++---------- spec/circuitry/processors/batcher_spec.rb | 39 ++++-------- spec/circuitry/processors/forker_spec.rb | 16 ++--- spec/circuitry/processors/threader_spec.rb | 38 +++++------- spec/circuitry_spec.rb | 14 +---- .../examples/async_processor_examples.rb | 6 +- 16 files changed, 211 insertions(+), 188 deletions(-) create mode 100644 lib/circuitry/pool.rb diff --git a/lib/circuitry.rb b/lib/circuitry.rb index ac99d0c..b4f392b 100644 --- a/lib/circuitry.rb +++ b/lib/circuitry.rb @@ -7,6 +7,7 @@ require 'circuitry/locks/noop' require 'circuitry/locks/redis' require 'circuitry/middleware/chain' +require 'circuitry/pool' require 'circuitry/processor' require 'circuitry/processors/batcher' require 'circuitry/processors/forker' @@ -46,9 +47,7 @@ def subscribe(options = {}, &block) end def flush - Processors.constants.each do |const| - Processors.const_get(const).flush - end + Pool.flush end end end diff --git a/lib/circuitry/concerns/async.rb b/lib/circuitry/concerns/async.rb index c92b7ea..373aa4b 100644 --- a/lib/circuitry/concerns/async.rb +++ b/lib/circuitry/concerns/async.rb @@ -24,7 +24,9 @@ def default_async_strategy end def process_asynchronously(&block) - send(:"process_via_#{async}", &block) + processor = send(:"process_via_#{async}", &block) + processor.process + Pool << processor end def async=(value) @@ -58,15 +60,15 @@ def platform_supports_forking? end def process_via_fork(&block) - Processors::Forker.process(&block) + Processors::Forker.new(config, &block) end def process_via_thread(&block) - Processors::Threader.process(&block) + Processors::Threader.new(config, &block) end def process_via_batch(&block) - Processors::Batcher.process(&block) + Processors::Batcher.new(config, &block) end end end diff --git a/lib/circuitry/pool.rb b/lib/circuitry/pool.rb new file mode 100644 index 0000000..c9fc86f --- /dev/null +++ b/lib/circuitry/pool.rb @@ -0,0 +1,34 @@ +module Circuitry + module Pool + class << self + def <<(processor) + raise ArgumentError, 'processor must be a Circuitry::Processor' unless processor.is_a?(Circuitry::Processor) + pool << processor + end + + def flush + while (processor = pool.shift) + processor.wait + end + end + + def size + pool.size + end + + def empty? + pool.empty? + end + + def any? + pool.any? + end + + private + + def pool + @pool ||= [] + end + end + end +end diff --git a/lib/circuitry/processor.rb b/lib/circuitry/processor.rb index 91afc6a..f98cbc5 100644 --- a/lib/circuitry/processor.rb +++ b/lib/circuitry/processor.rb @@ -1,38 +1,47 @@ module Circuitry - module Processor - def process(&_block) - raise NotImplementedError, "#{self} must implement class method `process`" + class Processor + attr_reader :config, :block + + def initialize(config, &block) + raise ArgumentError, 'no block given' unless block_given? + + self.config = config + self.block = block end - def flush - raise NotImplementedError, "#{self} must implement class method `flush`" + def process + raise NotImplementedError, "#{self} must implement instance method `process`" end - def on_exit - Circuitry.subscriber_config.on_async_exit + def wait + raise NotImplementedError, "#{self} must implement instance method `wait`" end protected - def safely_process - yield + def safely_process(&block) + block.call rescue => e logger.error("Error handling message: #{e}") error_handler.call(e) if error_handler - end - - def pool - @pool ||= [] + ensure + on_exit.call if on_exit end private + attr_writer :config, :block + def logger - Circuitry.subscriber_config.logger + config.logger end def error_handler - Circuitry.subscriber_config.error_handler + config.error_handler + end + + def on_exit + config.on_async_exit end end end diff --git a/lib/circuitry/processors/batcher.rb b/lib/circuitry/processors/batcher.rb index 84241cf..9c0d2d8 100644 --- a/lib/circuitry/processors/batcher.rb +++ b/lib/circuitry/processors/batcher.rb @@ -2,20 +2,13 @@ module Circuitry module Processors - module Batcher - class << self - include Processor - - def process(&block) - raise ArgumentError, 'no block given' unless block_given? - pool << block - end + class Batcher < Processor + def process + # noop + end - def flush - while (block = pool.shift) - safely_process(&block) - end - end + def wait + safely_process(&block) end end end diff --git a/lib/circuitry/processors/forker.rb b/lib/circuitry/processors/forker.rb index 4a0d10e..1b490be 100644 --- a/lib/circuitry/processors/forker.rb +++ b/lib/circuitry/processors/forker.rb @@ -2,20 +2,20 @@ module Circuitry module Processors - module Forker - class << self - include Processor + class Forker < Processor + def process + Process.detach(pid) + end - def process(&block) - pid = fork do - safely_process(&block) - on_exit.call if on_exit - end + def wait + # noop + end - Process.detach(pid) - end + private - def flush + def pid + @pid ||= fork do + safely_process(&block) end end end diff --git a/lib/circuitry/processors/threader.rb b/lib/circuitry/processors/threader.rb index 2220538..3f9cc63 100644 --- a/lib/circuitry/processors/threader.rb +++ b/lib/circuitry/processors/threader.rb @@ -2,23 +2,20 @@ module Circuitry module Processors - module Threader - class << self - include Processor + class Threader < Processor + def process + thread + end - def process(&block) - raise ArgumentError, 'no block given' unless block_given? + def wait + thread.join + end - pool << Thread.new do - safely_process(&block) - on_exit.call if on_exit - end - end + private - def flush - pool.each(&:join) - ensure - pool.clear + def thread + @thread ||= Thread.new do + safely_process(&block) end end end diff --git a/lib/circuitry/publisher.rb b/lib/circuitry/publisher.rb index 2629d03..87aa033 100644 --- a/lib/circuitry/publisher.rb +++ b/lib/circuitry/publisher.rb @@ -68,22 +68,26 @@ def publish_message(topic_name, message) end end - attr_writer :timeout + def config + Circuitry.publisher_config + end private + attr_writer :timeout + def logger - Circuitry.publisher_config.logger + config.logger end def can_publish? - Circuitry.publisher_config.aws_options.values.all? do |value| + config.aws_options.values.all? do |value| !value.nil? && !value.empty? end end def middleware - Circuitry.publisher_config.middleware + config.middleware end end end diff --git a/lib/circuitry/subscriber.rb b/lib/circuitry/subscriber.rb index c0a8c84..220ffd3 100644 --- a/lib/circuitry/subscriber.rb +++ b/lib/circuitry/subscriber.rb @@ -30,7 +30,7 @@ def initialize(options = {}) options = DEFAULT_OPTIONS.merge(options) self.subscribed = false - self.queue = Queue.find(Circuitry.subscriber_config.queue_name).url + self.queue = Queue.find(config.queue_name).url %i[lock async timeout wait_time batch_size].each do |sym| send(:"#{sym}=", options[sym]) @@ -74,7 +74,7 @@ def self.default_async_strategy def lock=(value) value = case value - when true then Circuitry.subscriber_config.lock_strategy + when true then config.lock_strategy when false then Circuitry::Locks::NOOP.new when Circuitry::Locks::Base then value else raise ArgumentError, lock_value_error(value) @@ -83,6 +83,10 @@ def lock=(value) @lock = value end + def config + Circuitry.subscriber_config + end + private def lock_value_error(value) @@ -183,21 +187,21 @@ def delete_message(message) end def logger - Circuitry.subscriber_config.logger + config.logger end def error_handler - Circuitry.subscriber_config.error_handler + config.error_handler end def can_subscribe? - Circuitry.subscriber_config.aws_options.values.all? do |value| + config.aws_options.values.all? do |value| !value.nil? && !value.empty? end end def middleware - Circuitry.subscriber_config.middleware + config.middleware end end end diff --git a/spec/circuitry/concerns/async_spec.rb b/spec/circuitry/concerns/async_spec.rb index d7203ac..dc5c63b 100644 --- a/spec/circuitry/concerns/async_spec.rb +++ b/spec/circuitry/concerns/async_spec.rb @@ -10,10 +10,18 @@ def self.default_async_strategy def self.async_strategies [:fork, :thread, :batch] end + + def config + Circuitry.subscriber_config + end end incomplete_async_class = Class.new do include Circuitry::Concerns::Async + + def config + Circuitry.subscriber_config + end end RSpec.describe Circuitry::Concerns::Async, type: :model do @@ -87,39 +95,50 @@ def self.async_strategies end describe '#process_asynchronously' do - let(:block) { ->{ } } + let(:block) { ->(_) { } } + let(:processor) { double('Circuitry::Processor', process: nil, wait: nil, is_a?: true) } - describe 'via forking' do + shared_examples_for 'an asynchronous processor' do before do - allow(subject).to receive(:async).and_return(:fork) + allow(Circuitry::Pool).to receive(:<<) end - it 'delegates to fork processor' do - expect(Circuitry::Processors::Forker).to receive(:process).with(no_args, &block) + it 'delegates to the processor' do subject.process_asynchronously(&block) + expect(processor).to have_received(:process) + end + + it 'adds the processor to the pool' do + subject.process_asynchronously(&block) + expect(Circuitry::Pool).to have_received(:<<).with(processor) + end + end + + describe 'via forking' do + before do + allow(subject).to receive(:async).and_return(:fork) + allow(Circuitry::Processors::Forker).to receive(:new).with(any_args, &block).and_return(processor) end + + it_behaves_like 'an asynchronous processor' end describe 'via threading' do before do allow(subject).to receive(:async).and_return(:thread) + allow(Circuitry::Processors::Threader).to receive(:new).with(any_args, &block).and_return(processor) end - it 'delegates to thread processor' do - expect(Circuitry::Processors::Threader).to receive(:process).with(no_args, &block) - subject.process_asynchronously(&block) - end + it_behaves_like 'an asynchronous processor' end describe 'via batching' do before do allow(subject).to receive(:async).and_return(:batch) + allow(Circuitry::Processors::Batcher).to receive(:new).with(any_args, &block).and_return(processor) end - it 'delegates to batch processor' do - expect(Circuitry::Processors::Batcher).to receive(:process).with(no_args, &block) - subject.process_asynchronously(&block) - end + it_behaves_like 'an asynchronous processor' end end end diff --git a/spec/circuitry/processor_spec.rb b/spec/circuitry/processor_spec.rb index 06d146f..3085810 100644 --- a/spec/circuitry/processor_spec.rb +++ b/spec/circuitry/processor_spec.rb @@ -1,43 +1,40 @@ require 'spec_helper' -processor_class = Class.new do - include Circuitry::Processor - - def process(&block) +processor_class = Class.new(Circuitry::Processor) do + def process block.call end - def flush - pool.clear + def wait + # noop end end -incomplete_processor_class = Class.new do - include Circuitry::Processor -end +incomplete_processor_class = Class.new(Circuitry::Processor) RSpec.describe Circuitry::Processor, type: :model do - subject { processor_class.new } + subject { processor_class.new(config, &block) } - describe '.process' do - let(:block) { ->{ } } + let(:config) { double('Circuitry::PublisherConfig', logger: nil, error_handler: nil, on_async_exit: nil) } + let(:block) { ->{ } } + describe '#process' do describe 'when the class has defined process' do it 'raises an error' do - expect { subject.process(&block) }.to_not raise_error + expect { subject.process }.to_not raise_error end end describe 'when the class has not defined process' do - subject { incomplete_processor_class.new } + subject { incomplete_processor_class.new(config, &block) } it 'raises an error' do - expect { subject.process(&block) }.to raise_error(NotImplementedError) + expect { subject.process }.to raise_error(NotImplementedError) end end end - describe '.safely_process' do + describe '#safely_process' do def process subject.send(:safely_process, &block) end @@ -46,7 +43,7 @@ def process let(:block) { ->{ raise StandardError } } before do - allow(Circuitry.subscriber_config.logger).to receive(:error) + allow(config.logger).to receive(:error) end it 'does not re-raise the error' do @@ -55,19 +52,19 @@ def process it 'logs an error' do process - expect(Circuitry.subscriber_config.logger).to have_received(:error) + expect(config.logger).to have_received(:error) end describe 'when an error handler is defined' do let(:error_handler) { double('Proc', call: true) } before do - allow(Circuitry.subscriber_config).to receive(:error_handler).and_return(error_handler) + allow(config).to receive(:error_handler).and_return(error_handler) end it 'handles the error' do process - expect(Circuitry.subscriber_config.error_handler).to have_received(:call) + expect(config.error_handler).to have_received(:call) end end @@ -76,45 +73,43 @@ def process before do allow_message_expectations_on_nil - allow(Circuitry.subscriber_config).to receive(:error_handler).and_return(error_handler) + allow(config).to receive(:error_handler).and_return(error_handler) allow(error_handler).to receive(:call) end it 'does not handle the error' do process - expect(Circuitry.subscriber_config.error_handler).to_not have_received(:call) + expect(config.error_handler).to_not have_received(:call) end end end describe 'when the block does not raise an error' do - let(:block) { ->{ } } - it 'does not log an error' do - expect(Circuitry.subscriber_config.logger).to_not receive(:error) + expect(config.logger).to_not receive(:error) process end it 'does not handle an error' do allow_message_expectations_on_nil - expect(Circuitry.subscriber_config.error_handler).to_not receive(:call) + expect(config.error_handler).to_not receive(:call) process end end end - describe '#flush' do - describe 'when the class has defined flush' do + describe '#wait' do + describe 'when the class has defined wait' do it 'does not raise an error' do - expect { subject.flush }.to_not raise_error + expect { subject.wait }.to_not raise_error end end - describe 'when the class has not defined flush' do - subject { incomplete_processor_class.new } + describe 'when the class has not defined wait' do + subject { incomplete_processor_class.new(config, &block) } it 'raises an error' do - expect { subject.flush }.to raise_error(NotImplementedError) + expect { subject.wait }.to raise_error(NotImplementedError) end end end diff --git a/spec/circuitry/processors/batcher_spec.rb b/spec/circuitry/processors/batcher_spec.rb index 8315743..e55759d 100644 --- a/spec/circuitry/processors/batcher_spec.rb +++ b/spec/circuitry/processors/batcher_spec.rb @@ -1,39 +1,24 @@ require 'spec_helper' RSpec.describe Circuitry::Processors::Batcher, type: :model do - subject { described_class } + subject { described_class.new(config, &block) } - it { is_expected.to be_a Circuitry::Processor } - - describe '.batch' do - let(:pool) { double('Array', '<<' => []) } - let(:block) { -> {} } + let(:config) { double('Circuitry::PublisherConfig', logger: nil, error_handler: nil, on_async_exit: nil) } + let(:block) { -> {} } - before do - allow(subject).to receive(:pool).and_return(pool) - end + it { is_expected.to be_a Circuitry::Processor } - it 'adds the block to the pool' do - subject.process(&block) - expect(pool).to have_received(:<<).with(block) + describe '#process' do + it 'does not call the block' do + expect(block).to_not receive(:call) + subject.process end end - describe '.flush' do - let(:pool) { [-> {}, -> {}] } - - before do - allow(subject).to receive(:pool).and_return(pool) - end - - it 'calls each block' do - subject.flush - pool.each { |block| expect(block).to have_received(:call) } - end - - it 'clears the pool' do - subject.flush - expect(pool).to be_empty + describe '#wait' do + it 'calls the block' do + expect(block).to receive(:call) + subject.wait end end end diff --git a/spec/circuitry/processors/forker_spec.rb b/spec/circuitry/processors/forker_spec.rb index 83a56b4..47a71c7 100644 --- a/spec/circuitry/processors/forker_spec.rb +++ b/spec/circuitry/processors/forker_spec.rb @@ -1,35 +1,37 @@ require 'spec_helper' RSpec.describe Circuitry::Processors::Forker, type: :model do - subject { described_class } + subject { described_class.new(config, &block) } + + let(:config) { double('Circuitry::PublisherConfig', logger: nil, error_handler: nil, on_async_exit: nil) } + let(:block) { ->{ } } it { is_expected.to be_a Circuitry::Processor } it_behaves_like 'an asyncronous processor' - describe '.fork' do + describe '#process' do before do allow(subject).to receive(:fork).and_return(pid) allow(Process).to receive(:detach) end let(:pid) { 'pid' } - let(:block) { ->{ } } it 'forks a process' do - subject.process(&block) + subject.process expect(subject).to have_received(:fork) end it 'detaches the forked process' do - subject.process(&block) + subject.process expect(Process).to have_received(:detach).with(pid) end end - describe '.flush' do + describe '#wait' do it 'does nothing' do - expect { subject.flush }.to_not raise_error + expect { subject.wait }.to_not raise_error end end end diff --git a/spec/circuitry/processors/threader_spec.rb b/spec/circuitry/processors/threader_spec.rb index 4fa3a38..1ca54ba 100644 --- a/spec/circuitry/processors/threader_spec.rb +++ b/spec/circuitry/processors/threader_spec.rb @@ -1,48 +1,38 @@ require 'spec_helper' RSpec.describe Circuitry::Processors::Threader, type: :model do - subject { described_class } + subject { described_class.new(config, &block) } + + let(:config) { double('Circuitry::PublisherConfig', logger: nil, error_handler: nil, on_async_exit: nil) } + let(:block) { ->{ } } it { is_expected.to be_a Circuitry::Processor } it_behaves_like 'an asyncronous processor' - describe '.process' do - let(:pool) { double('Array', '<<': []) } - let(:block) { ->{ } } - let(:thread) { double('Thread', join: true) } - + describe '#process' do before do - allow(subject).to receive(:pool).and_return(pool) allow(Thread).to receive(:new).and_return(thread) end + let(:thread) { double('Thread', join: true) } + it 'wraps the block in a thread' do - subject.process(&block) + subject.process expect(Thread).to have_received(:new).with(no_args, &block) end - - it 'adds the thread to the pool' do - subject.process(&block) - expect(pool).to have_received(:<<).with(thread) - end end - describe '.flush' do - let(:pool) { [double('Thread', join: true), double('Thread', join: true)] } - + describe '#wait' do before do - allow(subject).to receive(:pool).and_return(pool) + allow(Thread).to receive(:new).and_return(thread) end - it 'joins each thread' do - subject.flush - pool.each { |thread| expect(thread).to have_received(:join) } - end + let(:thread) { double('Thread', join: true) } - it 'clears the pool' do - subject.flush - expect(pool).to be_empty + it 'joins the thread' do + subject.wait + expect(thread).to have_received(:join) end end end diff --git a/spec/circuitry_spec.rb b/spec/circuitry_spec.rb index 5628e1e..e184e4e 100644 --- a/spec/circuitry_spec.rb +++ b/spec/circuitry_spec.rb @@ -58,18 +58,8 @@ end describe '.flush' do - it 'flushes batches' do - expect(Circuitry::Processors::Batcher).to receive(:flush) - subject.flush - end - - it 'flushes forks' do - expect(Circuitry::Processors::Forker).to receive(:flush) - subject.flush - end - - it 'flushes threads' do - expect(Circuitry::Processors::Threader).to receive(:flush) + it 'flushes the pool' do + expect(Circuitry::Pool).to receive(:flush) subject.flush end end diff --git a/spec/support/examples/async_processor_examples.rb b/spec/support/examples/async_processor_examples.rb index 76bcce2..f72d301 100644 --- a/spec/support/examples/async_processor_examples.rb +++ b/spec/support/examples/async_processor_examples.rb @@ -6,12 +6,12 @@ before do allow(subject).to receive(:fork) { |&block| block.call } allow(Process).to receive(:detach) - allow(Circuitry.subscriber_config).to receive(:on_async_exit).and_return(on_async_exit) + allow(config).to receive(:on_async_exit).and_return(on_async_exit) end it 'calls the proc' do - subject.process(&block) - subject.flush + subject.process + subject.wait expect(on_async_exit).to have_received(:call) end end