This repository has been archived by the owner on Jan 10, 2024. It is now read-only.
forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 1
/
message_buffer_spec.rb
109 lines (80 loc) · 3.08 KB
/
message_buffer_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# frozen_string_literal: true
describe Kafka::MessageBuffer do
let(:buffer) { Kafka::MessageBuffer.new }
describe "#concat" do
it "adds the messages to the buffer" do
buffer.concat(["foo"], topic: "foo", partition: 0)
expect(buffer.size).to eq 1
expect(buffer.bytesize).to eq 3
end
it "handles empty arrays" do
buffer.concat([], topic: "foo", partition: 0)
expect(buffer.size).to eq 0
expect(buffer.bytesize).to eq 0
end
end
describe "#clear_messages" do
it "clears messages for the given topic and partition" do
buffer.concat(["yolo"], topic: "x", partition: 0)
buffer.clear_messages(topic: "x", partition: 0)
expect(buffer.size).to eq 0
end
it "handles clearing a topic that's not in the buffer" do
buffer.clear_messages(topic: "x", partition: 0)
expect(buffer.size).to eq 0
end
it "handles clearing a partition that's not in the buffer" do
buffer.concat(["yolo"], topic: "x", partition: 0)
buffer.clear_messages(topic: "x", partition: 1)
expect(buffer.size).to eq 1
end
end
describe "#size" do
it "returns the number of messages in the buffer" do
buffer.concat(["a", "b", "c"], topic: "bar", partition: 3)
buffer.concat(["a", "b", "c"], topic: "bar", partition: 1)
expect(buffer.size).to eq 6
end
it "keeps track of how many messages have been cleared" do
buffer.concat(["a", "b", "c"], topic: "bar", partition: 3)
buffer.concat(["a", "b", "c"], topic: "bar", partition: 1)
buffer.clear_messages(topic: "bar", partition: 3)
expect(buffer.size).to eq 3
end
it "buffers messages quickly", performance: true do
num_topics = 20
num_partitions = 20
num_messages = 10_000
(1...num_messages).each do |i|
topic = num_topics % i
partition = num_partitions % i
buffer.write("hello", topic: topic, partition: partition)
end
expect { buffer.size }.to perform_at_least(10000).ips
end
end
describe "#bytesize" do
it "returns the bytesize of the messages in the buffer" do
buffer.write(value: "foo", key: "bar", topic: "yolos", partition: 1)
buffer.write(value: "baz", key: "bim", topic: "yolos", partition: 1)
expect(buffer.bytesize).to eq 12
end
it "keeps track of concatenations" do
message = Kafka::Protocol::Message.new(value: "baz", key: "bim")
buffer.write(value: "foo", key: "bar", topic: "yolos", partition: 1)
buffer.concat([message], topic: "yolos", partition: 1)
expect(buffer.bytesize).to eq 12
end
it "keeps track of when messages are cleared" do
buffer.write(value: "foo", key: "bar", topic: "yolos", partition: 1)
buffer.write(value: "baz", key: "bim", topic: "yolos", partition: 2)
buffer.clear_messages(topic: "yolos", partition: 1)
expect(buffer.bytesize).to eq 6
end
it "is reset when #clear is called" do
buffer.write(value: "baz", key: "bim", topic: "yolos", partition: 2)
buffer.clear
expect(buffer.bytesize).to eq 0
end
end
end