This repository has been archived by the owner on Jan 10, 2024. It is now read-only.
forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 1
/
async_producer_spec.rb
104 lines (80 loc) · 3.15 KB
/
async_producer_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# frozen_string_literal: true
# In-memory stand-in for an instrumenter: each `instrument` call is stored as
# a Metric so that examples can inspect what was reported under a given name.
class FakeInstrumenter
  # One recorded call: the event name and the payload passed along with it.
  Metric = Struct.new(:name, :payload)

  def initialize
    # A name that has not been seen yet lazily gets (and keeps) its own list.
    @metrics = Hash.new { |store, event| store[event] = [] }
  end

  # Returns the list of Metrics recorded under `name`, in call order.
  # The list is empty when nothing has been recorded for that name.
  def metrics_for(name)
    @metrics[name]
  end

  # Runs the optional block, then records the call under `name`.
  #
  # The Metric is appended only after the block has returned, so a block that
  # raises leaves nothing recorded. Returns the list of Metrics for `name`.
  def instrument(name, payload = {})
    yield if block_given?

    recorded = Metric.new(name, payload)
    metrics_for(name).push(recorded)
  end
end
# Specs for Kafka::AsyncProducer, exercised with a test double in place of the
# synchronous producer and a FakeInstrumenter that captures instrumentation.
describe Kafka::AsyncProducer do
  # Stand-in for the synchronous producer: the listed methods return nil
  # unless an example overrides them with `allow`.
  let(:sync_producer) { double(:sync_producer, produce: nil, shutdown: nil, deliver_messages: nil) }

  # Log output is captured in memory so examples can assert on logged messages.
  let(:log) { StringIO.new }
  let(:logger) { Logger.new(log) }
  let(:instrumenter) { FakeInstrumenter.new }

  let(:async_producer) {
    Kafka::AsyncProducer.new(
      sync_producer: sync_producer,
      instrumenter: instrumenter,
      max_retries: 2,
      retry_backoff: 0.2,
      logger: logger,
    )
  }

  describe "#deliver_messages" do
    it "instruments the error after failing to deliver buffered messages" do
      allow(sync_producer).to receive(:buffer_size) { 42 }
      allow(sync_producer).to receive(:deliver_messages) { raise Kafka::DeliveryFailed.new("something happened", []) }

      async_producer.produce("hello", topic: "greetings")
      async_producer.deliver_messages
      sleep 0.2 # wait for worker to call deliver_messages
      async_producer.shutdown

      metric = instrumenter.metrics_for("error.async_producer").first

      # Fail with a clear message instead of a NoMethodError on nil when no
      # error was instrumented at all.
      expect(metric).not_to be_nil
      expect(metric.payload[:error]).to be_a(Kafka::DeliveryFailed)
    end
  end

  describe "#shutdown" do
    it "delivers buffered messages" do
      async_producer.produce("hello", topic: "greetings")
      async_producer.shutdown

      expect(sync_producer).to have_received(:deliver_messages)
    end

    it "instruments a failure to deliver buffered messages" do
      allow(sync_producer).to receive(:buffer_size) { 42 }
      allow(sync_producer).to receive(:deliver_messages) { raise Kafka::Error, "uh-oh!" }

      async_producer.produce("hello", topic: "greetings")
      async_producer.shutdown

      expect(log.string).to include "Failed to deliver messages during shutdown: uh-oh!"

      metric = instrumenter.metrics_for("drop_messages.async_producer").first

      # The expected count is the stubbed buffer_size from above.
      expect(metric).not_to be_nil
      expect(metric.payload[:message_count]).to eq 42
    end
  end

  describe "#produce" do
    it "delivers buffered messages" do
      begin
        async_producer.produce("hello", topic: "greetings")
        sleep 0.2 # wait for worker to call produce

        expect(sync_producer).to have_received(:produce)
      ensure
        # Shut down even when the expectation fails, so the producer is not
        # left running while later examples execute.
        async_producer.shutdown
      end
    end

    it "retries until configured max_retries" do
      begin
        allow(sync_producer).to receive(:produce) { raise Kafka::BufferOverflow }

        async_producer.produce("hello", topic: "greetings")
        sleep 0.3 # wait for all retries to be done

        expect(log.string).to include "Failed to asynchronously produce messages due to BufferOverflow"

        metric = instrumenter.metrics_for("error.async_producer").first
        expect(metric).not_to be_nil
        expect(metric.payload[:error]).to be_a(Kafka::BufferOverflow)

        # With max_retries: 2 the example expects three calls in total
        # (presumably the initial attempt plus two retries).
        expect(sync_producer).to have_received(:produce).exactly(3).times
      ensure
        # Shut down even when an expectation fails, so the producer is not
        # left running while later examples execute.
        async_producer.shutdown
      end
    end

    it "requires `topic` to be a String" do
      expect {
        async_producer.produce("hello", topic: :topic)
      }.to raise_exception(NoMethodError, /to_str/)
    end
  end
end