Skip to content

Commit

Permalink
add idempotent? flag (#492)
Browse files Browse the repository at this point in the history
* add idempotent? flag

* fix typo
  • Loading branch information
mensfeld authored May 29, 2024
1 parent 5a38054 commit 6a2dcb5
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# WaterDrop changelog

## 2.7.3 (Unreleased)
- [Enhancement] Provide `#idempotent?` similar to `#transactional?`.
- [Enhancement] Provide alias to `#with` named `#variant`.

## 2.7.2 (2024-05-09)
Expand Down
33 changes: 21 additions & 12 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,27 @@ def purge
end
end

# Builds the variant alteration and returns it.
#
# @param args [Object] anything `Producer::Variant` initializer accepts
# @return [WaterDrop::Producer::Variant] variant proxy to use with alterations
def with(**args)
ensure_active!

Variant.new(self, **args)
end

alias variant with

# @return [Boolean] true if current producer is idempotent
def idempotent?
# Every transactional producer is idempotent by default always
return true if transactional?
return @idempotent if instance_variable_defined?(:'@idempotent')

@idempotent = config.kafka.to_h.key?(:'enable.idempotence')
end

# Flushes the buffers in a sync way and closes the producer
# @param force [Boolean] should we force closing even with outstanding messages after the
# max wait timeout
Expand Down Expand Up @@ -210,18 +231,6 @@ def close(force: false)
end
end

# Builds the variant alteration and returns it.
#
# @param args [Object] anything `Producer::Variant` initializer accepts
# @return [WaterDrop::Producer::Variant] variant proxy to use with alterations
def with(**args)
ensure_active!

Variant.new(self, **args)
end

alias variant with

# Closes the producer with forced close after timeout, purging any outgoing data
def close!
close(force: true)
Expand Down
10 changes: 10 additions & 0 deletions spec/lib/waterdrop/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,15 @@
.not_to raise_error
end
end

context 'when we try to create and use transactional producer without idempotence' do
subject(:producer) { build(:transactional_producer, idempotent: false) }

it 'expect not to allow it' do
expect do
producer.produce_sync(topic: 'test', payload: 'test')
end.to raise_error(Rdkafka::Config::ClientCreationError)
end
end
end
end
20 changes: 20 additions & 0 deletions spec/lib/waterdrop/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,26 @@ def on_oauthbearer_token_refresh(_); end
end
end

describe '#idempotent?' do
context 'when producer is transactional' do
subject(:producer) { create(:transactional_producer) }

it { expect(producer.idempotent?).to eq(true) }
end

context 'when it is a regular producer' do
subject(:producer) { create(:producer) }

it { expect(producer.idempotent?).to eq(false) }
end

context 'when it is an idempotent producer' do
subject(:producer) { create(:idempotent_producer) }

it { expect(producer.idempotent?).to eq(true) }
end
end

describe '#close' do
before { allow(producer).to receive(:client).and_call_original }

Expand Down
4 changes: 3 additions & 1 deletion spec/support/factories/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
transactional_id { SecureRandom.uuid }
transaction_timeout_ms { 30_000 }
request_required_acks { 'all' }
idempotent { true }
end

kafka do
Expand All @@ -48,7 +49,8 @@
'request.required.acks': request_required_acks,
'transactional.id': transactional_id,
'transaction.timeout.ms': transaction_timeout_ms,
'message.timeout.ms': transaction_timeout_ms
'message.timeout.ms': transaction_timeout_ms,
'enable.idempotence': idempotent
}
end
end
Expand Down

0 comments on commit 6a2dcb5

Please sign in to comment.