diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a70e00..d4b57ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Circuitry 3.x.x (TBD) +* Fixed async publisher to use correct configuration options. *Matt Huggins* * Fixed issue with `circuitry help` missing dependency. *Matt Huggins* * Fixed issue with `circuitry:setup` rake task when no topics are defined. *Matt Huggins* * Fixed issues with `circuitry:setup` rake task in vanilla Ruby projects. *Matt Huggins* diff --git a/README.md b/README.md index bd35281..1467430 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,8 @@ The `publish` method also accepts options that impact instantiation of the will be attempted before giving up. If the timeout is exceeded, an exception will raised to be handled by your application or `error_handler`. *(default: 15)* +* `:config` - A custom configuration object. Generally this option can be ignored. + *(default: `Circuitry.publisher_config`)* ```ruby obj = { foo: 'foo', bar: 'bar' } @@ -258,6 +260,8 @@ The `subscribe` method also accepts options that impact instantiation of the short-polling. *(default: 10)* * `:batch_size` - The number of messages to retrieve in a single SQS request. *(default: 10)* +* `:config` - A custom configuration object. Generally this option can be ignored. + *(default: `Circuitry.subscriber_config`)* ```ruby options = { diff --git a/lib/circuitry.rb b/lib/circuitry.rb index ac99d0c..b4f392b 100644 --- a/lib/circuitry.rb +++ b/lib/circuitry.rb @@ -7,6 +7,7 @@ require 'circuitry/locks/noop' require 'circuitry/locks/redis' require 'circuitry/middleware/chain' +require 'circuitry/pool' require 'circuitry/processor' require 'circuitry/processors/batcher' require 'circuitry/processors/forker' @@ -46,9 +47,7 @@ def subscribe(options = {}, &block) end def flush - Processors.constants.each do |const| - Processors.const_get(const).flush - end + Pool.flush end end end diff --git a/lib/circuitry/concerns/async.rb b/lib/circuitry/concerns/async.rb index c92b7ea..07dbe06 100644 --- a/lib/circuitry/concerns/async.rb +++ b/lib/circuitry/concerns/async.rb @@ -17,20 +17,18 @@ module ClassMethods def async_strategies [:fork, :thread, :batch] end - - def default_async_strategy - raise NotImplementedError, "#{name} must implement class method `default_async_strategy`" - end end def process_asynchronously(&block) - send(:"process_via_#{async}", &block) + processor = send(:"process_via_#{async}", &block) + processor.process + Pool << processor end def async=(value) value = case value when false, nil then false - when true then self.class.default_async_strategy + when true then config.async_strategy when *self.class.async_strategies then value else raise ArgumentError, async_value_error(value) end @@ -58,15 +56,15 @@ def platform_supports_forking? end def process_via_fork(&block) - Processors::Forker.process(&block) + Processors::Forker.new(config, &block) end def process_via_thread(&block) - Processors::Threader.process(&block) + Processors::Threader.new(config, &block) end def process_via_batch(&block) - Processors::Batcher.process(&block) + Processors::Batcher.new(config, &block) end end end diff --git a/lib/circuitry/pool.rb b/lib/circuitry/pool.rb new file mode 100644 index 0000000..c9fc86f --- /dev/null +++ b/lib/circuitry/pool.rb @@ -0,0 +1,34 @@ +module Circuitry + module Pool + class << self + def <<(processor) + raise ArgumentError, 'processor must be a Circuitry::Processor' unless processor.is_a?(Circuitry::Processor) + pool << processor + end + + def flush + while (processor = pool.shift) + processor.wait + end + end + + def size + pool.size + end + + def empty? + pool.empty? + end + + def any? + pool.any? + end + + private + + def pool + @pool ||= [] + end + end + end +end diff --git a/lib/circuitry/processor.rb b/lib/circuitry/processor.rb index 91afc6a..f98cbc5 100644 --- a/lib/circuitry/processor.rb +++ b/lib/circuitry/processor.rb @@ -1,38 +1,47 @@ module Circuitry - module Processor - def process(&_block) - raise NotImplementedError, "#{self} must implement class method `process`" + class Processor + attr_reader :config, :block + + def initialize(config, &block) + raise ArgumentError, 'no block given' unless block_given? + + self.config = config + self.block = block end - def flush - raise NotImplementedError, "#{self} must implement class method `flush`" + def process + raise NotImplementedError, "#{self} must implement instance method `process`" end - def on_exit - Circuitry.subscriber_config.on_async_exit + def wait + raise NotImplementedError, "#{self} must implement instance method `wait`" end protected - def safely_process - yield + def safely_process(&block) + block.call rescue => e logger.error("Error handling message: #{e}") error_handler.call(e) if error_handler - end - - def pool - @pool ||= [] + ensure + on_exit.call if on_exit end private + attr_writer :config, :block + def logger - Circuitry.subscriber_config.logger + config.logger end def error_handler - Circuitry.subscriber_config.error_handler + config.error_handler + end + + def on_exit + config.on_async_exit end end end diff --git a/lib/circuitry/processors/batcher.rb b/lib/circuitry/processors/batcher.rb index 84241cf..9c0d2d8 100644 --- a/lib/circuitry/processors/batcher.rb +++ b/lib/circuitry/processors/batcher.rb @@ -2,20 +2,13 @@ module Circuitry module Processors - module Batcher - class << self - include Processor - - def process(&block) - raise ArgumentError, 'no block given' unless block_given? - pool << block - end + class Batcher < Processor + def process + # noop + end - def flush - while (block = pool.shift) - safely_process(&block) - end - end + def wait + safely_process(&block) end end end diff --git a/lib/circuitry/processors/forker.rb b/lib/circuitry/processors/forker.rb index 4a0d10e..1b490be 100644 --- a/lib/circuitry/processors/forker.rb +++ b/lib/circuitry/processors/forker.rb @@ -2,20 +2,20 @@ module Circuitry module Processors - module Forker - class << self - include Processor + class Forker < Processor + def process + Process.detach(pid) + end - def process(&block) - pid = fork do - safely_process(&block) - on_exit.call if on_exit - end + def wait + # noop + end - Process.detach(pid) - end + private - def flush + def pid + @pid ||= fork do + safely_process(&block) end end end diff --git a/lib/circuitry/processors/threader.rb b/lib/circuitry/processors/threader.rb index 2220538..3f9cc63 100644 --- a/lib/circuitry/processors/threader.rb +++ b/lib/circuitry/processors/threader.rb @@ -2,23 +2,20 @@ module Circuitry module Processors - module Threader - class << self - include Processor + class Threader < Processor + def process + thread + end - def process(&block) - raise ArgumentError, 'no block given' unless block_given? + def wait + thread.join + end - pool << Thread.new do - safely_process(&block) - on_exit.call if on_exit - end - end + private - def flush - pool.each(&:join) - ensure - pool.clear + def thread + @thread ||= Thread.new do + safely_process(&block) end end end diff --git a/lib/circuitry/publisher.rb b/lib/circuitry/publisher.rb index 2629d03..b8636bc 100644 --- a/lib/circuitry/publisher.rb +++ b/lib/circuitry/publisher.rb @@ -20,11 +20,12 @@ class Publisher ::Aws::SNS::Errors::InternalFailure ].freeze - attr_reader :timeout + attr_reader :config, :timeout def initialize(options = {}) options = DEFAULT_OPTIONS.merge(options) + self.config = options[:config] || Circuitry.publisher_config self.async = options[:async] self.timeout = options[:timeout] end @@ -43,10 +44,6 @@ def publish(topic_name, object) end end - def self.default_async_strategy - Circuitry.publisher_config.async_strategy - end - protected def publish_message(topic_name, message) @@ -68,22 +65,22 @@ def publish_message(topic_name, message) end end - attr_writer :timeout - private + attr_writer :config, :timeout + def logger - Circuitry.publisher_config.logger + config.logger end def can_publish? - Circuitry.publisher_config.aws_options.values.all? do |value| + config.aws_options.values.all? do |value| !value.nil? && !value.empty? end end def middleware - Circuitry.publisher_config.middleware + config.middleware end end end diff --git a/lib/circuitry/subscriber.rb b/lib/circuitry/subscriber.rb index c0a8c84..6941eae 100644 --- a/lib/circuitry/subscriber.rb +++ b/lib/circuitry/subscriber.rb @@ -12,7 +12,7 @@ class Subscriber include Concerns::Async include Services::SQS - attr_reader :queue, :timeout, :wait_time, :batch_size, :lock + attr_reader :config, :timeout, :wait_time, :batch_size, :lock DEFAULT_OPTIONS = { lock: true, @@ -30,7 +30,7 @@ def initialize(options = {}) options = DEFAULT_OPTIONS.merge(options) self.subscribed = false - self.queue = Queue.find(Circuitry.subscriber_config.queue_name).url + self.config = options[:config] || Circuitry.subscriber_config %i[lock async timeout wait_time batch_size].each do |sym| send(:"#{sym}=", options[sym]) @@ -41,40 +41,38 @@ def initialize(options = {}) def subscribe(&block) raise ArgumentError, 'block required' if block.nil? + raise SubscribeError, 'already subscribed' if subscribed? raise SubscribeError, 'AWS configuration is not set' unless can_subscribe? - logger.info("Subscribing to queue: #{queue}") - - self.subscribed = true + subscribed! poll(&block) - self.subscribed = false - - logger.info("Unsubscribed from queue: #{queue}") rescue *CONNECTION_ERRORS => e logger.error("Connection error to queue: #{queue}: #{e}") raise SubscribeError, e.message + ensure + unsubscribed! end def subscribed? subscribed end - def self.async_strategies - super - [:batch] + def queue + @queue ||= Queue.find(config.queue_name).url end - def self.default_async_strategy - Circuitry.subscriber_config.async_strategy + def self.async_strategies + super - [:batch] end protected - attr_writer :queue, :timeout, :wait_time, :batch_size + attr_writer :config, :timeout, :wait_time, :batch_size attr_accessor :subscribed def lock=(value) value = case value - when true then Circuitry.subscriber_config.lock_strategy + when true then config.lock_strategy when false then Circuitry::Locks::NOOP.new when Circuitry::Locks::Base then value else raise ArgumentError, lock_value_error(value) @@ -99,6 +97,16 @@ def trap_signals end end + def subscribed! + logger.info("Subscribing to queue: #{queue}") unless subscribed? + self.subscribed = true + end + + def unsubscribed! + logger.info("Unsubscribed from queue: #{queue}") if subscribed? + self.subscribed = false + end + def poll(&block) poller = Aws::SQS::QueuePoller.new(queue, client: sqs) @@ -183,21 +191,21 @@ def delete_message(message) end def logger - Circuitry.subscriber_config.logger + config.logger end def error_handler - Circuitry.subscriber_config.error_handler + config.error_handler end def can_subscribe? - Circuitry.subscriber_config.aws_options.values.all? do |value| + config.aws_options.values.all? do |value| !value.nil? && !value.empty? end end def middleware - Circuitry.subscriber_config.middleware + config.middleware end end end diff --git a/spec/circuitry/concerns/async_spec.rb b/spec/circuitry/concerns/async_spec.rb index d7203ac..b7dc16c 100644 --- a/spec/circuitry/concerns/async_spec.rb +++ b/spec/circuitry/concerns/async_spec.rb @@ -3,17 +3,13 @@ async_class = Class.new do include Circuitry::Concerns::Async - def self.default_async_strategy - :thread - end - def self.async_strategies [:fork, :thread, :batch] end -end -incomplete_async_class = Class.new do - include Circuitry::Concerns::Async + def config + Circuitry.subscriber_config + end end RSpec.describe Circuitry::Concerns::Async, type: :model do @@ -40,20 +36,8 @@ def self.async_strategies end describe 'with true' do - describe 'when the class has defined a default async strategy' do - it 'sets async to the default value' do - expect(subject.class).to receive(:default_async_strategy).at_least(:once).and_call_original - subject.async = true - expect(subject.async).to eq subject.class.default_async_strategy - end - end - - describe 'when the class has not defined a default async strategy' do - subject { incomplete_async_class.new } - - it 'raises an error' do - expect { subject.async = true }.to raise_error(NotImplementedError) - end + it 'sets async to the default value' do + expect { subject.async = true }.to change { subject.async }.to(subject.config.async_strategy) end end @@ -87,39 +71,50 @@ def self.async_strategies end describe '#process_asynchronously' do - let(:block) { ->{ } } + let(:block) { ->(_) { } } + let(:processor) { double('Circuitry::Processor', process: nil, wait: nil, is_a?: true) } - describe 'via forking' do + shared_examples_for 'an asynchronous processor' do before do - allow(subject).to receive(:async).and_return(:fork) + allow(Circuitry::Pool).to receive(:<<) + end + + it 'delegates to the processor' do + subject.process_asynchronously(&block) + expect(processor).to have_received(:process) end - it 'delegates to fork processor' do - expect(Circuitry::Processors::Forker).to receive(:process).with(no_args, &block) + it 'adds the processor to the pool' do subject.process_asynchronously(&block) + expect(Circuitry::Pool).to have_received(:<<).with(processor) + end + end + + describe 'via forking' do + before do + allow(subject).to receive(:async).and_return(:fork) + allow(Circuitry::Processors::Forker).to receive(:new).with(any_args, &block).and_return(processor) end + + it_behaves_like 'an asynchronous processor' end describe 'via threading' do before do allow(subject).to receive(:async).and_return(:thread) + allow(Circuitry::Processors::Threader).to receive(:new).with(any_args, &block).and_return(processor) end - it 'delegates to thread processor' do - expect(Circuitry::Processors::Threader).to receive(:process).with(no_args, &block) - subject.process_asynchronously(&block) - end + it_behaves_like 'an asynchronous processor' end describe 'via batching' do before do allow(subject).to receive(:async).and_return(:batch) + allow(Circuitry::Processors::Batcher).to receive(:new).with(any_args, &block).and_return(processor) end - it 'delegates to batch processor' do - expect(Circuitry::Processors::Batcher).to receive(:process).with(no_args, &block) - subject.process_asynchronously(&block) - end + it_behaves_like 'an asynchronous processor' end end end diff --git a/spec/circuitry/processor_spec.rb b/spec/circuitry/processor_spec.rb index 06d146f..3085810 100644 --- a/spec/circuitry/processor_spec.rb +++ b/spec/circuitry/processor_spec.rb @@ -1,43 +1,40 @@ require 'spec_helper' -processor_class = Class.new do - include Circuitry::Processor - - def process(&block) +processor_class = Class.new(Circuitry::Processor) do + def process block.call end - def flush - pool.clear + def wait + # noop end end -incomplete_processor_class = Class.new do - include Circuitry::Processor -end +incomplete_processor_class = Class.new(Circuitry::Processor) RSpec.describe Circuitry::Processor, type: :model do - subject { processor_class.new } + subject { processor_class.new(config, &block) } - describe '.process' do - let(:block) { ->{ } } + let(:config) { double('Circuitry::PublisherConfig', logger: nil, error_handler: nil, on_async_exit: nil) } + let(:block) { ->{ } } + describe '#process' do describe 'when the class has defined process' do it 'raises an error' do - expect { subject.process(&block) }.to_not raise_error + expect { subject.process }.to_not raise_error end end describe 'when the class has not defined process' do - subject { incomplete_processor_class.new } + subject { incomplete_processor_class.new(config, &block) } it 'raises an error' do - expect { subject.process(&block) }.to raise_error(NotImplementedError) + expect { subject.process }.to raise_error(NotImplementedError) end end end - describe '.safely_process' do + describe '#safely_process' do def process subject.send(:safely_process, &block) end @@ -46,7 +43,7 @@ def process let(:block) { ->{ raise StandardError } } before do - allow(Circuitry.subscriber_config.logger).to receive(:error) + allow(config.logger).to receive(:error) end it 'does not re-raise the error' do @@ -55,19 +52,19 @@ def process it 'logs an error' do process - expect(Circuitry.subscriber_config.logger).to have_received(:error) + expect(config.logger).to have_received(:error) end describe 'when an error handler is defined' do let(:error_handler) { double('Proc', call: true) } before do - allow(Circuitry.subscriber_config).to receive(:error_handler).and_return(error_handler) + allow(config).to receive(:error_handler).and_return(error_handler) end it 'handles the error' do process - expect(Circuitry.subscriber_config.error_handler).to have_received(:call) + expect(config.error_handler).to have_received(:call) end end @@ -76,45 +73,43 @@ def process before do allow_message_expectations_on_nil - allow(Circuitry.subscriber_config).to receive(:error_handler).and_return(error_handler) + allow(config).to receive(:error_handler).and_return(error_handler) allow(error_handler).to receive(:call) end it 'does not handle the error' do process - expect(Circuitry.subscriber_config.error_handler).to_not have_received(:call) + expect(config.error_handler).to_not have_received(:call) end end end describe 'when the block does not raise an error' do - let(:block) { ->{ } } - it 'does not log an error' do - expect(Circuitry.subscriber_config.logger).to_not receive(:error) + expect(config.logger).to_not receive(:error) process end it 'does not handle an error' do allow_message_expectations_on_nil - expect(Circuitry.subscriber_config.error_handler).to_not receive(:call) + expect(config.error_handler).to_not receive(:call) process end end end - describe '#flush' do - describe 'when the class has defined flush' do + describe '#wait' do + describe 'when the class has defined wait' do it 'does not raise an error' do - expect { subject.flush }.to_not raise_error + expect { subject.wait }.to_not raise_error end end - describe 'when the class has not defined flush' do - subject { incomplete_processor_class.new } + describe 'when the class has not defined wait' do + subject { incomplete_processor_class.new(config, &block) } it 'raises an error' do - expect { subject.flush }.to raise_error(NotImplementedError) + expect { subject.wait }.to raise_error(NotImplementedError) end end end diff --git a/spec/circuitry/processors/batcher_spec.rb b/spec/circuitry/processors/batcher_spec.rb index 8315743..e55759d 100644 --- a/spec/circuitry/processors/batcher_spec.rb +++ b/spec/circuitry/processors/batcher_spec.rb @@ -1,39 +1,24 @@ require 'spec_helper' RSpec.describe Circuitry::Processors::Batcher, type: :model do - subject { described_class } + subject { described_class.new(config, &block) } - it { is_expected.to be_a Circuitry::Processor } - - describe '.batch' do - let(:pool) { double('Array', '<<' => []) } - let(:block) { -> {} } + let(:config) { double('Circuitry::PublisherConfig', logger: nil, error_handler: nil, on_async_exit: nil) } + let(:block) { -> {} } - before do - allow(subject).to receive(:pool).and_return(pool) - end + it { is_expected.to be_a Circuitry::Processor } - it 'adds the block to the pool' do - subject.process(&block) - expect(pool).to have_received(:<<).with(block) + describe '#process' do + it 'does not call the block' do + expect(block).to_not receive(:call) + subject.process end end - describe '.flush' do - let(:pool) { [-> {}, -> {}] } - - before do - allow(subject).to receive(:pool).and_return(pool) - end - - it 'calls each block' do - subject.flush - pool.each { |block| expect(block).to have_received(:call) } - end - - it 'clears the pool' do - subject.flush - expect(pool).to be_empty + describe '#wait' do + it 'calls the block' do + expect(block).to receive(:call) + subject.wait end end end diff --git a/spec/circuitry/processors/forker_spec.rb b/spec/circuitry/processors/forker_spec.rb index 83a56b4..47a71c7 100644 --- a/spec/circuitry/processors/forker_spec.rb +++ b/spec/circuitry/processors/forker_spec.rb @@ -1,35 +1,37 @@ require 'spec_helper' RSpec.describe Circuitry::Processors::Forker, type: :model do - subject { described_class } + subject { described_class.new(config, &block) } + + let(:config) { double('Circuitry::PublisherConfig', logger: nil, error_handler: nil, on_async_exit: nil) } + let(:block) { ->{ } } it { is_expected.to be_a Circuitry::Processor } it_behaves_like 'an asyncronous processor' - describe '.fork' do + describe '#process' do before do allow(subject).to receive(:fork).and_return(pid) allow(Process).to receive(:detach) end let(:pid) { 'pid' } - let(:block) { ->{ } } it 'forks a process' do - subject.process(&block) + subject.process expect(subject).to have_received(:fork) end it 'detaches the forked process' do - subject.process(&block) + subject.process expect(Process).to have_received(:detach).with(pid) end end - describe '.flush' do + describe '#wait' do it 'does nothing' do - expect { subject.flush }.to_not raise_error + expect { subject.wait }.to_not raise_error end end end diff --git a/spec/circuitry/processors/threader_spec.rb b/spec/circuitry/processors/threader_spec.rb index 4fa3a38..1ca54ba 100644 --- a/spec/circuitry/processors/threader_spec.rb +++ b/spec/circuitry/processors/threader_spec.rb @@ -1,48 +1,38 @@ require 'spec_helper' RSpec.describe Circuitry::Processors::Threader, type: :model do - subject { described_class } + subject { described_class.new(config, &block) } + + let(:config) { double('Circuitry::PublisherConfig', logger: nil, error_handler: nil, on_async_exit: nil) } + let(:block) { ->{ } } it { is_expected.to be_a Circuitry::Processor } it_behaves_like 'an asyncronous processor' - describe '.process' do - let(:pool) { double('Array', '<<': []) } - let(:block) { ->{ } } - let(:thread) { double('Thread', join: true) } - + describe '#process' do before do - allow(subject).to receive(:pool).and_return(pool) allow(Thread).to receive(:new).and_return(thread) end + let(:thread) { double('Thread', join: true) } + it 'wraps the block in a thread' do - subject.process(&block) + subject.process expect(Thread).to have_received(:new).with(no_args, &block) end - - it 'adds the thread to the pool' do - subject.process(&block) - expect(pool).to have_received(:<<).with(thread) - end end - describe '.flush' do - let(:pool) { [double('Thread', join: true), double('Thread', join: true)] } - + describe '#wait' do before do - allow(subject).to receive(:pool).and_return(pool) + allow(Thread).to receive(:new).and_return(thread) end - it 'joins each thread' do - subject.flush - pool.each { |thread| expect(thread).to have_received(:join) } - end + let(:thread) { double('Thread', join: true) } - it 'clears the pool' do - subject.flush - expect(pool).to be_empty + it 'joins the thread' do + subject.wait + expect(thread).to have_received(:join) end end end diff --git a/spec/circuitry_spec.rb b/spec/circuitry_spec.rb index 5628e1e..e184e4e 100644 --- a/spec/circuitry_spec.rb +++ b/spec/circuitry_spec.rb @@ -58,18 +58,8 @@ end describe '.flush' do - it 'flushes batches' do - expect(Circuitry::Processors::Batcher).to receive(:flush) - subject.flush - end - - it 'flushes forks' do - expect(Circuitry::Processors::Forker).to receive(:flush) - subject.flush - end - - it 'flushes threads' do - expect(Circuitry::Processors::Threader).to receive(:flush) + it 'flushes the pool' do + expect(Circuitry::Pool).to receive(:flush) subject.flush end end diff --git a/spec/support/examples/async_processor_examples.rb b/spec/support/examples/async_processor_examples.rb index 76bcce2..f72d301 100644 --- a/spec/support/examples/async_processor_examples.rb +++ b/spec/support/examples/async_processor_examples.rb @@ -6,12 +6,12 @@ before do allow(subject).to receive(:fork) { |&block| block.call } allow(Process).to receive(:detach) - allow(Circuitry.subscriber_config).to receive(:on_async_exit).and_return(on_async_exit) + allow(config).to receive(:on_async_exit).and_return(on_async_exit) end it 'calls the proc' do - subject.process(&block) - subject.flush + subject.process + subject.wait expect(on_async_exit).to have_received(:call) end end