This repository has been archived by the owner on Jan 10, 2024. It is now read-only.
forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcluster_spec.rb
149 lines (128 loc) · 4.2 KB
/
cluster_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# frozen_string_literal: true
# Specs for Kafka::Cluster. The broker pool and the broker connections it
# hands out are RSpec doubles, so no network access takes place.
describe Kafka::Cluster do
  let(:broker) { double(:broker) }
  let(:broker_pool) { double(:broker_pool) }

  # Default cluster under test: a single seed broker backed by the doubled pool.
  let(:cluster) do
    Kafka::Cluster.new(
      seed_brokers: [URI("kafka://test1:9092")],
      broker_pool: broker_pool,
      logger: LOGGER,
    )
  end

  describe "#get_leader" do
    # Builds a metadata response that advertises one broker (node 42, also
    # named as the controller) together with the given topic metadata.
    def metadata_with(topics)
      Kafka::Protocol::MetadataResponse.new(
        brokers: [
          Kafka::BrokerInfo.new(node_id: 42, host: "test1", port: 9092),
        ],
        controller_id: 42,
        topics: topics,
      )
    end

    before do
      allow(broker_pool).to receive(:connect).and_return(broker)
      allow(broker).to receive(:disconnect)
    end

    it "raises LeaderNotAvailable if there's no leader for the partition" do
      leaderless_partition = Kafka::Protocol::MetadataResponse::PartitionMetadata.new(
        partition_id: 42,
        leader: 2,
        # The partition-level error code is what this example hinges on.
        partition_error_code: 5,
      )
      topic = Kafka::Protocol::MetadataResponse::TopicMetadata.new(
        topic_name: "greetings",
        partitions: [leaderless_partition],
      )
      allow(broker).to receive(:fetch_metadata).and_return(metadata_with([topic]))

      expect { cluster.get_leader("greetings", 42) }
        .to raise_error(Kafka::LeaderNotAvailable)
    end

    it "raises InvalidTopic if the topic is invalid" do
      invalid_topic = Kafka::Protocol::MetadataResponse::TopicMetadata.new(
        topic_name: "greetings",
        # The topic-level error code is what this example hinges on.
        topic_error_code: 17,
        partitions: [],
      )
      allow(broker).to receive(:fetch_metadata).and_return(metadata_with([invalid_topic]))

      expect { cluster.get_leader("greetings", 42) }
        .to raise_error(Kafka::InvalidTopic)
    end

    it "raises ConnectionError if unable to connect to any of the seed brokers" do
      # Every connection attempt fails, overriding the stub from the before hook.
      allow(broker_pool).to receive(:connect).and_raise(Kafka::ConnectionError)

      unreachable_seeds = [URI("kafka://not-there:9092"), URI("kafka://not-here:9092")]
      unreachable_cluster = Kafka::Cluster.new(
        seed_brokers: unreachable_seeds,
        broker_pool: broker_pool,
        logger: LOGGER,
      )

      expect { unreachable_cluster.get_leader("greetings", 42) }
        .to raise_error(Kafka::ConnectionError)
    end
  end

  describe "#add_target_topics" do
    it "raises ArgumentError if the topic is nil" do
      expect { cluster.add_target_topics([nil]) }.to raise_error(ArgumentError)
    end

    it "raises ArgumentError if the topic is empty" do
      expect { cluster.add_target_topics([""]) }.to raise_error(ArgumentError)
    end
  end

  describe "#cluster_info" do
    # Same seed broker as the default cluster, but with resolve_seed_brokers
    # supplied by each context below.
    let(:cluster) do
      Kafka::Cluster.new(
        seed_brokers: [URI("kafka://test1:9092")],
        broker_pool: broker_pool,
        logger: LOGGER,
        resolve_seed_brokers: resolve_seed_brokers,
      )
    end

    before do
      # Every metadata fetch fails, so the examples can inspect the error
      # message to see which addresses were attempted.
      allow(broker).to receive(:fetch_metadata)
        .and_raise(Kafka::ConnectionError, "Operation timed out")
      allow(broker).to receive(:disconnect)
    end

    context "when resolve_seed_brokers is false" do
      let(:resolve_seed_brokers) { false }

      it "tries the seed broker hostnames as is" do
        expect(broker_pool).to receive(:connect).with("test1", 9092).and_return(broker)

        expect { cluster.cluster_info }
          .to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092: Operation timed out})
      end
    end

    context "when resolve_seed_brokers is true" do
      let(:resolve_seed_brokers) { true }

      before do
        # Stub DNS resolution; a fresh array is returned on every call.
        allow(Resolv).to receive(:getaddresses) { ["127.0.0.1", "::1"] }
      end

      it "tries all the resolved IP addresses" do
        expect(broker_pool).to receive(:connect).with("127.0.0.1", 9092).and_return(broker)
        expect(broker_pool).to receive(:connect).with("::1", 9092).and_return(broker)

        expect { cluster.cluster_info }
          .to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092 \(127\.0\.0\.1\): Operation timed out})
      end
    end
  end
end