From a3abb5a128f75e3c75e4a38026bdf81ada31b42f Mon Sep 17 00:00:00 2001 From: marlena-b Date: Fri, 27 Sep 2024 12:11:08 +0200 Subject: [PATCH] Limit automatic retries to one --- ecommerce/pricing/lib/pricing/services.rb | 18 +++++----- .../processes/order_item_invoicing_process.rb | 22 ++++++------ .../lib/processes/release_payment_process.rb | 22 ++++++------ .../lib/processes/reservation_process.rb | 13 ++++--- .../lib/processes/shipment_process.rb | 22 ++++++------ .../lib/processes/three_plus_one_free.rb | 19 +++++----- infra/lib/infra.rb | 1 + infra/lib/infra/retry.rb | 9 +++++ infra/test/retry_test.rb | 36 +++++++++++++++++++ .../read_models/single_table_read_model.rb | 26 +++++++------- 10 files changed, 124 insertions(+), 64 deletions(-) create mode 100644 infra/lib/infra/retry.rb create mode 100644 infra/test/retry_test.rb diff --git a/ecommerce/pricing/lib/pricing/services.rb b/ecommerce/pricing/lib/pricing/services.rb index 7c1128e12..bfd29ebbc 100644 --- a/ecommerce/pricing/lib/pricing/services.rb +++ b/ecommerce/pricing/lib/pricing/services.rb @@ -114,27 +114,29 @@ def call(command) end class OnCalculateTotalValue + include Infra::Retry + def initialize(event_store) @repository = Infra::AggregateRootRepository.new(event_store) @event_store = event_store end def call(command) - @repository.with_aggregate(Offer, command.aggregate_id) do |order| - order.calculate_total_value(PricingCatalog.new(@event_store), time_promotions_discount) + with_retry do + @repository.with_aggregate(Offer, command.aggregate_id) do |order| + order.calculate_total_value(PricingCatalog.new(@event_store), time_promotions_discount) + end end - rescue RubyEventStore::WrongExpectedEventVersion - retry end def calculate_sub_amounts(command) - @repository.with_aggregate(Offer, command.aggregate_id) do |order| - order.calculate_sub_amounts(PricingCatalog.new(@event_store), time_promotions_discount) + with_retry do + @repository.with_aggregate(Offer, command.aggregate_id) do |order| + order.calculate_sub_amounts(PricingCatalog.new(@event_store), time_promotions_discount) + end end - rescue RubyEventStore::WrongExpectedEventVersion - retry end private diff --git a/ecommerce/processes/lib/processes/order_item_invoicing_process.rb b/ecommerce/processes/lib/processes/order_item_invoicing_process.rb index d03b1a0a1..b4454ca00 100644 --- a/ecommerce/processes/lib/processes/order_item_invoicing_process.rb +++ b/ecommerce/processes/lib/processes/order_item_invoicing_process.rb @@ -1,5 +1,7 @@ module Processes class OrderItemInvoicingProcess + include Infra::Retry + def initialize(event_store, command_bus) @event_store = event_store @command_bus = command_bus @@ -26,16 +28,16 @@ def call(event) attr_reader :event_store, :command_bus def build_state(event) - stream_name = "OrderInvoicingProcess$#{event.data.fetch(:order_id)}$#{event.data.fetch(:product_id)}" - past = event_store.read.stream(stream_name).to_a - last_stored = past.size - 1 - event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) - ProcessState.new.tap do |state| - past.each { |ev| state.call(ev) } - state.call(event) + with_retry do + stream_name = "OrderInvoicingProcess$#{event.data.fetch(:order_id)}$#{event.data.fetch(:product_id)}" + past = event_store.read.stream(stream_name).to_a + last_stored = past.size - 1 + event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) + ProcessState.new.tap do |state| + past.each { |ev| state.call(ev) } + state.call(event) + end end - rescue RubyEventStore::WrongExpectedEventVersion - retry end class ProcessState @@ -84,4 +86,4 @@ def call distributed_amounts end end -end \ No newline at end of file +end diff --git a/ecommerce/processes/lib/processes/release_payment_process.rb b/ecommerce/processes/lib/processes/release_payment_process.rb index e85a338b5..fb13a72db 100644 --- a/ecommerce/processes/lib/processes/release_payment_process.rb +++ b/ecommerce/processes/lib/processes/release_payment_process.rb @@ -1,5 +1,7 @@ module Processes class ReleasePaymentProcess + include Infra::Retry + def initialize(event_store, command_bus) @event_store = event_store @command_bus = command_bus @@ -19,16 +21,16 @@ def release_payment(state) attr_reader :command_bus, :event_store def build_state(event) - stream_name = "PaymentProcess$#{event.data.fetch(:order_id)}" - past_events = event_store.read.stream(stream_name).to_a - last_stored = past_events.size - 1 - event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) - ProcessState.new.tap do |state| - past_events.each { |ev| state.call(ev) } - state.call(event) + with_retry do + stream_name = "PaymentProcess$#{event.data.fetch(:order_id)}" + past_events = event_store.read.stream(stream_name).to_a + last_stored = past_events.size - 1 + event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) + ProcessState.new.tap do |state| + past_events.each { |ev| state.call(ev) } + state.call(event) + end end - rescue RubyEventStore::WrongExpectedEventVersion - retry end class ProcessState @@ -60,4 +62,4 @@ def release? end end end -end \ No newline at end of file +end diff --git a/ecommerce/processes/lib/processes/reservation_process.rb b/ecommerce/processes/lib/processes/reservation_process.rb index c3377cba6..946ed9678 100644 --- a/ecommerce/processes/lib/processes/reservation_process.rb +++ b/ecommerce/processes/lib/processes/reservation_process.rb @@ -1,5 +1,7 @@ module Processes class ReservationProcess + include Infra::Retry + def initialize @event_store = Configuration.event_store @command_bus = Configuration.command_bus @@ -72,12 +74,13 @@ def stream_name(order_id) def build_state(event) stream_name = stream_name(event.data.fetch(:order_id)) + past_events = nil begin - past_events = event_store.read.stream(stream_name).to_a - last_stored = past_events.size - 1 - event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) - rescue RubyEventStore::WrongExpectedEventVersion - retry + with_retry do + past_events = event_store.read.stream(stream_name).to_a + last_stored = past_events.size - 1 + event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) + end rescue RubyEventStore::EventDuplicatedInStream return end diff --git a/ecommerce/processes/lib/processes/shipment_process.rb b/ecommerce/processes/lib/processes/shipment_process.rb index 0a04dae74..b6d5bbc96 100644 --- a/ecommerce/processes/lib/processes/shipment_process.rb +++ b/ecommerce/processes/lib/processes/shipment_process.rb @@ -1,5 +1,7 @@ module Processes class ShipmentProcess + include Infra::Retry + def initialize(event_store, command_bus) @event_store = event_store @command_bus = command_bus @@ -33,16 +35,16 @@ def authorize_shipment(state) attr_reader :command_bus, :event_store def build_state(event) - stream_name = "ShipmentProcess$#{event.data.fetch(:order_id)}" - past_events = event_store.read.stream(stream_name).to_a - last_stored = past_events.size - 1 - event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) - ProcessState.new.tap do |state| - past_events.each { |ev| state.call(ev) } - state.call(event) + with_retry do + stream_name = "ShipmentProcess$#{event.data.fetch(:order_id)}" + past_events = event_store.read.stream(stream_name).to_a + last_stored = past_events.size - 1 + event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) + ProcessState.new.tap do |state| + past_events.each { |ev| state.call(ev) } + state.call(event) + end end - rescue RubyEventStore::WrongExpectedEventVersion - retry end class ProcessState @@ -78,4 +80,4 @@ def authorize? end end end -end \ No newline at end of file +end diff --git a/ecommerce/processes/lib/processes/three_plus_one_free.rb b/ecommerce/processes/lib/processes/three_plus_one_free.rb index 5e85a0027..df6d7b7ca 100644 --- a/ecommerce/processes/lib/processes/three_plus_one_free.rb +++ b/ecommerce/processes/lib/processes/three_plus_one_free.rb @@ -1,5 +1,6 @@ module Processes class ThreePlusOneFree + include Infra::Retry def initialize(event_store, command_bus) @event_store = event_store @@ -25,16 +26,16 @@ def call(event) private def build_state(event) - stream_name = "ThreePlusOneFreeProcess$#{event.data.fetch(:order_id)}" - past_events = @event_store.read.stream(stream_name).to_a - last_stored = past_events.size - 1 - @event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) - ProcessState.new(event.data.fetch(:order_id)).tap do |state| - past_events.each { |ev| state.call(ev) } - state.call(event) + with_retry do + stream_name = "ThreePlusOneFreeProcess$#{event.data.fetch(:order_id)}" + past_events = @event_store.read.stream(stream_name).to_a + last_stored = past_events.size - 1 + @event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) + ProcessState.new(event.data.fetch(:order_id)).tap do |state| + past_events.each { |ev| state.call(ev) } + state.call(event) + end end - rescue RubyEventStore::WrongExpectedEventVersion - retry end def make_or_remove_free_product(state) diff --git a/infra/lib/infra.rb b/infra/lib/infra.rb index 8e9d950dc..e93a67e2f 100644 --- a/infra/lib/infra.rb +++ b/infra/lib/infra.rb @@ -14,5 +14,6 @@ require_relative "infra/event" require_relative "infra/event_store" require_relative "infra/process" +require_relative "infra/retry" require_relative "infra/types" require_relative "infra/testing" diff --git a/infra/lib/infra/retry.rb b/infra/lib/infra/retry.rb new file mode 100644 index 000000000..c1247ac5e --- /dev/null +++ b/infra/lib/infra/retry.rb @@ -0,0 +1,9 @@ +module Infra + module Retry + def with_retry + yield + rescue RubyEventStore::WrongExpectedEventVersion + yield + end + end +end diff --git a/infra/test/retry_test.rb b/infra/test/retry_test.rb new file mode 100644 index 000000000..7c17b343a --- /dev/null +++ b/infra/test/retry_test.rb @@ -0,0 +1,36 @@ +require_relative "test_helper" + +module Infra + class RetryTest < Minitest::Test + cover "Infra::Retry" + + include Infra::Retry + + def test_no_error + result = with_retry { true } + assert result + end + + def test_retries_once + attempts = 0 + with_retry do + attempts += 1 + raise RubyEventStore::WrongExpectedEventVersion if attempts == 1 + end + + assert_equal 2, attempts + end + + def test_fails_after_two_attempts + attempts = 0 + assert_raises RubyEventStore::WrongExpectedEventVersion do + with_retry do + attempts += 1 + raise RubyEventStore::WrongExpectedEventVersion + end + end + + assert_equal 2, attempts + end + end +end diff --git a/rails_application/app/read_models/single_table_read_model.rb b/rails_application/app/read_models/single_table_read_model.rb index dafe81310..d1efffc61 100644 --- a/rails_application/app/read_models/single_table_read_model.rb +++ b/rails_application/app/read_models/single_table_read_model.rb @@ -47,6 +47,8 @@ def copy_handler(event, sequence_of_keys, column) end class ReadModelHandler + include Infra::Retry + def initialize(*args) if args.present? @event_store = args[0] @@ -64,18 +66,18 @@ def concurrent_safely(event) stream_name = "#{active_record_name}$#{record_id(event)}$#{event.event_type}" read_scope = event_store.read.as_at.stream(stream_name) begin - last_event = read_scope.last - return if last_event && last_event.timestamp > event.timestamp - ApplicationRecord.with_advisory_lock(active_record_name, record_id(event)) do - yield - event_store.link( - event.event_id, - stream_name: stream_name, - expected_version: last_event ? read_scope.to(last_event.event_id).count : -1 - ) + with_retry do + last_event = read_scope.last + return if last_event && last_event.timestamp > event.timestamp + ApplicationRecord.with_advisory_lock(active_record_name, record_id(event)) do + yield + event_store.link( + event.event_id, + stream_name: stream_name, + expected_version: last_event ? read_scope.to(last_event.event_id).count : -1 + ) + end end - rescue RubyEventStore::WrongExpectedEventVersion - retry rescue RubyEventStore::EventDuplicatedInStream end end @@ -114,4 +116,4 @@ def call(event) private attr_reader :sequence_of_keys, :column -end \ No newline at end of file +end