Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for Redis spawning new clients for each topic/queue #21

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ easiest way to contribute.

* [Jeremy Hinegardner](https://github.com/copiousfreetime)
* [Kevin Barnes](https://github.com/vinbarnes)
* [Bruno Antunes](https://github.com/sardaukar)

[GitHub Account]: https://github.com/signup/free "GitHub Signup"
[GitHub Issues]: https://github.com/copiousfreetime/fixme/issues "Fixme Issues"
Expand Down
5 changes: 3 additions & 2 deletions lib/qup/adapter/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Redis < ::Qup::Adapter
def initialize( uri, options = {} )
@uri = uri
@options = options
@client = ::Redis.new host: @uri.host, port: @uri.port
@closed = false
end

Expand All @@ -24,7 +25,7 @@ def initialize( uri, options = {} )
#
# Returns a Qup::Queue
def queue( name )
Qup::Adapter::Redis::Queue.new( @uri, name )
Qup::Adapter::Redis::Queue.new( @client, name )
end

# Internal: Create a new Topic from this Adapter
Expand All @@ -33,7 +34,7 @@ def queue( name )
#
# Returns a Qup::Topic
def topic( name )
Qup::Adapter::Redis::Topic.new( @uri, name )
Qup::Adapter::Redis::Topic.new( @client, name )
end

# Internal: Close the Redis adapter
Expand Down
5 changes: 2 additions & 3 deletions lib/qup/adapter/redis/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ class Connection
# name - the String name of the Connection
#
# Returns a new Connection.
def initialize( uri, name )
@uri = uri
@client = Redis.new :host => @uri.host, :port => @uri.port
def initialize( client, name )
@client = client
@name = name
end

Expand Down
6 changes: 3 additions & 3 deletions lib/qup/adapter/redis/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ class Queue < Connection

# Internal: create a new Queue
#
# uri - the connection uri for the Redis Client
# client - the Redis client
# name - the String name of the Queue
# topic_name - (optional) the String name of a parent topic
#
# Returns a new Queue.
def initialize( uri, name, topic_name = nil )
super uri, name
def initialize( client, name, topic_name = nil )
super( client, name )
@topic_name = topic_name
@open_messages = {}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/qup/adapter/redis/topic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def publisher
def subscriber(name)
subscriber_name = "#{@name}.#{name}"
@client.sadd @name, subscriber_name
queue = ::Qup::Adapter::Redis::Queue.new(@uri, subscriber_name, @name)
queue = ::Qup::Adapter::Redis::Queue.new(@client, subscriber_name, @name)
::Qup::Subscriber.new( self, queue )
end

Expand Down Expand Up @@ -77,7 +77,7 @@ def subscriber_names
def subscribers
subs = {}
subscriber_names.each do |sname|
subs[sname] = ::Qup::Adapter::Redis::Queue.new(@uri, sname, @name)
subs[sname] = ::Qup::Adapter::Redis::Queue.new(@client, sname, @name)
end
return subs
end
Expand Down
4 changes: 2 additions & 2 deletions spec/qup/adapter/redis/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

describe "#destroy" do
it "removes its name from the parent topic's subscriber set" do
redis.smembers("parent").should be == ["test"]
expect( redis.smembers("parent") ).to eq ["test"]
queue.destroy
redis.smembers("parent").should be == []
expect( redis.smembers("parent") ).to eq []
end
end

Expand Down
4 changes: 2 additions & 2 deletions spec/qup/adapter/redis/topic_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
end

it "unregisters itself from the Topic when unsubscribed" do
lambda do
expect {
subscriber.unsubscribe
end.should change(topic, :subscriber_count).by(-1)
}.to change(topic, :subscriber_count).by(-1)
end
end
end
10 changes: 7 additions & 3 deletions spec/qup/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Qup::AdapterTest < Qup::Adapter

describe 'Adapter Registration' do
it 'registers an adapter' do
Qup::Adapters['quptest'].should eq Qup::AdapterTest
expect( Qup::Adapters['quptest'] ).to eq Qup::AdapterTest
end
end

Expand All @@ -15,13 +15,17 @@ class Qup::AdapterTest < Qup::Adapter

%w[ close closed? ].each do |method|
it "##{method} kaboom!" do
lambda { api.send( method ) }.should raise_error( NotImplementedError, "please implement '#{method}'" )
expect {
api.send( method )
}.to raise_error( NotImplementedError, "please implement '#{method}'" )
end
end

%w[ queue topic ].each do |method|
it "##{method} kaboom!" do
lambda { api.send( method, 'foo' ) }.should raise_error( NotImplementedError, "please implement '#{method}'" )
expect {
api.send( method, 'foo' )
}.to raise_error( NotImplementedError, "please implement '#{method}'" )
end
end

Expand Down
28 changes: 14 additions & 14 deletions spec/qup/backoff_sleeper_sleeper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,43 @@

module Qup
describe BackoffSleeper do
before { Kernel.stub(:sleep) }
before { allow(Kernel).to receive_messages(sleep: nil) }

describe "#length" do
it "it returns the multiplier averaged with the multiplier multiplied by rand" do
Kernel.stub(:rand).with().and_return(0.5)
expect(Kernel).to receive(:rand).with(no_args).and_return(0.5)

sleeper = BackoffSleeper.new
sleeper.stub(:multiplier => 1)
allow(sleeper).to receive_messages(multiplier: 1)

sleeper.length.should be == ((1 + (1 * 0.5)) / 2)
expect( sleeper.length ).to eq( (1 + (1 * 0.5) ) / 2)
end
end

describe "#tick" do
it "starts count at 0" do
subject.count.should be == 0
expect( subject.count ).to eq 0
end

it "increments count by 1 everytime it's called" do
subject.tick
subject.count.should be == 1
expect( subject.count ).to eq 1

subject.tick
subject.count.should be == 2
expect( subject.count ).to eq 2
end

it "sleeps for #length if length is > 0" do
Kernel.should_receive(:sleep).with(0.123)
expect( Kernel ).to receive(:sleep).with(0.123)

subject.stub(:length => 0.123)
allow( subject ).to receive_messages(length: 0.123)
subject.tick
end

it "doesn't call sleep if the length is 0" do
Kernel.should_not_receive(:sleep).with(0)
expect( Kernel ).to_not receive(:sleep).with(0)

subject.stub(:length => 0)
allow( subject).to receive_messages(length: 0)
subject.tick
end
end
Expand All @@ -52,7 +52,7 @@ module Qup

describe "#multiplier" do
it "starts at 0" do
subject.multiplier.should be == 0
expect( subject.multiplier ).to eq 0
end

it "backs off exponentially" do
Expand All @@ -61,12 +61,12 @@ module Qup
subject.multiplier
end

multipliers.should be == [0.01, 0.1, 1]
expect( multipliers ).to eq [0.01, 0.1, 1]
end

it "maxes out at the last value of MULTIPLIERS" do
100.times { subject.tick }
subject.multiplier.should == BackoffSleeper::MULTIPLIERS.last
expect( subject.multiplier ).to eq BackoffSleeper::MULTIPLIERS.last
end
end
end
Expand Down
12 changes: 6 additions & 6 deletions spec/qup/batch_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def messages
:session_options => session_options,
:queue_uri => "maildir://#{Dir.mktmpdir}"
})
batch_consumer.session.options.should == session_options
expect( batch_consumer.session.options ).to eq session_options
end
end

Expand Down Expand Up @@ -81,7 +81,7 @@ def process(*)
})

batch_consumer.run
client.messages.should == ["A", "B"]
expect( client.messages ).to eq ["A", "B"]
end

it "returns when max_age is met" do
Expand Down Expand Up @@ -112,7 +112,7 @@ def process(*)
})

batch_consumer.run
client.messages.should == ["A"]
expect( client.messages ).to eq ["A"]

end

Expand All @@ -129,9 +129,9 @@ def process(*)
:queue_name => queue_name
})

client.should_receive(:setup).once.ordered
client.should_receive(:process).once.ordered
client.should_receive(:teardown).once.ordered
expect( client ).to receive(:setup).once.ordered
expect( client ).to receive(:process).once.ordered
expect( client ).to receive(:teardown).once.ordered

batch_consumer.run
end
Expand Down
20 changes: 10 additions & 10 deletions spec/qup/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,31 @@

it "consumes an item from the queue" do
msg = consumer.consume
msg.data.should eq 'consumption'
expect( msg.data ).to eq 'consumption'
queue.acknowledge msg
queue.depth.should eq 0
expect( queue.depth ).to eq 0
end

it "acknowledges messages it has consumed" do
msg = consumer.consume
msg.data.should eq 'consumption'
queue.depth.should eq 1
expect( msg.data ).to eq 'consumption'
expect( queue.depth ).to eq 1
consumer.acknowledge( msg )
queue.depth.should eq 0
expect( queue.depth ).to eq 0
end

it "consumes auto-acknowledges msgs in a block" do
consumer.consume do |msg|
msg.data.should eq 'consumption'
expect( msg.data ).to eq 'consumption'
end
queue.depth.should eq 0
expect( queue.depth ).to eq 0
end

it "knows how deep the consumer's queue is" do
consumer.depth.should eq 1
expect( consumer.depth ).to eq 1
consumer.consume do |msg|
msg.data.should eq 'consumption'
expect( msg.data ).to eq 'consumption'
end
queue.depth.should eq 0
expect( queue.depth ).to eq 0
end
end
4 changes: 2 additions & 2 deletions spec/qup/message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
let( :message ) { Qup::Message.new( "my unique key", "some data" ) }

it "has a key" do
message.key.should == 'my unique key'
expect( message.key ).to eq 'my unique key'
end

it 'has data' do
message.data.should == 'some data'
expect( message.data ).to eq 'some data'
end
end
4 changes: 2 additions & 2 deletions spec/qup/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
end

it "produces items onto the queue" do
queue.depth.should eq 0
expect( queue.depth ).to eq 0
producer.produce( 'production' )
queue.depth.should eq 1
expect( queue.depth ).to eq 1
end
end
8 changes: 6 additions & 2 deletions spec/qup/queue_api_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ class Qup::QueueAPITest

%w[ name depth flush destroy consume ].each do |method|
it "##{method} kaboom!" do
lambda { api.send( method ) }.should raise_error( NotImplementedError, "please implement '#{method}'" )
expect {
api.send( method )
}.to raise_error( NotImplementedError, "please implement '#{method}'" )
end
end

%w[ produce acknowledge ].each do |method|
it "##{method} kaboom!" do
lambda { api.send( method, nil ) }.should raise_error( NotImplementedError, "please implement '#{method}'" )
expect {
api.send( method, nil )
}.to raise_error( NotImplementedError, "please implement '#{method}'" )
end
end
end
Loading