Skip to content

Commit

Permalink
Updated publisher & subscriber to accept config as option
Browse files Browse the repository at this point in the history
  • Loading branch information
mhuggins committed Apr 20, 2016
1 parent 1e4b602 commit f225b41
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 58 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ The `publish` method also accepts options that impact instantiation of the
will be attempted before giving up. If the timeout is exceeded, an exception
will raised to be handled by your application or `error_handler`. *(default:
15)*
* `:config` - A custom configuration object. Generally this option can be ignored.
*(default: `Circuitry.publisher_config`)*

```ruby
obj = { foo: 'foo', bar: 'bar' }
Expand Down Expand Up @@ -258,6 +260,8 @@ The `subscribe` method also accepts options that impact instantiation of the
short-polling. *(default: 10)*
* `:batch_size` - The number of messages to retrieve in a single SQS request.
*(default: 10)*
* `:config` - A custom configuration object. Generally this option can be ignored.
*(default: `Circuitry.subscriber_config`)*

```ruby
options = {
Expand Down
6 changes: 1 addition & 5 deletions lib/circuitry/concerns/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ module ClassMethods
def async_strategies
[:fork, :thread, :batch]
end

def default_async_strategy
raise NotImplementedError, "#{name} must implement class method `default_async_strategy`"
end
end

def process_asynchronously(&block)
Expand All @@ -32,7 +28,7 @@ def process_asynchronously(&block)
def async=(value)
value = case value
when false, nil then false
when true then self.class.default_async_strategy
when true then config.async_strategy
when *self.class.async_strategies then value
else raise ArgumentError, async_value_error(value)
end
Expand Down
13 changes: 3 additions & 10 deletions lib/circuitry/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ class Publisher
::Aws::SNS::Errors::InternalFailure
].freeze

attr_reader :timeout
attr_reader :config, :timeout

def initialize(options = {})
options = DEFAULT_OPTIONS.merge(options)

self.config = options[:config] || Circuitry.publisher_config
self.async = options[:async]
self.timeout = options[:timeout]
end
Expand All @@ -43,10 +44,6 @@ def publish(topic_name, object)
end
end

def self.default_async_strategy
Circuitry.publisher_config.async_strategy
end

protected

def publish_message(topic_name, message)
Expand All @@ -68,13 +65,9 @@ def publish_message(topic_name, message)
end
end

def config
Circuitry.publisher_config
end

private

attr_writer :timeout
attr_writer :config, :timeout

def logger
config.logger
Expand Down
38 changes: 21 additions & 17 deletions lib/circuitry/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Subscriber
include Concerns::Async
include Services::SQS

attr_reader :queue, :timeout, :wait_time, :batch_size, :lock
attr_reader :config, :timeout, :wait_time, :batch_size, :lock

DEFAULT_OPTIONS = {
lock: true,
Expand All @@ -30,7 +30,7 @@ def initialize(options = {})
options = DEFAULT_OPTIONS.merge(options)

self.subscribed = false
self.queue = Queue.find(config.queue_name).url
self.config = options[:config] || Circuitry.subscriber_config

%i[lock async timeout wait_time batch_size].each do |sym|
send(:"#{sym}=", options[sym])
Expand All @@ -41,35 +41,33 @@ def initialize(options = {})

def subscribe(&block)
raise ArgumentError, 'block required' if block.nil?
raise SubscribeError, 'already subscribed' if subscribed?
raise SubscribeError, 'AWS configuration is not set' unless can_subscribe?

logger.info("Subscribing to queue: #{queue}")

self.subscribed = true
subscribed!
poll(&block)
self.subscribed = false

logger.info("Unsubscribed from queue: #{queue}")
rescue *CONNECTION_ERRORS => e
logger.error("Connection error to queue: #{queue}: #{e}")
raise SubscribeError, e.message
ensure
unsubscribed!
end

def subscribed?
subscribed
end

def self.async_strategies
super - [:batch]
def queue
@queue ||= Queue.find(config.queue_name).url
end

def self.default_async_strategy
Circuitry.subscriber_config.async_strategy
def self.async_strategies
super - [:batch]
end

protected

attr_writer :queue, :timeout, :wait_time, :batch_size
attr_writer :config, :timeout, :wait_time, :batch_size
attr_accessor :subscribed

def lock=(value)
Expand All @@ -83,10 +81,6 @@ def lock=(value)
@lock = value
end

def config
Circuitry.subscriber_config
end

private

def lock_value_error(value)
Expand All @@ -103,6 +97,16 @@ def trap_signals
end
end

def subscribed!
logger.info("Subscribing to queue: #{queue}") unless subscribed?
self.subscribed = true
end

def unsubscribed!
logger.info("Unsubscribed from queue: #{queue}") if subscribed?
self.subscribed = false
end

def poll(&block)
poller = Aws::SQS::QueuePoller.new(queue, client: sqs)

Expand Down
28 changes: 2 additions & 26 deletions spec/circuitry/concerns/async_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
async_class = Class.new do
include Circuitry::Concerns::Async

def self.default_async_strategy
:thread
end

def self.async_strategies
[:fork, :thread, :batch]
end
Expand All @@ -16,14 +12,6 @@ def config
end
end

incomplete_async_class = Class.new do
include Circuitry::Concerns::Async

def config
Circuitry.subscriber_config
end
end

RSpec.describe Circuitry::Concerns::Async, type: :model do
subject { async_class.new }

Expand All @@ -48,20 +36,8 @@ def config
end

describe 'with true' do
describe 'when the class has defined a default async strategy' do
it 'sets async to the default value' do
expect(subject.class).to receive(:default_async_strategy).at_least(:once).and_call_original
subject.async = true
expect(subject.async).to eq subject.class.default_async_strategy
end
end

describe 'when the class has not defined a default async strategy' do
subject { incomplete_async_class.new }

it 'raises an error' do
expect { subject.async = true }.to raise_error(NotImplementedError)
end
it 'sets async to the default value' do
expect { subject.async = true }.to change { subject.async }.to(subject.config.async_strategy)
end
end

Expand Down

0 comments on commit f225b41

Please sign in to comment.