This repository has been archived by the owner on Jan 10, 2024. It is now read-only.
forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconnection_spec.rb
116 lines (88 loc) · 3.34 KB
/
connection_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# frozen_string_literal: true
require 'fake_server'
describe Kafka::Connection do
let(:logger) { LOGGER }
let(:host) { "127.0.0.1" }
let(:server) { TCPServer.new(host, 0) }
let(:port) { server.addr[1] }
let(:connection) {
Kafka::Connection.new(
host: host,
port: port,
client_id: "test",
logger: logger,
instrumenter: Kafka::Instrumenter.new(client_id: "test"),
connect_timeout: 0.1,
socket_timeout: 0.1,
)
}
let!(:broker) { FakeServer.start(server) }
describe "#send_request" do
let(:api_key) { 0 }
let(:request) { double(:request, api_key: api_key) }
let(:response_decoder) { double(:response_decoder) }
before do
allow(request).to receive(:encode) {|encoder| encoder.write_string("hello!") }
allow(request).to receive(:response_class) { response_decoder }
allow(response_decoder).to receive(:decode) {|decoder|
decoder.string
}
end
it "sends requests to a broker and reads back the response" do
response = connection.send_request(request)
expect(response).to eq "hello!"
end
it "skips responses to previous requests" do
# By passing nil as the final argument we're telling Connection that we're
# not expecting a response, so it won't read one. However, the fake broker
# *is* writing a response, so we'll get that the next time we read a response,
# causing a mismatch. This simulates the client killing the connection due
# to e.g. a timeout, then resuming with a new request -- the old response
# still sits in the connection waiting to be read.
allow(request).to receive(:response_class) { nil }
connection.send_request(request)
allow(request).to receive(:encode) {|encoder| encoder.write_string("goodbye!") }
allow(request).to receive(:response_class) { response_decoder }
response = connection.send_request(request)
expect(response).to eq "goodbye!"
end
it "disconnects on network errors" do
response = connection.send_request(request)
expect(response).to eq "hello!"
broker.kill
# join to avoid race where send_request fires before broker thread ends
broker.join
expect {
connection.send_request(request)
}.to raise_error(Kafka::ConnectionError)
end
it "re-opens the connection after a network error" do
connection.send_request(request)
broker.kill
# join to avoid race where send_request fires before broker thread ends
broker.join
# Connection is torn down
connection.send_request(request) rescue nil
server.close
server = TCPServer.new(host, port)
broker = FakeServer.start(server)
# Connection is re-established
response = connection.send_request(request)
expect(response).to eq "hello!"
end
it "emits a notification" do
events = []
subscriber = proc {|*args|
events << ActiveSupport::Notifications::Event.new(*args)
}
ActiveSupport::Notifications.subscribed(subscriber, "request.connection.kafka") do
connection.send_request(request)
end
expect(events.count).to eq 1
event = events.first
expect(event.payload[:api]).to eq :produce
expect(event.payload[:request_size]).to eq 22
expect(event.payload[:response_size]).to eq 12
end
end
end