diff --git a/README.md b/README.md index bd35281..1467430 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,8 @@ The `publish` method also accepts options that impact instantiation of the will be attempted before giving up. If the timeout is exceeded, an exception will raised to be handled by your application or `error_handler`. *(default: 15)* +* `:config` - A custom configuration object. Generally this option can be ignored. + *(default: `Circuitry.publisher_config`)* ```ruby obj = { foo: 'foo', bar: 'bar' } @@ -258,6 +260,8 @@ The `subscribe` method also accepts options that impact instantiation of the short-polling. *(default: 10)* * `:batch_size` - The number of messages to retrieve in a single SQS request. *(default: 10)* +* `:config` - A custom configuration object. Generally this option can be ignored. + *(default: `Circuitry.subscriber_config`)* ```ruby options = { diff --git a/lib/circuitry/concerns/async.rb b/lib/circuitry/concerns/async.rb index 373aa4b..07dbe06 100644 --- a/lib/circuitry/concerns/async.rb +++ b/lib/circuitry/concerns/async.rb @@ -17,10 +17,6 @@ module ClassMethods def async_strategies [:fork, :thread, :batch] end - - def default_async_strategy - raise NotImplementedError, "#{name} must implement class method `default_async_strategy`" - end end def process_asynchronously(&block) @@ -32,7 +28,7 @@ def process_asynchronously(&block) def async=(value) value = case value when false, nil then false - when true then self.class.default_async_strategy + when true then config.async_strategy when *self.class.async_strategies then value else raise ArgumentError, async_value_error(value) end diff --git a/lib/circuitry/publisher.rb b/lib/circuitry/publisher.rb index 87aa033..b8636bc 100644 --- a/lib/circuitry/publisher.rb +++ b/lib/circuitry/publisher.rb @@ -20,11 +20,12 @@ class Publisher ::Aws::SNS::Errors::InternalFailure ].freeze - attr_reader :timeout + attr_reader :config, :timeout def initialize(options = {}) options = DEFAULT_OPTIONS.merge(options) + self.config = options[:config] || Circuitry.publisher_config self.async = options[:async] self.timeout = options[:timeout] end @@ -43,10 +44,6 @@ def publish(topic_name, object) end end - def self.default_async_strategy - Circuitry.publisher_config.async_strategy - end - protected def publish_message(topic_name, message) @@ -68,13 +65,9 @@ def publish_message(topic_name, message) end end - def config - Circuitry.publisher_config - end - private - attr_writer :timeout + attr_writer :config, :timeout def logger config.logger diff --git a/lib/circuitry/subscriber.rb b/lib/circuitry/subscriber.rb index 220ffd3..6941eae 100644 --- a/lib/circuitry/subscriber.rb +++ b/lib/circuitry/subscriber.rb @@ -12,7 +12,7 @@ class Subscriber include Concerns::Async include Services::SQS - attr_reader :queue, :timeout, :wait_time, :batch_size, :lock + attr_reader :config, :timeout, :wait_time, :batch_size, :lock DEFAULT_OPTIONS = { lock: true, @@ -30,7 +30,7 @@ def initialize(options = {}) options = DEFAULT_OPTIONS.merge(options) self.subscribed = false - self.queue = Queue.find(config.queue_name).url + self.config = options[:config] || Circuitry.subscriber_config %i[lock async timeout wait_time batch_size].each do |sym| send(:"#{sym}=", options[sym]) @@ -41,35 +41,33 @@ def initialize(options = {}) def subscribe(&block) raise ArgumentError, 'block required' if block.nil? + raise SubscribeError, 'already subscribed' if subscribed? raise SubscribeError, 'AWS configuration is not set' unless can_subscribe? - logger.info("Subscribing to queue: #{queue}") - - self.subscribed = true + subscribed! poll(&block) - self.subscribed = false - - logger.info("Unsubscribed from queue: #{queue}") rescue *CONNECTION_ERRORS => e logger.error("Connection error to queue: #{queue}: #{e}") raise SubscribeError, e.message + ensure + unsubscribed! end def subscribed? subscribed end - def self.async_strategies - super - [:batch] + def queue + @queue ||= Queue.find(config.queue_name).url end - def self.default_async_strategy - Circuitry.subscriber_config.async_strategy + def self.async_strategies + super - [:batch] end protected - attr_writer :queue, :timeout, :wait_time, :batch_size + attr_writer :config, :timeout, :wait_time, :batch_size attr_accessor :subscribed def lock=(value) @@ -83,10 +81,6 @@ def lock=(value) @lock = value end - def config - Circuitry.subscriber_config - end - private def lock_value_error(value) @@ -103,6 +97,16 @@ def trap_signals end end + def subscribed! + logger.info("Subscribing to queue: #{queue}") unless subscribed? + self.subscribed = true + end + + def unsubscribed! + logger.info("Unsubscribed from queue: #{queue}") if subscribed? + self.subscribed = false + end + def poll(&block) poller = Aws::SQS::QueuePoller.new(queue, client: sqs) diff --git a/spec/circuitry/concerns/async_spec.rb b/spec/circuitry/concerns/async_spec.rb index dc5c63b..b7dc16c 100644 --- a/spec/circuitry/concerns/async_spec.rb +++ b/spec/circuitry/concerns/async_spec.rb @@ -3,10 +3,6 @@ async_class = Class.new do include Circuitry::Concerns::Async - def self.default_async_strategy - :thread - end - def self.async_strategies [:fork, :thread, :batch] end @@ -16,14 +12,6 @@ def config end end -incomplete_async_class = Class.new do - include Circuitry::Concerns::Async - - def config - Circuitry.subscriber_config - end -end - RSpec.describe Circuitry::Concerns::Async, type: :model do subject { async_class.new } @@ -48,20 +36,8 @@ def config end describe 'with true' do - describe 'when the class has defined a default async strategy' do - it 'sets async to the default value' do - expect(subject.class).to receive(:default_async_strategy).at_least(:once).and_call_original - subject.async = true - expect(subject.async).to eq subject.class.default_async_strategy - end - end - - describe 'when the class has not defined a default async strategy' do - subject { incomplete_async_class.new } - - it 'raises an error' do - expect { subject.async = true }.to raise_error(NotImplementedError) - end + it 'sets async to the default value' do + expect { subject.async = true }.to change { subject.async }.to(subject.config.async_strategy) end end