From 10c7cd1390753805668dcfbfe56816e5d2e44b8e Mon Sep 17 00:00:00 2001 From: Chris Richardson Date: Thu, 11 Aug 2022 13:21:39 +0900 Subject: [PATCH] #76 Send one-way commands/notifications to saga participants - reactive saga --- .../tram/sagas/simpledsl/LocalSagaData.java | 9 - ...eOrderSagaTest.java => LocalSagaTest.java} | 20 +- .../CommandWithDestinationAndType.java | 4 + .../tram/sagas/orchestration/SagaActions.java | 17 +- .../sagas/orchestration/SagaInstance.java | 15 + .../sagas/orchestration/SagaManagerImpl.java | 6 +- .../InvokeReactiveParticipantStepBuilder.java | 13 + .../simpledsl/ReactiveStepBuilder.java | 4 + .../AbstractReactiveLocalSagaTest.java | 51 ++++ .../reactive/simpledsl/LocalSagaData.java | 20 ++ .../reactive/simpledsl/LocalSagaSteps.java | 11 + .../reactive/simpledsl/NotifyCommand.java | 6 + .../reactive/simpledsl/ReactiveLocalSaga.java | 28 ++ .../simpledsl/ReactiveLocalSagaTest.java | 59 ++++ .../ReactiveLocalSagaWithNotification.java | 30 ++ ...ReactiveLocalSagaWithNotificationTest.java | 60 ++++ .../simpledsl/ReleaseCreditCommand.java | 6 + .../simpledsl/ReserveCreditCommand.java | 6 + .../framework/MessageWithDestination.java | 27 ++ .../framework/MultipleCommandsExpected.java | 44 +++ .../ReactiveSagaUnitTestSupport.java | 276 ++++++++++++++++++ .../ReactiveSagaCommandProducer.java | 3 +- .../ReactiveSagaManagerImpl.java | 153 +++++----- 23 files changed, 758 insertions(+), 110 deletions(-) rename eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/{NotificationBasedCreateOrderSagaTest.java => LocalSagaTest.java} (79%) create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/AbstractReactiveLocalSagaTest.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/LocalSagaData.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/LocalSagaSteps.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/NotifyCommand.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSaga.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaTest.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaWithNotification.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaWithNotificationTest.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReleaseCreditCommand.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReserveCreditCommand.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/MessageWithDestination.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/MultipleCommandsExpected.java create mode 100644 eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/ReactiveSagaUnitTestSupport.java diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaData.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaData.java index 64e1201..913df7c 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaData.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaData.java @@ -12,13 +12,4 @@ public CommandWithDestination undo2() { return new CommandWithDestination("participant2", null, new ReleaseCreditCommand()); } - @Override - public int hashCode() { - return 1; - } - - @Override - public boolean equals(Object obj) { - return true; - } } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/NotificationBasedCreateOrderSagaTest.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaTest.java similarity index 79% rename from eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/NotificationBasedCreateOrderSagaTest.java rename to eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaTest.java index 007d782..28db0a8 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/NotificationBasedCreateOrderSagaTest.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaTest.java @@ -4,22 +4,23 @@ import org.junit.Test; import static io.eventuate.tram.sagas.testing.SagaUnitTestSupport.given; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -public class NotificationBasedCreateOrderSagaTest { +public class LocalSagaTest { private LocalSagaSteps steps; @Before - public void setUp() throws Exception { + public void setUp() { steps = mock(LocalSagaSteps.class); } @Test public void shouldExecuteAllStepsSuccessfully() { given(). - saga(new LocalSaga(steps), new LocalSagaData()). + saga(makeSaga(), new LocalSagaData()). expect(). command(new ReserveCreditCommand()). to("participant2"). @@ -32,7 +33,7 @@ public void shouldExecuteAllStepsSuccessfully() { @Test public void shouldRollbackFromStep2() { given(). - saga(new LocalSaga(steps), new LocalSagaData()). + saga(makeSaga(), new LocalSagaData()). expect(). command(new ReserveCreditCommand()). to("participant2"). @@ -49,16 +50,15 @@ public void shouldHandleFailureOfFirstLocalStep() { RuntimeException expectedCreateException = new RuntimeException("Failed local step"); doThrow(expectedCreateException).when(steps).localStep1(data); given(). - saga(new LocalSaga(steps), data). + saga(makeSaga(), data). expectException(expectedCreateException) ; } @Test public void shouldHandleFailureOfLastLocalStep() { - LocalSagaData data = new LocalSagaData(); - doThrow(new RuntimeException()).when(steps).localStep3(data); + doThrow(new RuntimeException()).when(steps).localStep3(any()); given(). - saga(new LocalSaga(steps), data). + saga(makeSaga(), new LocalSagaData()). expect(). command(new ReserveCreditCommand()). to("participant2"). @@ -73,5 +73,9 @@ public void shouldHandleFailureOfLastLocalStep() { ; } + private LocalSaga makeSaga() { + return new LocalSaga(steps); + } + } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/CommandWithDestinationAndType.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/CommandWithDestinationAndType.java index 2d6dfcf..a3ce1dd 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/CommandWithDestinationAndType.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/CommandWithDestinationAndType.java @@ -41,4 +41,8 @@ public CommandWithDestination getCommandWithDestination() { public boolean isNotification() { return notification; } + + boolean isCommand() { + return !isNotification(); + } } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java index 71da176..63eb625 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java @@ -10,11 +10,11 @@ public class SagaActions { private final List commands; private final Optional updatedSagaData; private final Optional updatedState; - private boolean endState; - private boolean compensating; - private boolean local; + private final boolean endState; + private final boolean compensating; + private final boolean local; private Optional localException; - private boolean failed; + private final boolean failed; public SagaActions(List commands, Optional updatedSagaData, @@ -49,15 +49,10 @@ public boolean isCompensating() { return compensating; } - public boolean isLocal() { - return local; + public boolean isReplyExpected() { + return (commands.isEmpty() || commands.stream().anyMatch(CommandWithDestinationAndType::isCommand)) && !local; } - public boolean isAllNotifications() { - return !commands.isEmpty() && commands.stream().allMatch(CommandWithDestinationAndType::isNotification); - } - - public boolean isFailed() { return failed; } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstance.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstance.java index 916c2eb..73e3760 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstance.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstance.java @@ -14,6 +14,21 @@ public class SagaInstance { private Boolean compensating = false; private Boolean failed = false; + @Override + public String toString() { + return "SagaInstance{" + + "sagaType='" + sagaType + '\'' + + ", id='" + id + '\'' + + ", lastRequestId='" + lastRequestId + '\'' + + ", serializedSagaData=" + serializedSagaData + + ", stateName='" + stateName + '\'' + + ", destinationsAndResources=" + destinationsAndResources + + ", endState=" + endState + + ", compensating=" + compensating + + ", failed=" + failed + + '}'; + } + public void setSagaType(String sagaType) { this.sagaType = sagaType; } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java index b6aa584..1d0c3e6 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java @@ -223,10 +223,10 @@ private void processActions(String sagaType, String sagaId, SagaInstance sagaIns sagaInstanceRepository.update(sagaInstance); - if (actions.isAllNotifications() || actions.isLocal()) { - actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions); - } else { + if (actions.isReplyExpected()) { break; + } else { + actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions); } } diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/InvokeReactiveParticipantStepBuilder.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/InvokeReactiveParticipantStepBuilder.java index 33f9590..1902270 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/InvokeReactiveParticipantStepBuilder.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/InvokeReactiveParticipantStepBuilder.java @@ -31,6 +31,13 @@ InvokeReactiveParticipantStepBuilder withAction(Optional> return this; } + InvokeReactiveParticipantStepBuilder withNotificationAction(Optional> participantInvocationPredicate, + Function> notificationAction) { + this.action = Optional.of(new ReactiveParticipantInvocationImpl<>(participantInvocationPredicate, notificationAction, true)); + return this; + } + + InvokeReactiveParticipantStepBuilder withAction(Optional> participantInvocationPredicate, CommandEndpoint commandEndpoint, Function> commandProvider) { @@ -44,6 +51,11 @@ public InvokeReactiveParticipantStepBuilder withCompensation(Function withCompensationNotification(Function> compensation) { + this.compensation = Optional.of(new ReactiveParticipantInvocationImpl<>(Optional.empty(), compensation, true)); + return this; + } + @Override public InvokeReactiveParticipantStepBuilder withCompensation(Predicate compensationPredicate, Function> compensation) { this.compensation = Optional.of(new ReactiveParticipantInvocationImpl<>(Optional.of(compensationPredicate), compensation)); @@ -86,4 +98,5 @@ private void addStep() { parent.addStep(new ReactiveParticipantInvocationStep<>(action, compensation, actionReplyHandlers, compensationReplyHandlers)); } + } diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveStepBuilder.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveStepBuilder.java index 010ff49..9c7f9b0 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveStepBuilder.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveStepBuilder.java @@ -25,6 +25,9 @@ public ReactiveLocalStepBuilder invokeLocal(Function> l public InvokeReactiveParticipantStepBuilder invokeParticipant(Function> action) { return new InvokeReactiveParticipantStepBuilder<>(parent).withAction(Optional.empty(), action); } + public InvokeReactiveParticipantStepBuilder notifyParticipant(Function> action) { + return new InvokeReactiveParticipantStepBuilder<>(parent).withNotificationAction(Optional.empty(), action); + } public InvokeReactiveParticipantStepBuilder invokeParticipant(Predicate participantInvocationPredicate, Function> action) { @@ -65,4 +68,5 @@ public InvokeReactiveParticipantStepBuilder withCompen CommandEndpoint commandEndpoint, Function> commandProvider) { return new InvokeReactiveParticipantStepBuilder<>(parent).withCompensation(compensationPredicate, commandEndpoint, commandProvider); } + } diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/AbstractReactiveLocalSagaTest.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/AbstractReactiveLocalSagaTest.java new file mode 100644 index 0000000..b6bb751 --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/AbstractReactiveLocalSagaTest.java @@ -0,0 +1,51 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import org.junit.Before; +import org.junit.Test; +import reactor.core.publisher.Mono; + +import static io.eventuate.tram.sagas.reactive.simpledsl.framework.ReactiveSagaUnitTestSupport.given; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +public abstract class AbstractReactiveLocalSagaTest { + protected LocalSagaSteps steps; + + @Before + public void setUp() { + steps = mock(LocalSagaSteps.class); + + } + + @Test + public void shouldRollbackFromStep2() { + doReturn(Mono.empty()).when(steps).localStep1(any()); + doReturn(Mono.empty()).when(steps).localStep1Compensation(any()); + doReturn(Mono.empty()).when(steps).localStep3(any()); + + given(). + saga(makeSaga(), new LocalSagaData()). + expect(). + command(new ReserveCreditCommand()). + to("participant2"). + andGiven(). + failureReply(). + andGiven(). + expectRolledBack() + ; + } + + @Test + public void shouldHandleFailureOfFirstLocalStep() { + LocalSagaData data = new LocalSagaData(); + RuntimeException expectedCreateException = new RuntimeException("Failed local step"); + doThrow(expectedCreateException).when(steps).localStep1(data); + given(). + saga(makeSaga(), data). + expectException(expectedCreateException) + ; + } + + protected abstract SimpleReactiveSaga makeSaga(); + +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/LocalSagaData.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/LocalSagaData.java new file mode 100644 index 0000000..6f39a7e --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/LocalSagaData.java @@ -0,0 +1,20 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import io.eventuate.tram.commands.consumer.CommandWithDestination; +import reactor.core.publisher.Mono; + +public class LocalSagaData { + + public Mono do2() { + return Mono.just(new CommandWithDestination("participant2", null, new ReserveCreditCommand())); + } + + public Mono undo2() { + return Mono.just(new CommandWithDestination("participant2", null, new ReleaseCreditCommand())); + } + + public Mono notify3() { + return Mono.just(new CommandWithDestination("participant3", null, new NotifyCommand())); + } + +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/LocalSagaSteps.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/LocalSagaSteps.java new file mode 100644 index 0000000..b20f017 --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/LocalSagaSteps.java @@ -0,0 +1,11 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import reactor.core.publisher.Mono; + +public interface LocalSagaSteps { + + Mono localStep1(LocalSagaData data); + Mono localStep1Compensation(LocalSagaData data); + Mono localStep3(LocalSagaData data); + +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/NotifyCommand.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/NotifyCommand.java new file mode 100644 index 0000000..1994afd --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/NotifyCommand.java @@ -0,0 +1,6 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import io.eventuate.tram.commands.common.Command; + +public class NotifyCommand implements Command { +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSaga.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSaga.java new file mode 100644 index 0000000..1750166 --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSaga.java @@ -0,0 +1,28 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import io.eventuate.tram.sagas.reactive.orchestration.ReactiveSagaDefinition; + +public class ReactiveLocalSaga implements SimpleReactiveSaga { + + private ReactiveSagaDefinition sagaDefinition; + + public ReactiveLocalSaga(LocalSagaSteps steps) { + this.sagaDefinition = + step() + .invokeLocal(steps::localStep1) + .withCompensation(steps::localStep1Compensation) + .step() + .invokeParticipant(LocalSagaData::do2) + .withCompensation(LocalSagaData::undo2) + .step() + .invokeLocal(steps::localStep3) + .build(); + } + + + @Override + public ReactiveSagaDefinition getSagaDefinition() { + return this.sagaDefinition; + } + +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaTest.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaTest.java new file mode 100644 index 0000000..e3e34ee --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaTest.java @@ -0,0 +1,59 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import org.junit.Test; +import reactor.core.publisher.Mono; + +import static io.eventuate.tram.sagas.reactive.simpledsl.framework.ReactiveSagaUnitTestSupport.given; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; + +public class ReactiveLocalSagaTest extends AbstractReactiveLocalSagaTest { + + @Override + protected SimpleReactiveSaga makeSaga() { + return new ReactiveLocalSaga(steps); + } + + + @Test + public void shouldExecuteAllStepsSuccessfully() { + doReturn(Mono.empty()).when(steps).localStep1(any()); + doReturn(Mono.empty()).when(steps).localStep1Compensation(any()); + doReturn(Mono.empty()).when(steps).localStep3(any()); + + given(). + saga(makeSaga(), new LocalSagaData()). + expect(). + command(new ReserveCreditCommand()). + to("participant2"). + andGiven(). + successReply() + .expectCompletedSuccessfully() + ; + + verify(steps).localStep3(any()); + } + + @Test + public void shouldHandleFailureOfLastLocalStep() { + doReturn(Mono.empty()).when(steps).localStep1(any()); + doReturn(Mono.empty()).when(steps).localStep1Compensation(any()); + doReturn(Mono.error(new RuntimeException())).when(steps).localStep3(any()); + + given(). + saga(makeSaga(), new LocalSagaData()). + expect(). + command(new ReserveCreditCommand()). + to("participant2"). + andGiven(). + successReply(). + expect(). + command(new ReleaseCreditCommand()). + to("participant2"). + andGiven(). + successReply(). + expectRolledBack() + ; + } +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaWithNotification.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaWithNotification.java new file mode 100644 index 0000000..83be396 --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaWithNotification.java @@ -0,0 +1,30 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import io.eventuate.tram.sagas.reactive.orchestration.ReactiveSagaDefinition; + +public class ReactiveLocalSagaWithNotification implements SimpleReactiveSaga { + + private ReactiveSagaDefinition sagaDefinition; + + public ReactiveLocalSagaWithNotification(LocalSagaSteps steps) { + this.sagaDefinition = + step() + .invokeLocal(steps::localStep1) + .withCompensation(steps::localStep1Compensation) + .step() + .invokeParticipant(LocalSagaData::do2) + .withCompensationNotification(LocalSagaData::undo2) + .step() + .invokeLocal(steps::localStep3) + .step() + .notifyParticipant(LocalSagaData::notify3) + .build(); + } + + + @Override + public ReactiveSagaDefinition getSagaDefinition() { + return this.sagaDefinition; + } + +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaWithNotificationTest.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaWithNotificationTest.java new file mode 100644 index 0000000..f3865f6 --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveLocalSagaWithNotificationTest.java @@ -0,0 +1,60 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import org.junit.Test; +import reactor.core.publisher.Mono; + +import static io.eventuate.tram.sagas.reactive.simpledsl.framework.ReactiveSagaUnitTestSupport.given; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; + +public class ReactiveLocalSagaWithNotificationTest extends AbstractReactiveLocalSagaTest { + + @Override + protected SimpleReactiveSaga makeSaga() { + return new ReactiveLocalSagaWithNotification(steps); + } + + + @Test + public void shouldExecuteAllStepsSuccessfully() { + doReturn(Mono.empty()).when(steps).localStep1(any()); + doReturn(Mono.empty()).when(steps).localStep1Compensation(any()); + doReturn(Mono.empty()).when(steps).localStep3(any()); + + given(). + saga(makeSaga(), new LocalSagaData()). + expect(). + command(new ReserveCreditCommand()). + to("participant2"). + andGiven(). + successReply(). + expect() + .notification(new NotifyCommand()) + .to("participant3") + .expectCompletedSuccessfully() + ; + + verify(steps).localStep3(any()); + } + + @Test + public void shouldHandleFailureOfLastLocalStep() { + doReturn(Mono.empty()).when(steps).localStep1(any()); + doReturn(Mono.empty()).when(steps).localStep1Compensation(any()); + doReturn(Mono.error(new RuntimeException())).when(steps).localStep3(any()); + + given(). + saga(makeSaga(), new LocalSagaData()). + expect(). + command(new ReserveCreditCommand()). + to("participant2"). + andGiven(). + successReply(). + expect(). + notification(new ReleaseCreditCommand()). + to("participant2"). + expectRolledBack() + ; + } +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReleaseCreditCommand.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReleaseCreditCommand.java new file mode 100644 index 0000000..4358bdc --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReleaseCreditCommand.java @@ -0,0 +1,6 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import io.eventuate.tram.commands.common.Command; + +public class ReleaseCreditCommand implements Command { +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReserveCreditCommand.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReserveCreditCommand.java new file mode 100644 index 0000000..dd376a7 --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/ReserveCreditCommand.java @@ -0,0 +1,6 @@ +package io.eventuate.tram.sagas.reactive.simpledsl; + +import io.eventuate.tram.commands.common.Command; + +public class ReserveCreditCommand implements Command { +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/MessageWithDestination.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/MessageWithDestination.java new file mode 100644 index 0000000..ef08d1e --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/MessageWithDestination.java @@ -0,0 +1,27 @@ +package io.eventuate.tram.sagas.reactive.simpledsl.framework; + +import io.eventuate.tram.messaging.common.Message; +import org.apache.commons.lang.builder.ToStringBuilder; + +public class MessageWithDestination { + private final String destination; + private final Message message; + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + + public MessageWithDestination(String destination, Message message) { + this.destination = destination; + this.message = message; + } + + public String getDestination() { + return destination; + } + + public Message getMessage() { + return message; + } +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/MultipleCommandsExpected.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/MultipleCommandsExpected.java new file mode 100644 index 0000000..bb7e1b3 --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/MultipleCommandsExpected.java @@ -0,0 +1,44 @@ +package io.eventuate.tram.sagas.reactive.simpledsl.framework; + +import io.eventuate.tram.commands.common.Command; +import io.eventuate.tram.commands.consumer.CommandWithDestination; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; + +import java.util.LinkedList; +import java.util.List; + +public class MultipleCommandsExpected { + private final ReactiveSagaUnitTestSupport sagaUnitTestSupport; + private boolean notification; + private Command command; + + private final List commandsAndNotifications = new LinkedList<>(); + + public MultipleCommandsExpected(ReactiveSagaUnitTestSupport sagaUnitTestSupport) { + super(); + this.sagaUnitTestSupport = sagaUnitTestSupport; + } + + + public MultipleCommandsExpected notification(Command notification) { + this.notification = true; + this.command = notification; + return this; + } + + public MultipleCommandsExpected to(String channel) { + commandsAndNotifications.add(new CommandWithDestinationAndType(new CommandWithDestination(channel, null, command), notification)); + return this; + } + + public ReactiveSagaUnitTestSupport verify() { + sagaUnitTestSupport.verifySent(commandsAndNotifications); + return sagaUnitTestSupport; + } + + public MultipleCommandsExpected command(Command command) { + this.notification = false; + this.command = command; + return this; + } +} diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/ReactiveSagaUnitTestSupport.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/ReactiveSagaUnitTestSupport.java new file mode 100644 index 0000000..9385c43 --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/reactive/simpledsl/framework/ReactiveSagaUnitTestSupport.java @@ -0,0 +1,276 @@ +package io.eventuate.tram.sagas.reactive.simpledsl.framework; + +import io.eventuate.common.json.mapper.JSonMapper; +import io.eventuate.tram.commands.common.*; +import io.eventuate.tram.consumer.common.reactive.ReactiveMessageConsumer; +import io.eventuate.tram.messaging.common.DefaultChannelMapping; +import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.messaging.common.MessageInterceptor; +import io.eventuate.tram.messaging.producer.MessageBuilder; +import io.eventuate.tram.reactive.commands.producer.ReactiveCommandProducerImpl; +import io.eventuate.tram.reactive.messaging.producer.common.ReactiveMessageProducer; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; +import io.eventuate.tram.sagas.orchestration.SagaDataSerde; +import io.eventuate.tram.sagas.orchestration.SagaInstance; +import io.eventuate.tram.sagas.reactive.common.ReactiveSagaLockManager; +import io.eventuate.tram.sagas.reactive.orchestration.ReactiveSaga; +import io.eventuate.tram.sagas.reactive.orchestration.ReactiveSagaCommandProducer; +import io.eventuate.tram.sagas.reactive.orchestration.ReactiveSagaInstanceRepository; +import io.eventuate.tram.sagas.reactive.orchestration.ReactiveSagaManagerImpl; +import org.junit.Assert; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static org.junit.Assert.assertTrue; + +/** + * Provides a DSL for writing unit tests for saga orchestrators + */ +public class ReactiveSagaUnitTestSupport { + + private ReactiveSagaManagerImpl sagaManager; + private Command expectedCommand; + private Command expectedNotification; + + private List sentCommands = new ArrayList<>(); + private MessageWithDestination sentCommand; + private Optional createException = Optional.empty(); + + + public static final String SAGA_ID = "1"; + + private int counter = 2; + private boolean expectingReply; + + private String genId() { + return Integer.toString(counter++); + } + + private volatile SagaInstance sagaInstance; + + public static ReactiveSagaUnitTestSupport given() { + return new ReactiveSagaUnitTestSupport<>(); + } + + public ReactiveSagaUnitTestSupport saga(ReactiveSaga saga, T sagaData) { + ReactiveSagaInstanceRepository sagaInstanceRepository = new ReactiveSagaInstanceRepository() { + + + @Override + public Mono save(SagaInstance sagaInstance) { + sagaInstance.setId(SAGA_ID); + ReactiveSagaUnitTestSupport.this.sagaInstance = sagaInstance; + return Mono.empty(); + } + + @Override + public Mono find(String sagaType, String sagaId) { + return Mono.just(sagaInstance); + } + + @Override + public Mono update(SagaInstance sagaInstance) { + ReactiveSagaUnitTestSupport.this.sagaInstance = sagaInstance; + return Mono.empty(); + } + + }; + + CommandNameMapping commandNameMapping = new DefaultCommandNameMapping(); + + ReactiveCommandProducerImpl commandProducer = new ReactiveCommandProducerImpl(new ReactiveMessageProducer(new MessageInterceptor[0], new DefaultChannelMapping(emptyMap()), (message) -> { + String id = genId(); + message.setHeader(Message.ID, id); + MessageWithDestination mwd = new MessageWithDestination(message.getRequiredHeader(Message.DESTINATION), message); + System.out.println("mwd=" + mwd); + sentCommands.add(mwd); + return Mono.just(message); + }), commandNameMapping); + + + ReactiveSagaCommandProducer sagaCommandProducer = new ReactiveSagaCommandProducer(commandProducer); + + ReactiveMessageConsumer messageConsumer = null; + ReactiveSagaLockManager sagaLockManager = null; + + sagaManager = new ReactiveSagaManagerImpl<>(saga, sagaInstanceRepository, commandProducer, messageConsumer, + sagaLockManager, sagaCommandProducer); + + + try { + sagaManager.create(sagaData).block(Duration.ofSeconds(1)); + System.out.println("Created saga"); + } catch (Exception e) { + createException = Optional.of(e); + } + return (ReactiveSagaUnitTestSupport) this; + } + + public ReactiveSagaUnitTestSupport expect() { + createException.ifPresent(e -> { + throw new RuntimeException("Saga creation failed: ", e); + }); + return this; + } + + public ReactiveSagaUnitTestSupport command(Command command) { + expectedCommand = command; + expectedNotification = null; + return this; + } + + public ReactiveSagaUnitTestSupport notification(Command command) { + expectedCommand = null; + expectedNotification = command; + return this; + } + + public ReactiveSagaUnitTestSupport to(String commandChannel) { + Assert.assertEquals("Expected one command", 1, sentCommands.size()); + sentCommand = sentCommands.get(0); + Assert.assertEquals(commandChannel, sentCommand.getDestination()); + Message sentMessage = sentCommand.getMessage(); + if (expectedCommand != null) { + Assert.assertEquals(expectedCommand.getClass().getName(), sentMessage.getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE)); + Assert.assertNotNull(sentMessage.getRequiredHeader(CommandMessageHeaders.REPLY_TO)); + this.expectingReply = true; + } else { + Assert.assertEquals(expectedNotification.getClass().getName(), sentMessage.getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE)); + Assert.assertNull(sentMessage.getHeader(CommandMessageHeaders.REPLY_TO).orElse(null)); + this.expectingReply = false; + } + sentCommands.clear(); + return this; + } + + void verifySent(List commandsAndNotifications) { + sentCommand = null; + for (CommandWithDestinationAndType corn : commandsAndNotifications) { + MessageWithDestination sentMessage = sentCommands.stream() + .filter(sm -> corn.getCommandWithDestination().getCommand().getClass().getName().equals(sm.getMessage().getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE)) + && corn.getCommandWithDestination().getDestinationChannel().equals(sm.getDestination())) + .findAny() + .orElseThrow(() -> new AssertionError(String.format("Did not find expected command %s in %s", corn, sentCommands))); + + if (corn.isNotification()) + Assert.assertNull(sentMessage.getMessage().getHeader(CommandMessageHeaders.REPLY_TO).orElse(null)); + else { + Assert.assertNotNull(sentMessage.getMessage().getRequiredHeader(CommandMessageHeaders.REPLY_TO)); + if (sentCommand != null) + Assert.fail(String.format("There can only be one command in %s", sentCommands)); + sentCommand = sentMessage; + } + } + if (commandsAndNotifications.size() != sentCommands.size()) + Assert.fail(String.format("Expected these commands %s but there are extra %s", commandsAndNotifications, sentCommands)); + sentCommands.clear(); + } + + public ReactiveSagaUnitTestSupport withExtraHeaders(Map expectedExtraHeaders) { + Map actualHeaders = sentCommand.getMessage().getHeaders(); + if (!actualHeaders.entrySet().containsAll(expectedExtraHeaders.entrySet())) + Assert.fail(String.format("Expected headers %s to contain %s", actualHeaders, expectedExtraHeaders)); + return this; + } + + public ReactiveSagaUnitTestSupport andGiven() { + return this; + } + + // copy + private Map correlationHeaders(Map headers) { + Map m = headers.entrySet() + .stream() + .filter(e -> e.getKey().startsWith(CommandMessageHeaders.COMMAND_HEADER_PREFIX)) + .collect(Collectors.toMap(e -> CommandMessageHeaders.inReply(e.getKey()), + Map.Entry::getValue)); + m.put(ReplyMessageHeaders.IN_REPLY_TO, headers.get(Message.ID)); + return m; + } + + + public ReactiveSagaUnitTestSupport successReply() { + Success reply = new Success(); + return successReply(reply); + } + + public ReactiveSagaUnitTestSupport successReply(Object reply) { + CommandReplyOutcome outcome = CommandReplyOutcome.SUCCESS; + sendReply(reply, outcome); + return this; + } + + public ReactiveSagaUnitTestSupport failureReply() { + Failure reply = new Failure(); + return failureReply(reply); + } + + public ReactiveSagaUnitTestSupport failureReply(Object reply) { + CommandReplyOutcome outcome = CommandReplyOutcome.FAILURE; + sendReply(reply, outcome); + return this; + } + + private void sendReply(Object reply, CommandReplyOutcome outcome) { + assertTrue("Sending reply but a command was not sent", expectingReply); + Message message = MessageBuilder + .withPayload(JSonMapper.toJson(reply)) + .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, outcome.name()) + .withHeader(ReplyMessageHeaders.REPLY_TYPE, ((Object) reply).getClass().getName()) + .withExtraHeaders("", correlationHeaders(sentCommand.getMessage().getHeaders())) + .build(); + String id = genId(); + message.getHeaders().put(Message.ID, id); + Mono.from(sagaManager.handleMessage(message)).block(Duration.ofSeconds(1)); + } + + public ReactiveSagaUnitTestSupport expectCompletedSuccessfully() { + assertNoCommands(); + assertTrue("Expected saga to have finished", sagaInstance.isEndState()); + Assert.assertFalse("Expected saga to have finished successfully", sagaInstance.isCompensating()); + return this; + } + + private void assertNoCommands() { + switch (sentCommands.size()) { + case 0: + break; + case 1: + MessageWithDestination mwd = sentCommands.get(0); + Assert.fail(String.format("Expected saga to have finished but found a command of %s sent to %s: %s", mwd.getMessage().getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE), mwd.getDestination(), mwd.getMessage())); + break; + default: + Assert.assertEquals(emptyList(), sentCommands); + } + } + + public ReactiveSagaUnitTestSupport expectRolledBack() { + assertNoCommands(); + assertTrue("Expected saga to have finished", sagaInstance.isEndState()); + assertTrue("Expected saga to have rolled back", sagaInstance.isCompensating()); + return this; + } + + public void expectException(Exception expectedCreateException) { + Assert.assertEquals(expectedCreateException, createException.get()); + } + + public ReactiveSagaUnitTestSupport assertSagaData(Consumer sagaDataConsumer) { + sagaDataConsumer.accept(SagaDataSerde.deserializeSagaData(sagaInstance.getSerializedSagaData())); + return this; + } + + public MultipleCommandsExpected multiple() { + return new MultipleCommandsExpected(this); + } + +} diff --git a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaCommandProducer.java b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaCommandProducer.java index 43045c5..f749c92 100644 --- a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaCommandProducer.java +++ b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaCommandProducer.java @@ -28,8 +28,7 @@ public Mono sendCommands(String sagaType, String sagaId, List create(Data sagaData, Optional resource) { return sagaInstanceRepository .save(sagaInstance) - .then(Mono.defer(() -> { - String sagaId = sagaInstance.getId(); - return saga.onStarting(sagaId, sagaData).thenReturn(sagaId); - })) - .flatMap(sagaId -> { - if (resource.isPresent()) { - return sagaLockManager - .claimLock(getSagaType(), sagaId, resource.get()) - .flatMap(blocked -> { - if (blocked) return Mono.empty(); - else return Mono.error(new RuntimeException("Cannot claim lock for resource")); - }); - } - return Mono.empty(); - }) + .then(Mono.defer(() -> saga.onStarting(sagaInstance.getId(), sagaData).thenReturn(sagaInstance.getId()))) + .flatMap(sagaId -> resource.map(s -> sagaLockManager + .claimLock(getSagaType(), sagaId, s) + .flatMap(blocked -> { + if (blocked) return Mono.empty(); + else return Mono.error(new RuntimeException("Cannot claim lock for resource")); + })).orElseGet(Mono::empty)) .then(Mono.defer(() -> Mono.from(getStateDefinition().start(sagaData)))) - .flatMap(actions -> { - if (actions.getLocalException().isPresent()) return Mono.error(actions.getLocalException().get()); - else return Mono.just(actions); - }) - .flatMap(actions -> processActions(getSagaType(), sagaInstance.getId(), sagaInstance, sagaData, Mono.just(actions))) - .then(Mono.fromSupplier(() -> sagaInstance)); + .flatMap(actions -> actions.getLocalException() + .map(Mono::>error) + .orElseGet(() -> processActions(getSagaType(), sagaInstance.getId(), sagaInstance, sagaData, actions))) + .then(Mono.just(sagaInstance)); } private Mono performEndStateActions(String sagaId, SagaInstance sagaInstance, boolean compensating, Data sagaData) { - List> actions = new ArrayList<>(); - - for (DestinationAndResource dr : sagaInstance.getDestinationsAndResources()) { - Map headers = new HashMap<>(); - headers.put(SagaCommandHeaders.SAGA_ID, sagaId); - headers.put(SagaCommandHeaders.SAGA_TYPE, getSagaType()); - actions.add(commandProducer.send(dr.getDestination(), dr.getResource(), new SagaUnlockCommand(), makeSagaReplyChannel(), headers)); - } - - return Flux.merge(actions).then(compensating ? saga.onSagaRolledBack(sagaId, sagaData) : saga.onSagaCompletedSuccessfully(sagaId, sagaData)); + return Flux.fromIterable(sagaInstance.getDestinationsAndResources()) + .map( dr -> { + Map headers = new HashMap<>(); + headers.put(SagaCommandHeaders.SAGA_ID, sagaId); + headers.put(SagaCommandHeaders.SAGA_TYPE, getSagaType()); + return commandProducer.send(dr.getDestination(), dr.getResource(), new SagaUnlockCommand(), makeSagaReplyChannel(), headers); + }) + .then(Mono.defer(() -> compensating ? saga.onSagaRolledBack(sagaId, sagaData) : saga.onSagaCompletedSuccessfully(sagaId, sagaData))); } private ReactiveSagaDefinition getStateDefinition() { @@ -204,51 +195,59 @@ private Data getSagaData(SagaInstance sagaInstance) { return SagaDataSerde.deserializeSagaData(sagaInstance.getSerializedSagaData()); } - private Mono> processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, Mono> actions) { - return actions.flatMap(acts -> { - if (acts.getLocalException().isPresent()) { - Mono> nextActions = Mono.from(getStateDefinition() - .handleReply( - sagaType, sagaId, acts.getUpdatedState().get(), - acts.getUpdatedSagaData().get(), - MessageBuilder - .withPayload("{}") - .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.FAILURE.name()) - .withHeader(ReplyMessageHeaders.REPLY_TYPE, Failure.class.getName()) - .build() - )); - - return processActions(sagaType, sagaId, sagaInstance, sagaData, nextActions); - } else { - Mono> nextActions = sagaCommandProducer - .sendCommands(this.getSagaType(), sagaId, acts.getCommands(), this.makeSagaReplyChannel()) - .map(lastId -> { - sagaInstance.setLastRequestId(lastId); - updateState(sagaInstance, acts); - sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(acts.getUpdatedSagaData().orElse(sagaData))); - if (acts.isEndState()) { - return performEndStateActions(sagaId, sagaInstance, acts.isCompensating(), sagaData).thenReturn(lastId); - } - return Mono.just(lastId); - }) - .then(Mono.defer(() -> sagaInstanceRepository.update(sagaInstance))) - .then(Mono.defer(() -> { - if (!acts.isLocal()) return Mono.empty(); - else return Mono.just(acts); - })) - .flatMap(newActs -> - Mono.from(getStateDefinition() - .handleReply(sagaType, sagaId, newActs.getUpdatedState().get(), - newActs.getUpdatedSagaData().get(), - MessageBuilder - .withPayload("{}") - .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name()) - .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName()) - .build()))); - - return nextActions.flatMap(na -> processActions(sagaType, sagaId, sagaInstance, sagaData, Mono.just(na))); - } - }); + private Mono> processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, Mono> actionsMono) { + return actionsMono.flatMap(actions -> processActions(sagaType, sagaId, sagaInstance, sagaData, actions)); + } + + private Mono> processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions actions) { + if (actions.getLocalException().isPresent()) { + return simulateFailedReplyMessageForFailedLocalStep(sagaType, sagaId, sagaInstance, sagaData, actions); + } else { + Mono> nextActions = sagaCommandProducer + .sendCommands(this.getSagaType(), sagaId, actions.getCommands(), this.makeSagaReplyChannel()) + .map(Optional::of) + .switchIfEmpty(Mono.just(Optional.empty())) + .map(lastId -> { + lastId.ifPresent(sagaInstance::setLastRequestId); + updateState(sagaInstance, actions); + sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(actions.getUpdatedSagaData().orElse(sagaData))); + if (actions.isEndState()) { + return performEndStateActions(sagaId, sagaInstance, actions.isCompensating(), sagaData) + .then(); + } else + return Mono.empty(); + }) + .then(Mono.defer(() -> sagaInstanceRepository.update(sagaInstance))) + .then(Mono.defer(() -> actions.isReplyExpected() ? Mono.empty() : simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions))); + return processActions(sagaType, sagaId, sagaInstance, sagaData, nextActions); + } + } + + + private Mono> simulateSuccessfulReplyToLocalActionOrNotification(String sagaType, String sagaId, SagaActions actions) { + return Mono.from(getStateDefinition() + .handleReply(sagaType, sagaId, actions.getUpdatedState().get(), + actions.getUpdatedSagaData().get(), + MessageBuilder + .withPayload("{}") + .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name()) + .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName()) + .build())); + } + + private Mono> simulateFailedReplyMessageForFailedLocalStep(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions acts) { + Mono> nextActions = Mono.from(getStateDefinition() + .handleReply( + sagaType, sagaId, acts.getUpdatedState().get(), + acts.getUpdatedSagaData().get(), + MessageBuilder + .withPayload("{}") + .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.FAILURE.name()) + .withHeader(ReplyMessageHeaders.REPLY_TYPE, Failure.class.getName()) + .build() + )); + + return processActions(sagaType, sagaId, sagaInstance, sagaData, nextActions); } private void updateState(SagaInstance sagaInstance, SagaActions actions) {