From b83d2af48289908bb7be101cebe29a17bd48706a Mon Sep 17 00:00:00 2001 From: mhuggins Date: Wed, 20 Apr 2016 13:43:19 -0600 Subject: [PATCH] Updated publisher & subscriber to accept config as option --- lib/circuitry/concerns/async.rb | 6 +---- lib/circuitry/publisher.rb | 13 +++------ lib/circuitry/subscriber.rb | 38 +++++++++++++++------------ spec/circuitry/concerns/async_spec.rb | 20 ++------------ 4 files changed, 27 insertions(+), 50 deletions(-) diff --git a/lib/circuitry/concerns/async.rb b/lib/circuitry/concerns/async.rb index 373aa4b..07dbe06 100644 --- a/lib/circuitry/concerns/async.rb +++ b/lib/circuitry/concerns/async.rb @@ -17,10 +17,6 @@ module ClassMethods def async_strategies [:fork, :thread, :batch] end - - def default_async_strategy - raise NotImplementedError, "#{name} must implement class method `default_async_strategy`" - end end def process_asynchronously(&block) @@ -32,7 +28,7 @@ def process_asynchronously(&block) def async=(value) value = case value when false, nil then false - when true then self.class.default_async_strategy + when true then config.async_strategy when *self.class.async_strategies then value else raise ArgumentError, async_value_error(value) end diff --git a/lib/circuitry/publisher.rb b/lib/circuitry/publisher.rb index 87aa033..b8636bc 100644 --- a/lib/circuitry/publisher.rb +++ b/lib/circuitry/publisher.rb @@ -20,11 +20,12 @@ class Publisher ::Aws::SNS::Errors::InternalFailure ].freeze - attr_reader :timeout + attr_reader :config, :timeout def initialize(options = {}) options = DEFAULT_OPTIONS.merge(options) + self.config = options[:config] || Circuitry.publisher_config self.async = options[:async] self.timeout = options[:timeout] end @@ -43,10 +44,6 @@ def publish(topic_name, object) end end - def self.default_async_strategy - Circuitry.publisher_config.async_strategy - end - protected def publish_message(topic_name, message) @@ -68,13 +65,9 @@ def publish_message(topic_name, message) end end - def config - Circuitry.publisher_config - end - private - attr_writer :timeout + attr_writer :config, :timeout def logger config.logger diff --git a/lib/circuitry/subscriber.rb b/lib/circuitry/subscriber.rb index 220ffd3..6941eae 100644 --- a/lib/circuitry/subscriber.rb +++ b/lib/circuitry/subscriber.rb @@ -12,7 +12,7 @@ class Subscriber include Concerns::Async include Services::SQS - attr_reader :queue, :timeout, :wait_time, :batch_size, :lock + attr_reader :config, :timeout, :wait_time, :batch_size, :lock DEFAULT_OPTIONS = { lock: true, @@ -30,7 +30,7 @@ def initialize(options = {}) options = DEFAULT_OPTIONS.merge(options) self.subscribed = false - self.queue = Queue.find(config.queue_name).url + self.config = options[:config] || Circuitry.subscriber_config %i[lock async timeout wait_time batch_size].each do |sym| send(:"#{sym}=", options[sym]) @@ -41,35 +41,33 @@ def initialize(options = {}) def subscribe(&block) raise ArgumentError, 'block required' if block.nil? + raise SubscribeError, 'already subscribed' if subscribed? raise SubscribeError, 'AWS configuration is not set' unless can_subscribe? - logger.info("Subscribing to queue: #{queue}") - - self.subscribed = true + subscribed! poll(&block) - self.subscribed = false - - logger.info("Unsubscribed from queue: #{queue}") rescue *CONNECTION_ERRORS => e logger.error("Connection error to queue: #{queue}: #{e}") raise SubscribeError, e.message + ensure + unsubscribed! end def subscribed? subscribed end - def self.async_strategies - super - [:batch] + def queue + @queue ||= Queue.find(config.queue_name).url end - def self.default_async_strategy - Circuitry.subscriber_config.async_strategy + def self.async_strategies + super - [:batch] end protected - attr_writer :queue, :timeout, :wait_time, :batch_size + attr_writer :config, :timeout, :wait_time, :batch_size attr_accessor :subscribed def lock=(value) @@ -83,10 +81,6 @@ def lock=(value) @lock = value end - def config - Circuitry.subscriber_config - end - private def lock_value_error(value) @@ -103,6 +97,16 @@ def trap_signals end end + def subscribed! + logger.info("Subscribing to queue: #{queue}") unless subscribed? + self.subscribed = true + end + + def unsubscribed! + logger.info("Unsubscribed from queue: #{queue}") if subscribed? + self.subscribed = false + end + def poll(&block) poller = Aws::SQS::QueuePoller.new(queue, client: sqs) diff --git a/spec/circuitry/concerns/async_spec.rb b/spec/circuitry/concerns/async_spec.rb index dc5c63b..85278a3 100644 --- a/spec/circuitry/concerns/async_spec.rb +++ b/spec/circuitry/concerns/async_spec.rb @@ -3,10 +3,6 @@ async_class = Class.new do include Circuitry::Concerns::Async - def self.default_async_strategy - :thread - end - def self.async_strategies [:fork, :thread, :batch] end @@ -48,20 +44,8 @@ def config end describe 'with true' do - describe 'when the class has defined a default async strategy' do - it 'sets async to the default value' do - expect(subject.class).to receive(:default_async_strategy).at_least(:once).and_call_original - subject.async = true - expect(subject.async).to eq subject.class.default_async_strategy - end - end - - describe 'when the class has not defined a default async strategy' do - subject { incomplete_async_class.new } - - it 'raises an error' do - expect { subject.async = true }.to raise_error(NotImplementedError) - end + it 'sets async to the default value' do + expect { subject.async = true }.to change { subject.async }.to(subject.config.async_strategy) end end