This repository has been archived by the owner on Jan 10, 2024. It is now read-only.
forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 1
/
broker_pool_spec.rb
98 lines (77 loc) · 3.6 KB
/
broker_pool_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# frozen_string_literal: true
# Specs for Kafka::BrokerPool#connect and #close.
#
# Most #connect examples stub Kafka::Broker.new so that the double returned by
# the pool shows whether it created, reused, or replaced a broker.
describe Kafka::BrokerPool do
  let(:connection_builder) { double('connection_builder') }
  let(:connection) { double('connection') }
  let(:logger) { LOGGER }
  let(:broker_pool) do
    described_class.new(
      connection_builder: connection_builder,
      logger: logger,
    )
  end
  let(:broker) { double('broker') }
  let(:host) { "localhost" }
  let(:port) { 9092 }
  let(:node_id) { 0 }

  before do
    # Default: Broker.new calls through to the real class. Examples that need
    # to control the returned broker re-stub :new themselves.
    allow(Kafka::Broker).to receive(:new).and_call_original
    allow(connection_builder).to receive(:build_connection).with(host, port) { connection }
  end

  describe "#connect" do
    it "creates a new broker everytime it is called with node_id nil" do
      # Call without node_id the first time.
      allow(Kafka::Broker).to receive(:new).once { broker }
      expect(broker_pool.connect(host, port)).to eq(broker)

      # Call without node_id the second time returns new broker.
      second_broker = double('second_broker')
      allow(Kafka::Broker).to receive(:new).once { second_broker }
      expect(broker_pool.connect(host, port)).to eq(second_broker)
    end

    it "creates a new broker the first time it is called with a particular node_id" do
      allow(Kafka::Broker).to receive(:new).once { broker }

      expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)
    end

    it "does not create a new broker if address & node_id match" do
      allow(broker).to receive(:address_match?).with(host, port) { true }
      allow(Kafka::Broker).to receive(:new).once { broker }

      # Both calls use the same address and node_id and must yield the same broker.
      expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)
      expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)
    end

    it "disconnects existing broker if new broker address does not match pre-existing broker address for a given node_id" do
      new_host = "#{host}1"
      new_port = "#{port}1"
      allow(broker).to receive(:address_match?).with(new_host, new_port) { false }
      allow(broker).to receive(:disconnect).once
      allow(Kafka::Broker).to receive(:new).once { broker }

      # First connect establishes the pre-existing broker for node_id.
      expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)

      # Second connect targets the same node_id at a different address.
      allow(connection_builder).to receive(:build_connection).with(new_host, new_port) { connection }
      allow(Kafka::Broker).to receive(:new).once
      broker_pool.connect(new_host, new_port, node_id: node_id)

      expect(broker).to have_received(:disconnect)
    end

    it "creates a new broker if address does not match pre-existing broker address for a given node_id" do
      new_host = "#{host}1"
      new_port = "#{port}1"
      allow(broker).to receive(:address_match?).with(new_host, new_port) { false }
      allow(broker).to receive(:disconnect).once
      allow(Kafka::Broker).to receive(:new).once { broker }

      # Establish the pre-existing broker for node_id first (same sequence as
      # the disconnect example above). Without this call the example would
      # only exercise first-time creation, not the address-mismatch path.
      expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)

      allow(connection_builder).to receive(:build_connection).with(new_host, new_port) { connection }
      second_broker = double('second_broker')
      allow(Kafka::Broker).to receive(:new).once { second_broker }

      actual_broker = broker_pool.connect(new_host, new_port, node_id: node_id)

      expect(actual_broker).to eq(second_broker)
    end
  end

  describe "#close" do
    it "disconnects all broker connections" do
      # Broker.new calls through here (see the before hook), so these are the
      # objects the pool actually returned; :disconnect is stubbed on each so
      # the calls can be verified after the fact.
      broker1 = broker_pool.connect(host, port, node_id: node_id)
      allow(connection_builder).to receive(:build_connection).with("new.host", port) { connection }
      broker2 = broker_pool.connect("new.host", port, node_id: 1)
      allow(broker1).to receive(:disconnect)
      allow(broker2).to receive(:disconnect)

      broker_pool.close

      expect(broker1).to have_received(:disconnect)
      expect(broker2).to have_received(:disconnect)
    end
  end
end