diff --git a/build.gradle b/build.gradle index 6e4c333..07370f5 100644 --- a/build.gradle +++ b/build.gradle @@ -4,11 +4,10 @@ buildscript { maven { url "https://plugins.gradle.org/m2/" } - jcenter() eventuateMavenRepoUrl.split(',').each { repoUrl -> maven { url repoUrl } } } dependencies { - classpath "com.avast.gradle:gradle-docker-compose-plugin:0.9.2" + classpath "com.avast.gradle:gradle-docker-compose-plugin:$dockerComposePluginVersion" classpath "io.eventuate.plugins.gradle:eventuate-plugins-gradle-publish:$eventuatePluginsGradleVersion" } } @@ -30,7 +29,6 @@ allprojects { repositories { mavenCentral() - jcenter() eventuateMavenRepoUrl.split(',').each { repoUrl -> maven { url repoUrl } } } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/build.gradle b/eventuate-tram-sagas-orchestration-simple-dsl/build.gradle index c6905d9..a0bb5aa 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/build.gradle +++ b/eventuate-tram-sagas-orchestration-simple-dsl/build.gradle @@ -2,5 +2,5 @@ dependencies { compile project(":eventuate-tram-sagas-orchestration") - testCompile project(":eventuate-tram-sagas-testing-support") + testCompile project(":eventuate-tram-sagas-unit-testing-support") } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/InvokeParticipantStepBuilder.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/InvokeParticipantStepBuilder.java index a13563f..3f69272 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/InvokeParticipantStepBuilder.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/InvokeParticipantStepBuilder.java @@ -28,6 +28,12 @@ InvokeParticipantStepBuilder withAction(Optional> particip return this; } + public InvokeParticipantStepBuilder withNotificationAction(Optional> participantInvocationPredicate, Function notificationAction) { + this.action = Optional.of(new ParticipantInvocationImpl<>(participantInvocationPredicate, notificationAction, true)); + return this; + } + + InvokeParticipantStepBuilder withAction(Optional> participantInvocationPredicate, CommandEndpoint commandEndpoint, Function commandProvider) { this.action = Optional.of(new ParticipantEndpointInvocationImpl<>(participantInvocationPredicate, commandEndpoint, commandProvider)); return this; @@ -39,6 +45,11 @@ public InvokeParticipantStepBuilder withCompensation(Function withCompensationNotification(Function compensation) { + this.compensation = Optional.of(new ParticipantInvocationImpl<>(Optional.empty(), compensation, true)); + return this; + } + @Override public InvokeParticipantStepBuilder withCompensation(Predicate compensationPredicate, Function compensation) { this.compensation = Optional.of(new ParticipantInvocationImpl<>(Optional.of(compensationPredicate), compensation)); diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantEndpointInvocationImpl.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantEndpointInvocationImpl.java index 2a16b51..35dc4a3 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantEndpointInvocationImpl.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantEndpointInvocationImpl.java @@ -3,8 +3,8 @@ import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.common.CommandReplyOutcome; import io.eventuate.tram.commands.common.ReplyMessageHeaders; -import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import java.util.Optional; import java.util.function.Function; @@ -28,7 +28,7 @@ public boolean isSuccessfulReply(Message message) { } @Override - public CommandWithDestination makeCommandToSend(Data data) { - return new CommandWithDestination(commandEndpoint.getCommandChannel(), null, commandProvider.apply(data)); + public CommandWithDestinationAndType makeCommandToSend(Data data) { + return CommandWithDestinationAndType.command(commandEndpoint.getCommandChannel(), null, commandProvider.apply(data)); // TODO notifications } } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocation.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocation.java index 033eee3..ede9bf7 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocation.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocation.java @@ -1,7 +1,7 @@ package io.eventuate.tram.sagas.simpledsl; -import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; public interface ParticipantInvocation { @@ -10,5 +10,5 @@ public interface ParticipantInvocation { boolean isInvocable(Data data); - CommandWithDestination makeCommandToSend(Data data); + CommandWithDestinationAndType makeCommandToSend(Data data); } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocationImpl.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocationImpl.java index 2e48bb3..6f15a30 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocationImpl.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocationImpl.java @@ -5,27 +5,34 @@ import io.eventuate.tram.commands.common.ReplyMessageHeaders; import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import java.util.Optional; import java.util.function.Function; import java.util.function.Predicate; public class ParticipantInvocationImpl extends AbstractParticipantInvocation { - private Function commandBuilder; + private final boolean notification; + private final Function commandBuilder; public ParticipantInvocationImpl(Optional> invocablePredicate, Function commandBuilder) { + this(invocablePredicate, commandBuilder, false); + } + + public ParticipantInvocationImpl(Optional> invocablePredicate, Function commandBuilder, boolean notification) { super(invocablePredicate); this.commandBuilder = commandBuilder; + this.notification = notification; } - @Override + @Override public boolean isSuccessfulReply(Message message) { return CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME)); } @Override - public CommandWithDestination makeCommandToSend(Data data) { - return commandBuilder.apply(data); + public CommandWithDestinationAndType makeCommandToSend(Data data) { + return new CommandWithDestinationAndType(commandBuilder.apply(data), notification); } } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocationStep.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocationStep.java index c4ded33..f5d5dd3 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocationStep.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ParticipantInvocationStep.java @@ -3,7 +3,9 @@ import io.eventuate.tram.commands.common.ReplyMessageHeaders; import io.eventuate.tram.messaging.common.Message; -import java.util.*; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; public class ParticipantInvocationStep implements SagaStep { @@ -47,7 +49,7 @@ public boolean isSuccessfulReply(boolean compensating, Message message) { @Override public StepOutcome makeStepOutcome(Data data, boolean compensating) { return StepOutcome.makeRemoteStepOutcome(getParticipantInvocation(compensating) - .map(x -> x.makeCommandToSend(data)) + .map(pi -> pi.makeCommandToSend(data)) .map(Collections::singletonList) .orElseGet(Collections::emptyList)); } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepBuilder.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepBuilder.java index 2382896..62e9773 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepBuilder.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepBuilder.java @@ -57,4 +57,8 @@ public InvokeParticipantStepBuilder withCompensation(C public InvokeParticipantStepBuilder withCompensation(Predicate compensationPredicate, CommandEndpoint commandEndpoint, Function commandProvider) { return new InvokeParticipantStepBuilder<>(parent).withCompensation(compensationPredicate, commandEndpoint, commandProvider); } + + public InvokeParticipantStepBuilder notifyParticipant(Function notificationAction) { + return new InvokeParticipantStepBuilder<>(parent).withNotificationAction(Optional.empty(), notificationAction); + } } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepOutcome.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepOutcome.java index c6ef1bf..feb8bd4 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepOutcome.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepOutcome.java @@ -1,6 +1,6 @@ package io.eventuate.tram.sagas.simpledsl; -import io.eventuate.tram.commands.consumer.CommandWithDestination; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import java.util.List; import java.util.Optional; @@ -8,7 +8,7 @@ public abstract class StepOutcome { - public abstract void visit(Consumer> localConsumer, Consumer> commandsConsumer); + public abstract void visit(Consumer> localConsumer, Consumer> commandsConsumer); static class LocalStepOutcome extends StepOutcome { private Optional localOutcome; @@ -18,20 +18,20 @@ public LocalStepOutcome(Optional localOutcome) { } @Override - public void visit(Consumer> localConsumer, Consumer> commandsConsumer) { + public void visit(Consumer> localConsumer, Consumer> commandsConsumer) { localConsumer.accept(localOutcome); } } static class RemoteStepOutcome extends StepOutcome { - private List commandsToSend; + private List commandsToSend; - public RemoteStepOutcome(List commandsToSend) { + public RemoteStepOutcome(List commandsToSend) { this.commandsToSend = commandsToSend; } @Override - public void visit(Consumer> localConsumer, Consumer> commandsConsumer) { + public void visit(Consumer> localConsumer, Consumer> commandsConsumer) { commandsConsumer.accept(commandsToSend); } } @@ -39,7 +39,7 @@ public void visit(Consumer> localConsumer, Consumer localOutcome) { return new LocalStepOutcome(localOutcome); } - public static StepOutcome makeRemoteStepOutcome(List commandsToSend) { + public static StepOutcome makeRemoteStepOutcome(List commandsToSend) { return new RemoteStepOutcome(commandsToSend); } } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ConditionalSagaData.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ConditionalSagaData.java index d38f9fd..2b1115f 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ConditionalSagaData.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ConditionalSagaData.java @@ -36,6 +36,6 @@ public CommandWithDestination undo1() { } public CommandWithDestination do2() { - return new CommandWithDestination("participant2", null, new Do2Command()); + return new CommandWithDestination("participant2", null, new ReserveCreditCommand()); } } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ConditionalSagaTest.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ConditionalSagaTest.java index 627df1b..da76906 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ConditionalSagaTest.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ConditionalSagaTest.java @@ -16,7 +16,7 @@ public void shouldExecuteAllStepsSuccessfully() { andGiven(). successReply(). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). successReply(). @@ -34,7 +34,7 @@ public void shouldRollback() { andGiven(). successReply(). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). failureReply(). @@ -52,7 +52,7 @@ public void shouldExecuteAllStepsExcept1Successfully() { given(). saga(new ConditionalSaga(), new ConditionalSagaData(false)). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). successReply(). @@ -65,7 +65,7 @@ public void shouldRollbackExcept1() { given(). saga(new ConditionalSaga(), new ConditionalSagaData(false)). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). failureReply(). diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaData.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaData.java index 8be880e..64e1201 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaData.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaData.java @@ -5,11 +5,11 @@ public class LocalSagaData { public CommandWithDestination do2() { - return new CommandWithDestination("participant2", null, new Do2Command()); + return new CommandWithDestination("participant2", null, new ReserveCreditCommand()); } public CommandWithDestination undo2() { - return new CommandWithDestination("participant2", null, new Undo2Command()); + return new CommandWithDestination("participant2", null, new ReleaseCreditCommand()); } @Override diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaTest.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/NotificationBasedCreateOrderSagaTest.java similarity index 88% rename from eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaTest.java rename to eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/NotificationBasedCreateOrderSagaTest.java index 80e92fb..007d782 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/LocalSagaTest.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/NotificationBasedCreateOrderSagaTest.java @@ -7,7 +7,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -public class LocalSagaTest { +public class NotificationBasedCreateOrderSagaTest { private LocalSagaSteps steps; @@ -21,7 +21,7 @@ public void shouldExecuteAllStepsSuccessfully() { given(). saga(new LocalSaga(steps), new LocalSagaData()). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). successReply(). @@ -34,7 +34,7 @@ public void shouldRollbackFromStep2() { given(). saga(new LocalSaga(steps), new LocalSagaData()). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). failureReply(). @@ -60,12 +60,12 @@ public void shouldHandleFailureOfLastLocalStep() { given(). saga(new LocalSaga(steps), data). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). successReply(). expect(). - command(new Undo2Command()). + command(new ReleaseCreditCommand()). to("participant2"). andGiven(). successReply(). diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/Do2Command.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ReleaseCreditCommand.java similarity index 63% rename from eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/Do2Command.java rename to eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ReleaseCreditCommand.java index 50ffb14..22ef141 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/Do2Command.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ReleaseCreditCommand.java @@ -2,5 +2,5 @@ import io.eventuate.tram.commands.common.Command; -public class Do2Command implements Command { +public class ReleaseCreditCommand implements Command { } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/Undo2Command.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ReserveCreditCommand.java similarity index 63% rename from eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/Undo2Command.java rename to eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ReserveCreditCommand.java index 3654b54..9ce4010 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/Undo2Command.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/ReserveCreditCommand.java @@ -2,5 +2,5 @@ import io.eventuate.tram.commands.common.Command; -public class Undo2Command implements Command { +public class ReserveCreditCommand implements Command { } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/WithHandlersSagaTest.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/WithHandlersSagaTest.java index ea16ebd..0f1321b 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/WithHandlersSagaTest.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/WithHandlersSagaTest.java @@ -35,7 +35,7 @@ public void shouldExecuteAllStepsSuccessfully() { andGiven(). successReply(). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). successReply(). @@ -61,7 +61,7 @@ public void shouldRollback() { andGiven(). successReply(). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). failureReply(). @@ -88,7 +88,7 @@ public void shouldExecuteAllStepsExcept1Successfully() { given(). saga(new WithHandlersSaga(handlers), new ConditionalSagaData(false)). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). successReply(). @@ -108,7 +108,7 @@ public void shouldRollbackExcept1() { given(). saga(new WithHandlersSaga(handlers), new ConditionalSagaData(false)). expect(). - command(new Do2Command()). + command(new ReserveCreditCommand()). to("participant2"). andGiven(). failureReply(). diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/FulfillOrder.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/FulfillOrder.java new file mode 100644 index 0000000..d827063 --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/FulfillOrder.java @@ -0,0 +1,6 @@ +package io.eventuate.tram.sagas.simpledsl.notifications; + +import io.eventuate.tram.commands.common.Command; + +public class FulfillOrder implements Command { +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSaga.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSaga.java new file mode 100644 index 0000000..023f6db --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSaga.java @@ -0,0 +1,34 @@ +package io.eventuate.tram.sagas.simpledsl.notifications; + +import io.eventuate.tram.sagas.orchestration.SagaDefinition; +import io.eventuate.tram.sagas.simpledsl.SimpleSaga; + +public class NotificationBasedCreateOrderSaga implements SimpleSaga { + + private final SagaDefinition sagaDefinition; + + public NotificationBasedCreateOrderSaga(NotificationBasedCreateOrderSagaSteps steps) { + this.sagaDefinition = + step() + .invokeLocal(steps::createOrder) + .withCompensation(steps::rejectOrder) + .step() + .invokeParticipant(NotificationBasedCreateOrderSagaData::reserveCredit) + .withCompensation(NotificationBasedCreateOrderSagaData::releaseCredit) + .step() + .invokeParticipant(NotificationBasedCreateOrderSagaData::reserveInventory) + .withCompensationNotification(NotificationBasedCreateOrderSagaData::releaseInventory) + .step() + .invokeLocal(steps::approveOrder) + .step() + .notifyParticipant(NotificationBasedCreateOrderSagaData::fulfillOrder) + .build(); + } + + + @Override + public SagaDefinition getSagaDefinition() { + return this.sagaDefinition; + } + +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaData.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaData.java new file mode 100644 index 0000000..93aaff7 --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaData.java @@ -0,0 +1,28 @@ +package io.eventuate.tram.sagas.simpledsl.notifications; + +import io.eventuate.tram.commands.consumer.CommandWithDestination; +import io.eventuate.tram.sagas.simpledsl.ReleaseCreditCommand; +import io.eventuate.tram.sagas.simpledsl.ReserveCreditCommand; + +public class NotificationBasedCreateOrderSagaData { + + public CommandWithDestination reserveCredit() { + return new CommandWithDestination("customerService", null, new ReserveCreditCommand()); + } + + public CommandWithDestination releaseCredit() { + return new CommandWithDestination("customerService", null, new ReleaseCreditCommand()); + } + + public CommandWithDestination reserveInventory() { + return new CommandWithDestination("inventoryService", null, new ReserveInventory()); + } + + public CommandWithDestination releaseInventory() { + return new CommandWithDestination("inventoryService", null, new ReleaseInventory()); + } + + public CommandWithDestination fulfillOrder() { + return new CommandWithDestination("fulfillmentService", null, new FulfillOrder()); + } +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaSteps.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaSteps.java new file mode 100644 index 0000000..f9b5f3b --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaSteps.java @@ -0,0 +1,9 @@ +package io.eventuate.tram.sagas.simpledsl.notifications; + +public interface NotificationBasedCreateOrderSagaSteps { + + void createOrder(NotificationBasedCreateOrderSagaData data); + void rejectOrder(NotificationBasedCreateOrderSagaData data); + void approveOrder(NotificationBasedCreateOrderSagaData data); + +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaTest.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaTest.java new file mode 100644 index 0000000..62d3d21 --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/NotificationBasedCreateOrderSagaTest.java @@ -0,0 +1,96 @@ +package io.eventuate.tram.sagas.simpledsl.notifications; + +import io.eventuate.tram.sagas.simpledsl.ReleaseCreditCommand; +import io.eventuate.tram.sagas.simpledsl.ReserveCreditCommand; +import org.junit.Before; +import org.junit.Test; + +import static io.eventuate.tram.sagas.testing.SagaUnitTestSupport.given; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class NotificationBasedCreateOrderSagaTest { + + private NotificationBasedCreateOrderSagaSteps steps; + + @Before + public void setUp() throws Exception { + steps = mock(NotificationBasedCreateOrderSagaSteps.class); + } + + @Test + public void shouldExecuteAllStepsSuccessfully() { + given(). + saga(new NotificationBasedCreateOrderSaga(steps), new NotificationBasedCreateOrderSagaData()). + expect(). + command(new ReserveCreditCommand()). + to("customerService"). + andGiven(). + successReply(). + expect(). + command(new ReserveInventory()). + to("inventoryService"). + andGiven(). + successReply(). + expect(). + notification(new FulfillOrder()). + to("fulfillmentService"). + expectCompletedSuccessfully() + ; + } + + @Test + public void shouldRollbackFromStep2() { + given(). + saga(new NotificationBasedCreateOrderSaga(steps), new NotificationBasedCreateOrderSagaData()). + expect(). + command(new ReserveCreditCommand()). + to("customerService"). + andGiven(). + failureReply(). + andGiven(). + expectRolledBack() + ; + } + + @Test + public void shouldHandleFailureOfFirstLocalStep() { + NotificationBasedCreateOrderSagaData data = new NotificationBasedCreateOrderSagaData(); + RuntimeException expectedCreateException = new RuntimeException("Failed local step"); + doThrow(expectedCreateException).when(steps).createOrder(data); + given(). + saga(new NotificationBasedCreateOrderSaga(steps), data). + expectException(expectedCreateException) + ; + } + @Test + public void shouldHandleFailureOfLastLocalStep() { + doThrow(new RuntimeException()).when(steps).approveOrder(any()); + given(). + saga(new NotificationBasedCreateOrderSaga(steps), new NotificationBasedCreateOrderSagaData()). + expect(). + command(new ReserveCreditCommand()). + to("customerService"). + andGiven(). + successReply(). + expect(). + command(new ReserveInventory()). + to("inventoryService"). + andGiven(). + successReply(). + expect(). + multiple(). + notification(new ReleaseInventory()). + to("inventoryService"). + command(new ReleaseCreditCommand()). + to("customerService"). + verify(). + andGiven(). + successReply(). + expectRolledBack() + ; + } + + +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/ReleaseInventory.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/ReleaseInventory.java new file mode 100644 index 0000000..164e882 --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/ReleaseInventory.java @@ -0,0 +1,6 @@ +package io.eventuate.tram.sagas.simpledsl.notifications; + +import io.eventuate.tram.commands.common.Command; + +public class ReleaseInventory implements Command { +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/ReserveInventory.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/ReserveInventory.java new file mode 100644 index 0000000..c77ebd2 --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/test/java/io/eventuate/tram/sagas/simpledsl/notifications/ReserveInventory.java @@ -0,0 +1,6 @@ +package io.eventuate.tram.sagas.simpledsl.notifications; + +import io.eventuate.tram.commands.common.Command; + +public class ReserveInventory implements Command { +} diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/CommandWithDestinationAndType.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/CommandWithDestinationAndType.java new file mode 100644 index 0000000..2d6dfcf --- /dev/null +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/CommandWithDestinationAndType.java @@ -0,0 +1,44 @@ +package io.eventuate.tram.sagas.orchestration; + +import io.eventuate.tram.commands.common.Command; +import io.eventuate.tram.commands.consumer.CommandWithDestination; +import org.apache.commons.lang.builder.ToStringBuilder; + +public class CommandWithDestinationAndType { + + private final CommandWithDestination commandWithDestination; + private final boolean notification; + + public static CommandWithDestinationAndType command(String channel, String resource, Command command) { + return command(new CommandWithDestination(channel, resource, command)); + } + + public static CommandWithDestinationAndType command(CommandWithDestination command) { + return new CommandWithDestinationAndType(command, false); + } + + public static CommandWithDestinationAndType notification(String channel, String resource, Command command) { + return notification(new CommandWithDestination(channel, resource, command)); + } + + public static CommandWithDestinationAndType notification(CommandWithDestination command) { + return new CommandWithDestinationAndType(command, true); + } + + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + + public CommandWithDestinationAndType(CommandWithDestination commandWithDestination, boolean notification) { + this.commandWithDestination = commandWithDestination; + this.notification = notification; + } + + public CommandWithDestination getCommandWithDestination() { + return commandWithDestination; + } + + public boolean isNotification() { + return notification; + } +} diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java index a94f54d..71da176 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java @@ -1,7 +1,5 @@ package io.eventuate.tram.sagas.orchestration; -import io.eventuate.tram.commands.consumer.CommandWithDestination; - import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -9,7 +7,7 @@ public class SagaActions { - private final List commands; + private final List commands; private final Optional updatedSagaData; private final Optional updatedState; private boolean endState; @@ -18,7 +16,7 @@ public class SagaActions { private Optional localException; private boolean failed; - public SagaActions(List commands, + public SagaActions(List commands, Optional updatedSagaData, Optional updatedState, boolean endState, boolean compensating, boolean failed, boolean local, Optional localException) { this.commands = commands; @@ -31,7 +29,7 @@ public SagaActions(List commands, this.failed = failed; } - public List getCommands() { + public List getCommands() { return commands; } @@ -55,6 +53,11 @@ public boolean isLocal() { return local; } + public boolean isAllNotifications() { + return !commands.isEmpty() && commands.stream().allMatch(CommandWithDestinationAndType::isNotification); + } + + public boolean isFailed() { return failed; } @@ -64,7 +67,7 @@ public Optional getLocalException() { } public static class Builder { - private List commands = new ArrayList<>(); + private List commands = new ArrayList(); private Optional updatedSagaData = Optional.empty(); private Optional updatedState = Optional.empty(); private boolean endState; @@ -80,7 +83,7 @@ public SagaActions build() { return new SagaActions<>(commands, updatedSagaData, updatedState, endState, compensating, failed, local, localException); } - public Builder withCommand(CommandWithDestination command) { + public Builder withCommand(CommandWithDestinationAndType command) { commands.add(command); return this; } @@ -95,7 +98,7 @@ public Builder withUpdatedState(String state) { return this; } - public Builder withCommands(List commands) { + public Builder withCommands(List commands) { this.commands.addAll(commands); return this; } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaCommandProducer.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaCommandProducer.java index 1c21100..cd60918 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaCommandProducer.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaCommandProducer.java @@ -16,13 +16,17 @@ public SagaCommandProducer(CommandProducer commandProducer) { this.commandProducer = commandProducer; } - public String sendCommands(String sagaType, String sagaId, List commands, String sagaReplyChannel) { + public String sendCommands(String sagaType, String sagaId, List commands, String sagaReplyChannel) { String messageId = null; - for (CommandWithDestination command : commands) { + for (CommandWithDestinationAndType cwdt : commands) { + CommandWithDestination command = cwdt.getCommandWithDestination(); Map headers = new HashMap<>(command.getExtraHeaders()); headers.put(SagaCommandHeaders.SAGA_TYPE, sagaType); headers.put(SagaCommandHeaders.SAGA_ID, sagaId); - messageId = commandProducer.send(command.getDestinationChannel(), command.getResource(), command.getCommand(), sagaReplyChannel, headers); + if (cwdt.isNotification()) + messageId = commandProducer.sendNotification(command.getDestinationChannel(), command.getCommand(), headers); + else + messageId = commandProducer.send(command.getDestinationChannel(), command.getResource(), command.getCommand(), sagaReplyChannel, headers); } return messageId; diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java index e962992..b6aa584 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java @@ -223,18 +223,24 @@ private void processActions(String sagaType, String sagaId, SagaInstance sagaIns sagaInstanceRepository.update(sagaInstance); - if (!actions.isLocal()) + if (actions.isAllNotifications() || actions.isLocal()) { + actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions); + } else { break; + } - actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder - .withPayload("{}") - .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name()) - .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName()) - .build()); } } } + private SagaActions simulateSuccessfulReplyToLocalActionOrNotification(String sagaType, String sagaId, SagaActions actions) { + return getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder + .withPayload("{}") + .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name()) + .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName()) + .build()); + } + private void updateState(SagaInstance sagaInstance, SagaActions actions) { actions.getUpdatedState().ifPresent(stateName -> { sagaInstance.setStateName(stateName); diff --git a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaManagerImplTest.java b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaManagerImplTest.java index 1c15605..267ce04 100644 --- a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaManagerImplTest.java +++ b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaManagerImplTest.java @@ -3,7 +3,6 @@ import io.eventuate.common.id.Int128; import io.eventuate.tram.commands.common.CommandReplyOutcome; import io.eventuate.tram.commands.common.ReplyMessageHeaders; -import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.commands.producer.CommandProducer; import io.eventuate.tram.messaging.common.ChannelMapping; import io.eventuate.tram.messaging.common.Message; @@ -73,10 +72,10 @@ public class SagaManagerImplTest { private TestCommand command1 = new TestCommand(); private TestCommand command2 = new TestCommand(); - private CommandWithDestination commandForParticipant1 = new CommandWithDestination(participantChannel1, testResource, + private CommandWithDestinationAndType commandForParticipant1 = CommandWithDestinationAndType.command(participantChannel1, testResource, SagaManagerImplTest.this.command1); - private CommandWithDestination commandForParticipant2 = new CommandWithDestination(participantChannel2, testResource, + private CommandWithDestinationAndType commandForParticipant2 = CommandWithDestinationAndType.command(participantChannel2, testResource, SagaManagerImplTest.this.command2); private SagaInstance sagaInstance; diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantEndpointInvocationImpl.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantEndpointInvocationImpl.java index 9de8167..91758be 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantEndpointInvocationImpl.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantEndpointInvocationImpl.java @@ -3,8 +3,8 @@ import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.common.CommandReplyOutcome; import io.eventuate.tram.commands.common.ReplyMessageHeaders; -import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import io.eventuate.tram.sagas.simpledsl.CommandEndpoint; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; @@ -33,9 +33,9 @@ public boolean isSuccessfulReply(Message message) { } @Override - public Publisher makeCommandToSend(Data data) { + public Publisher makeCommandToSend(Data data) { return Mono .from(commandProvider.apply(data)) - .map(cmd -> new CommandWithDestination(commandEndpoint.getCommandChannel(), null, cmd)); + .map(cmd -> CommandWithDestinationAndType.command(commandEndpoint.getCommandChannel(), null, cmd)); // TODO notifications } } diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocation.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocation.java index ed41910..b89e3e4 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocation.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocation.java @@ -1,7 +1,7 @@ package io.eventuate.tram.sagas.reactive.simpledsl; -import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import org.reactivestreams.Publisher; @@ -11,5 +11,5 @@ public interface ReactiveParticipantInvocation { boolean isInvocable(Data data); - Publisher makeCommandToSend(Data data); + Publisher makeCommandToSend(Data data); } diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocationImpl.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocationImpl.java index b242489..f04457d 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocationImpl.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocationImpl.java @@ -5,7 +5,9 @@ import io.eventuate.tram.commands.common.ReplyMessageHeaders; import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import java.util.Optional; import java.util.function.Function; @@ -13,12 +15,20 @@ public class ReactiveParticipantInvocationImpl extends AbstractReactiveParticipantInvocation { private Function> commandBuilder; + private final boolean notification; public ReactiveParticipantInvocationImpl(Optional> invocablePredicate, Function> commandBuilder) { super(invocablePredicate); this.commandBuilder = commandBuilder; + this.notification = false; + } + + public ReactiveParticipantInvocationImpl(Optional> invocablePredicate, Function> commandBuilder, boolean notification) { + super(invocablePredicate); + this.commandBuilder = commandBuilder; + this.notification = notification; } @Override @@ -27,7 +37,7 @@ public boolean isSuccessfulReply(Message message) { } @Override - public Publisher makeCommandToSend(Data data) { - return commandBuilder.apply(data); + public Publisher makeCommandToSend(Data data) { + return Mono.from(commandBuilder.apply(data)).map(cmd -> new CommandWithDestinationAndType(cmd, notification)); } } diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocationStep.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocationStep.java index a61d6c2..4ac4126 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocationStep.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveParticipantInvocationStep.java @@ -1,8 +1,8 @@ package io.eventuate.tram.sagas.reactive.simpledsl; import io.eventuate.tram.commands.common.ReplyMessageHeaders; -import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import io.eventuate.tram.sagas.simpledsl.StepOutcome; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; @@ -52,8 +52,8 @@ public boolean isSuccessfulReply(boolean compensating, Message message) { @Override public Publisher makeStepOutcome(Data data, boolean compensating) { - Publisher commandWithDestination = - getParticipantInvocation(compensating).map(x -> x.makeCommandToSend(data)).orElse(Mono.empty()); + Publisher commandWithDestination = + getParticipantInvocation(compensating).map(pi -> pi.makeCommandToSend(data)).orElse(Mono.empty()); return Mono.from(commandWithDestination).map(cmd -> StepOutcome.makeRemoteStepOutcome(Collections.singletonList(cmd))); } } diff --git a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaCommandProducer.java b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaCommandProducer.java index 719971b..43045c5 100644 --- a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaCommandProducer.java +++ b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaCommandProducer.java @@ -3,6 +3,7 @@ import io.eventuate.tram.commands.consumer.CommandWithDestination; import io.eventuate.tram.reactive.commands.producer.ReactiveCommandProducer; import io.eventuate.tram.sagas.common.SagaCommandHeaders; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -18,14 +19,19 @@ public ReactiveSagaCommandProducer(ReactiveCommandProducer commandProducer) { this.commandProducer = commandProducer; } - public Mono sendCommands(String sagaType, String sagaId, List commands, String sagaReplyChannel) { + public Mono sendCommands(String sagaType, String sagaId, List commands, String sagaReplyChannel) { return Flux .fromIterable(commands) - .flatMap(command -> { + .flatMap(cwdt -> { + CommandWithDestination command = cwdt.getCommandWithDestination(); Map headers = new HashMap<>(command.getExtraHeaders()); headers.put(SagaCommandHeaders.SAGA_TYPE, sagaType); headers.put(SagaCommandHeaders.SAGA_ID, sagaId); - return commandProducer.send(command.getDestinationChannel(), command.getResource(), command.getCommand(), sagaReplyChannel, headers); + if (cwdt.isNotification()) + // return commandProducer.sendNotification(command.getDestinationChannel(), command.getCommand(), headers); + return Mono.error(new UnsupportedOperationException("Reactive notifications not yet implemented")); // TODO notifications - implement me + else + return commandProducer.send(command.getDestinationChannel(), command.getResource(), command.getCommand(), sagaReplyChannel, headers); }) .next(); } diff --git a/eventuate-tram-sagas-spring-testing-support-cloud-contract/src/main/java/io/eventuate/tram/sagas/spring/testing/contract/SagaMessagingTestHelper.java b/eventuate-tram-sagas-spring-testing-support-cloud-contract/src/main/java/io/eventuate/tram/sagas/spring/testing/contract/SagaMessagingTestHelper.java index c8e2fd9..d429ca6 100644 --- a/eventuate-tram-sagas-spring-testing-support-cloud-contract/src/main/java/io/eventuate/tram/sagas/spring/testing/contract/SagaMessagingTestHelper.java +++ b/eventuate-tram-sagas-spring-testing-support-cloud-contract/src/main/java/io/eventuate/tram/sagas/spring/testing/contract/SagaMessagingTestHelper.java @@ -3,7 +3,7 @@ import io.eventuate.common.id.IdGenerator; import io.eventuate.common.json.mapper.JSonMapper; import io.eventuate.tram.commands.common.Command; -import io.eventuate.tram.commands.consumer.CommandWithDestination; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; import io.eventuate.tram.sagas.orchestration.SagaCommandProducer; import io.eventuate.tram.sagas.simpledsl.CommandEndpoint; import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierMessage; @@ -31,7 +31,7 @@ public R sendAndReceiveCommand(CommandEndpoint command String sagaId = idGenerator.genId(null).asString(); String replyTo = sagaType + "-reply"; - sagaCommandProducer.sendCommands(sagaType, sagaId, Collections.singletonList(new CommandWithDestination(commandEndpoint.getCommandChannel(), (String)null, (Command)command)), replyTo); + sagaCommandProducer.sendCommands(sagaType, sagaId, Collections.singletonList(CommandWithDestinationAndType.command(commandEndpoint.getCommandChannel(), (String)null, (Command)command)), replyTo); ContractVerifierMessage response = contractVerifierMessaging.receive(replyTo); diff --git a/eventuate-tram-sagas-testing-support/build.gradle b/eventuate-tram-sagas-testing-support/build.gradle index f4d94b8..a35365c 100644 --- a/eventuate-tram-sagas-testing-support/build.gradle +++ b/eventuate-tram-sagas-testing-support/build.gradle @@ -1,6 +1,6 @@ dependencies { - compile project(":eventuate-tram-sagas-orchestration") + compile project(":eventuate-tram-sagas-unit-testing-support") compile "io.eventuate.tram.core:eventuate-tram-testing-support:$eventuateTramVersion" } diff --git a/eventuate-tram-sagas-unit-testing-support/build.gradle b/eventuate-tram-sagas-unit-testing-support/build.gradle new file mode 100644 index 0000000..f195efa --- /dev/null +++ b/eventuate-tram-sagas-unit-testing-support/build.gradle @@ -0,0 +1,4 @@ +dependencies { + compile project(":eventuate-tram-sagas-orchestration") + compile "junit:junit:4.12" +} \ No newline at end of file diff --git a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/MessageWithDestination.java b/eventuate-tram-sagas-unit-testing-support/src/main/java/io/eventuate/tram/sagas/testing/MessageWithDestination.java similarity index 100% rename from eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/MessageWithDestination.java rename to eventuate-tram-sagas-unit-testing-support/src/main/java/io/eventuate/tram/sagas/testing/MessageWithDestination.java diff --git a/eventuate-tram-sagas-unit-testing-support/src/main/java/io/eventuate/tram/sagas/testing/MultipleCommandsExpected.java b/eventuate-tram-sagas-unit-testing-support/src/main/java/io/eventuate/tram/sagas/testing/MultipleCommandsExpected.java new file mode 100644 index 0000000..4ddf3a0 --- /dev/null +++ b/eventuate-tram-sagas-unit-testing-support/src/main/java/io/eventuate/tram/sagas/testing/MultipleCommandsExpected.java @@ -0,0 +1,44 @@ +package io.eventuate.tram.sagas.testing; + +import io.eventuate.tram.commands.common.Command; +import io.eventuate.tram.commands.consumer.CommandWithDestination; +import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType; + +import java.util.LinkedList; +import java.util.List; + +public class MultipleCommandsExpected { + private final SagaUnitTestSupport sagaUnitTestSupport; + private boolean notification; + private Command command; + + private final List commandsAndNotifications = new LinkedList<>(); + + public MultipleCommandsExpected(SagaUnitTestSupport sagaUnitTestSupport) { + super(); + this.sagaUnitTestSupport = sagaUnitTestSupport; + } + + + public MultipleCommandsExpected notification(Command notification) { + this.notification = true; + this.command = notification; + return this; + } + + public MultipleCommandsExpected to(String channel) { + commandsAndNotifications.add(new CommandWithDestinationAndType(new CommandWithDestination(channel, null, command), notification)); + return this; + } + + public SagaUnitTestSupport verify() { + sagaUnitTestSupport.verifySent(commandsAndNotifications); + return sagaUnitTestSupport; + } + + public MultipleCommandsExpected command(Command command) { + this.notification = false; + this.command = command; + return this; + } +} diff --git a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaUnitTestSupport.java b/eventuate-tram-sagas-unit-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaUnitTestSupport.java similarity index 66% rename from eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaUnitTestSupport.java rename to eventuate-tram-sagas-unit-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaUnitTestSupport.java index e811fb0..7b1fe81 100644 --- a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaUnitTestSupport.java +++ b/eventuate-tram-sagas-unit-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaUnitTestSupport.java @@ -6,16 +6,18 @@ import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.messaging.consumer.MessageConsumer; import io.eventuate.tram.messaging.producer.MessageBuilder; -import io.eventuate.tram.sagas.orchestration.*; import io.eventuate.tram.sagas.common.SagaLockManager; +import io.eventuate.tram.sagas.orchestration.*; import org.junit.Assert; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static org.junit.Assert.*; /** * Provides a DSL for writing unit tests for saga orchestrators @@ -24,6 +26,7 @@ public class SagaUnitTestSupport { private SagaManagerImpl sagaManager; private Command expectedCommand; + private Command expectedNotification; private List sentCommands = new ArrayList<>(); private MessageWithDestination sentCommand; @@ -100,23 +103,60 @@ public SagaUnitTestSupport expect() { public SagaUnitTestSupport command(Command command) { expectedCommand = command; + expectedNotification = null; + return this; + } + + public SagaUnitTestSupport notification(Command command) { + expectedCommand = null; + expectedNotification = command; return this; } public SagaUnitTestSupport to(String commandChannel) { - assertEquals("Expected one command", 1, sentCommands.size()); + Assert.assertEquals("Expected one command", 1, sentCommands.size()); sentCommand = sentCommands.get(0); - assertEquals(commandChannel, sentCommand.getDestination()); - assertEquals(expectedCommand.getClass().getName(), sentCommand.getMessage().getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE)); - // TODO + Assert.assertEquals(commandChannel, sentCommand.getDestination()); + Message sentMessage = sentCommand.getMessage(); + if (expectedCommand != null) { + Assert.assertEquals(expectedCommand.getClass().getName(), sentMessage.getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE)); + Assert.assertNotNull(sentMessage.getRequiredHeader(CommandMessageHeaders.REPLY_TO)); + } else { + Assert.assertEquals(expectedNotification.getClass().getName(), sentMessage.getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE)); + Assert.assertNull(sentMessage.getHeader(CommandMessageHeaders.REPLY_TO).orElse(null)); + + } sentCommands.clear(); return this; } + void verifySent(List commandsAndNotifications) { + sentCommand = null; + for (CommandWithDestinationAndType corn : commandsAndNotifications) { + MessageWithDestination sentMessage = sentCommands.stream() + .filter(sm -> corn.getCommandWithDestination().getCommand().getClass().getName().equals(sm.getMessage().getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE)) + && corn.getCommandWithDestination().getDestinationChannel().equals(sm.getDestination())) + .findAny() + .orElseThrow(() -> new AssertionError(String.format("Did not find expected command %s in %s", corn, sentCommands))); + + if (corn.isNotification()) + Assert.assertNull(sentMessage.getMessage().getHeader(CommandMessageHeaders.REPLY_TO).orElse(null)); + else { + Assert.assertNotNull(sentMessage.getMessage().getRequiredHeader(CommandMessageHeaders.REPLY_TO)); + if (sentCommand != null) + Assert.fail(String.format("There can only be one command in %s", sentCommands)); + sentCommand = sentMessage; + } + } + if (commandsAndNotifications.size() != sentCommands.size()) + Assert.fail(String.format("Expected these commands %s but there are extra %s", commandsAndNotifications, sentCommands)); + sentCommands.clear(); + } + public SagaUnitTestSupport withExtraHeaders(Map expectedExtraHeaders) { Map actualHeaders = sentCommand.getMessage().getHeaders(); if (!actualHeaders.entrySet().containsAll(expectedExtraHeaders.entrySet())) - fail(String.format("Expected headers %s to contain %s", actualHeaders, expectedExtraHeaders)); + Assert.fail(String.format("Expected headers %s to contain %s", actualHeaders, expectedExtraHeaders)); return this; } @@ -172,8 +212,8 @@ private void sendReply(Object reply, CommandReplyOutcome outcome) { public SagaUnitTestSupport expectCompletedSuccessfully() { assertNoCommands(); - assertTrue("Expected saga to have finished", sagaInstance.isEndState()); - assertFalse("Expected saga to have finished successfully", sagaInstance.isCompensating()); + Assert.assertTrue("Expected saga to have finished", sagaInstance.isEndState()); + Assert.assertFalse("Expected saga to have finished successfully", sagaInstance.isCompensating()); return this; } @@ -186,19 +226,19 @@ private void assertNoCommands() { Assert.fail(String.format("Expected saga to have finished but found a command of %s sent to %s: %s", mwd.getMessage().getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE), mwd.getDestination(), mwd.getMessage())); break; default: - assertEquals(emptyList(), sentCommands); + Assert.assertEquals(emptyList(), sentCommands); } } public SagaUnitTestSupport expectRolledBack() { assertNoCommands(); - assertTrue("Expected saga to have finished", sagaInstance.isEndState()); - assertTrue("Expected saga to have rolled back", sagaInstance.isCompensating()); + Assert.assertTrue("Expected saga to have finished", sagaInstance.isEndState()); + Assert.assertTrue("Expected saga to have rolled back", sagaInstance.isCompensating()); return this; } public void expectException(Exception expectedCreateException) { - assertEquals(expectedCreateException, createException.get()); + Assert.assertEquals(expectedCreateException, createException.get()); } public SagaUnitTestSupport assertSagaData(Consumer sagaDataConsumer) { @@ -206,4 +246,8 @@ public SagaUnitTestSupport assertSagaData(Consumer sagaDataConsumer) { return this; } + public MultipleCommandsExpected multiple() { + return new MultipleCommandsExpected(this); + } + } diff --git a/gradle.properties b/gradle.properties index e62cbac..da07b7b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -10,7 +10,7 @@ springBootVersion=2.4.5 springCloudContractDependenciesVersion=2.0.0.RELEASE springDependencyManagementPluginVersion=1.0.3.RELEASE - +dockerComposePluginVersion=0.12.0 eventuateUtilVersion=0.15.0.BUILD-SNAPSHOT eventuateTramVersion=0.31.0.BUILD-SNAPSHOT eventuateLocalVersion=0.39.0.BUILD-SNAPSHOT diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index e61f9ec..e9dafce 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.9.1-all.zip \ No newline at end of file diff --git a/orders-and-customers/build.gradle b/orders-and-customers/build.gradle index 621b09f..bc1b0a6 100644 --- a/orders-and-customers/build.gradle +++ b/orders-and-customers/build.gradle @@ -4,5 +4,8 @@ dependencies { compile project(":eventuate-tram-sagas-orchestration") compile project(":eventuate-tram-sagas-orchestration-simple-dsl") compile 'javax.persistence:javax.persistence-api:2.2' + + testCompile project(":eventuate-tram-sagas-unit-testing-support") + } diff --git a/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/sagas/createorder/CreateOrderSagaTest.java b/orders-and-customers/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/orders/sagas/createorder/CreateOrderSagaTest.java similarity index 85% rename from orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/sagas/createorder/CreateOrderSagaTest.java rename to orders-and-customers/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/orders/sagas/createorder/CreateOrderSagaTest.java index bd5a394..349f2b9 100644 --- a/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/sagas/createorder/CreateOrderSagaTest.java +++ b/orders-and-customers/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/orders/sagas/createorder/CreateOrderSagaTest.java @@ -1,10 +1,6 @@ -package io.eventuate.examples.tram.sagas.ordersandcustomers.spring.orders.sagas.createorder; +package io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder; import io.eventuate.examples.tram.sagas.ordersandcustomers.commondomain.Money; -import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.CreateOrderSaga; -import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.CreateOrderSagaCompletedSuccesfully; -import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.CreateOrderSagaData; -import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.CreateOrderSagaRolledBack; import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.participants.ApproveOrderCommand; import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.participants.ReserveCreditCommand; import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.OrderDetails; diff --git a/settings.gradle b/settings.gradle index 9410a38..123d181 100644 --- a/settings.gradle +++ b/settings.gradle @@ -26,6 +26,7 @@ include 'eventuate-tram-sagas-common-in-memory' include 'eventuate-tram-sagas-spring-in-memory' include 'eventuate-tram-sagas-micronaut-in-memory' include "eventuate-tram-sagas-event-sourcing-support" +include 'eventuate-tram-sagas-unit-testing-support' include 'eventuate-tram-sagas-testing-support' include 'eventuate-tram-sagas-spring-testing-support' include 'eventuate-tram-sagas-spring-testing-support-cloud-contract'