Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed processors to use appropriate config #55

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Circuitry 3.x.x (TBD)

* Fixed async publisher to use correct configuration options. *Matt Huggins*
* Fixed issue with `circuitry help` missing dependency. *Matt Huggins*
* Fixed issue with `circuitry:setup` rake task when no topics are defined. *Matt Huggins*
* Fixed issues with `circuitry:setup` rake task in vanilla Ruby projects. *Matt Huggins*
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ The `publish` method also accepts options that impact instantiation of the
will be attempted before giving up. If the timeout is exceeded, an exception
will raised to be handled by your application or `error_handler`. *(default:
15)*
* `:config` - A custom configuration object. Generally this option can be ignored.
*(default: `Circuitry.publisher_config`)*

```ruby
obj = { foo: 'foo', bar: 'bar' }
Expand Down Expand Up @@ -258,6 +260,8 @@ The `subscribe` method also accepts options that impact instantiation of the
short-polling. *(default: 10)*
* `:batch_size` - The number of messages to retrieve in a single SQS request.
*(default: 10)*
* `:config` - A custom configuration object. Generally this option can be ignored.
*(default: `Circuitry.subscriber_config`)*

```ruby
options = {
Expand Down
5 changes: 2 additions & 3 deletions lib/circuitry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'circuitry/locks/noop'
require 'circuitry/locks/redis'
require 'circuitry/middleware/chain'
require 'circuitry/pool'
require 'circuitry/processor'
require 'circuitry/processors/batcher'
require 'circuitry/processors/forker'
Expand Down Expand Up @@ -46,9 +47,7 @@ def subscribe(options = {}, &block)
end

def flush
Processors.constants.each do |const|
Processors.const_get(const).flush
end
Pool.flush
end
end
end
16 changes: 7 additions & 9 deletions lib/circuitry/concerns/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ module ClassMethods
def async_strategies
[:fork, :thread, :batch]
end

def default_async_strategy
raise NotImplementedError, "#{name} must implement class method `default_async_strategy`"
end
end

def process_asynchronously(&block)
send(:"process_via_#{async}", &block)
processor = send(:"process_via_#{async}", &block)
processor.process
Pool << processor
end

def async=(value)
value = case value
when false, nil then false
when true then self.class.default_async_strategy
when true then config.async_strategy
when *self.class.async_strategies then value
else raise ArgumentError, async_value_error(value)
end
Expand Down Expand Up @@ -58,15 +56,15 @@ def platform_supports_forking?
end

def process_via_fork(&block)
Processors::Forker.process(&block)
Processors::Forker.new(config, &block)
end

def process_via_thread(&block)
Processors::Threader.process(&block)
Processors::Threader.new(config, &block)
end

def process_via_batch(&block)
Processors::Batcher.process(&block)
Processors::Batcher.new(config, &block)
end
end
end
Expand Down
34 changes: 34 additions & 0 deletions lib/circuitry/pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module Circuitry
module Pool
class << self
def <<(processor)
raise ArgumentError, 'processor must be a Circuitry::Processor' unless processor.is_a?(Circuitry::Processor)
pool << processor
end

def flush
while (processor = pool.shift)
processor.wait
end
end

def size
pool.size
end

def empty?
pool.empty?
end

def any?
pool.any?
end

private

def pool
@pool ||= []
end
end
end
end
39 changes: 24 additions & 15 deletions lib/circuitry/processor.rb
Original file line number Diff line number Diff line change
@@ -1,38 +1,47 @@
module Circuitry
module Processor
def process(&_block)
raise NotImplementedError, "#{self} must implement class method `process`"
class Processor
attr_reader :config, :block

def initialize(config, &block)
raise ArgumentError, 'no block given' unless block_given?

self.config = config
self.block = block
end

def flush
raise NotImplementedError, "#{self} must implement class method `flush`"
def process
raise NotImplementedError, "#{self} must implement instance method `process`"
end

def on_exit
Circuitry.subscriber_config.on_async_exit
def wait
raise NotImplementedError, "#{self} must implement instance method `wait`"
end

protected

def safely_process
yield
def safely_process(&block)
block.call
rescue => e
logger.error("Error handling message: #{e}")
error_handler.call(e) if error_handler
end

def pool
@pool ||= []
ensure
on_exit.call if on_exit
end

private

attr_writer :config, :block

def logger
Circuitry.subscriber_config.logger
config.logger
end

def error_handler
Circuitry.subscriber_config.error_handler
config.error_handler
end

def on_exit
config.on_async_exit
end
end
end
19 changes: 6 additions & 13 deletions lib/circuitry/processors/batcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,13 @@

module Circuitry
module Processors
module Batcher
class << self
include Processor

def process(&block)
raise ArgumentError, 'no block given' unless block_given?
pool << block
end
class Batcher < Processor
def process
# noop
end

def flush
while (block = pool.shift)
safely_process(&block)
end
end
def wait
safely_process(&block)
end
end
end
Expand Down
22 changes: 11 additions & 11 deletions lib/circuitry/processors/forker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

module Circuitry
module Processors
module Forker
class << self
include Processor
class Forker < Processor
def process
Process.detach(pid)
end

def process(&block)
pid = fork do
safely_process(&block)
on_exit.call if on_exit
end
def wait
# noop
end

Process.detach(pid)
end
private

def flush
def pid
@pid ||= fork do
safely_process(&block)
end
end
end
Expand Down
25 changes: 11 additions & 14 deletions lib/circuitry/processors/threader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,20 @@

module Circuitry
module Processors
module Threader
class << self
include Processor
class Threader < Processor
def process
thread
end

def process(&block)
raise ArgumentError, 'no block given' unless block_given?
def wait
thread.join
end

pool << Thread.new do
safely_process(&block)
on_exit.call if on_exit
end
end
private

def flush
pool.each(&:join)
ensure
pool.clear
def thread
@thread ||= Thread.new do
safely_process(&block)
end
end
end
Expand Down
17 changes: 7 additions & 10 deletions lib/circuitry/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ class Publisher
::Aws::SNS::Errors::InternalFailure
].freeze

attr_reader :timeout
attr_reader :config, :timeout

def initialize(options = {})
options = DEFAULT_OPTIONS.merge(options)

self.config = options[:config] || Circuitry.publisher_config
self.async = options[:async]
self.timeout = options[:timeout]
end
Expand All @@ -43,10 +44,6 @@ def publish(topic_name, object)
end
end

def self.default_async_strategy
Circuitry.publisher_config.async_strategy
end

protected

def publish_message(topic_name, message)
Expand All @@ -68,22 +65,22 @@ def publish_message(topic_name, message)
end
end

attr_writer :timeout

private

attr_writer :config, :timeout

def logger
Circuitry.publisher_config.logger
config.logger
end

def can_publish?
Circuitry.publisher_config.aws_options.values.all? do |value|
config.aws_options.values.all? do |value|
!value.nil? && !value.empty?
end
end

def middleware
Circuitry.publisher_config.middleware
config.middleware
end
end
end
Loading