Skip to content

Commit

Permalink
#76 Send one-way commands/notifications to saga participants
Browse files Browse the repository at this point in the history
Extracted framework independent eventuate-tram-sagas-unit-testing-support from eventuate-tram-sagas-testing-support
  • Loading branch information
cer committed Aug 9, 2022
1 parent ca896db commit ec42ecd
Show file tree
Hide file tree
Showing 44 changed files with 468 additions and 97 deletions.
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ buildscript {
maven {
url "https://plugins.gradle.org/m2/"
}
jcenter()
eventuateMavenRepoUrl.split(',').each { repoUrl -> maven { url repoUrl } }
}
dependencies {
classpath "com.avast.gradle:gradle-docker-compose-plugin:0.9.2"
classpath "com.avast.gradle:gradle-docker-compose-plugin:$dockerComposePluginVersion"
classpath "io.eventuate.plugins.gradle:eventuate-plugins-gradle-publish:$eventuatePluginsGradleVersion"
}
}
Expand All @@ -30,7 +29,6 @@ allprojects {

repositories {
mavenCentral()
jcenter()
eventuateMavenRepoUrl.split(',').each { repoUrl -> maven { url repoUrl } }
}

Expand Down
2 changes: 1 addition & 1 deletion eventuate-tram-sagas-orchestration-simple-dsl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
dependencies {
compile project(":eventuate-tram-sagas-orchestration")

testCompile project(":eventuate-tram-sagas-testing-support")
testCompile project(":eventuate-tram-sagas-unit-testing-support")
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ InvokeParticipantStepBuilder<Data> withAction(Optional<Predicate<Data>> particip
return this;
}

public InvokeParticipantStepBuilder<Data> withNotificationAction(Optional<Predicate<Data>> participantInvocationPredicate, Function<Data, CommandWithDestination> notificationAction) {
this.action = Optional.of(new ParticipantInvocationImpl<>(participantInvocationPredicate, notificationAction, true));
return this;
}


<C extends Command> InvokeParticipantStepBuilder<Data> withAction(Optional<Predicate<Data>> participantInvocationPredicate, CommandEndpoint<C> commandEndpoint, Function<Data, C> commandProvider) {
this.action = Optional.of(new ParticipantEndpointInvocationImpl<>(participantInvocationPredicate, commandEndpoint, commandProvider));
return this;
Expand All @@ -39,6 +45,11 @@ public InvokeParticipantStepBuilder<Data> withCompensation(Function<Data, Comman
return this;
}

public InvokeParticipantStepBuilder<Data> withCompensationNotification(Function<Data, CommandWithDestination> compensation) {
this.compensation = Optional.of(new ParticipantInvocationImpl<>(Optional.empty(), compensation, true));
return this;
}

@Override
public InvokeParticipantStepBuilder<Data> withCompensation(Predicate<Data> compensationPredicate, Function<Data, CommandWithDestination> compensation) {
this.compensation = Optional.of(new ParticipantInvocationImpl<>(Optional.of(compensationPredicate), compensation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.common.CommandReplyOutcome;
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType;

import java.util.Optional;
import java.util.function.Function;
Expand All @@ -28,7 +28,7 @@ public boolean isSuccessfulReply(Message message) {
}

@Override
public CommandWithDestination makeCommandToSend(Data data) {
return new CommandWithDestination(commandEndpoint.getCommandChannel(), null, commandProvider.apply(data));
public CommandWithDestinationAndType makeCommandToSend(Data data) {
return CommandWithDestinationAndType.command(commandEndpoint.getCommandChannel(), null, commandProvider.apply(data)); // TODO notifications
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType;


public interface ParticipantInvocation<Data> {
Expand All @@ -10,5 +10,5 @@ public interface ParticipantInvocation<Data> {

boolean isInvocable(Data data);

CommandWithDestination makeCommandToSend(Data data);
CommandWithDestinationAndType makeCommandToSend(Data data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,34 @@
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType;

import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

public class ParticipantInvocationImpl<Data, C extends Command> extends AbstractParticipantInvocation<Data> {
private Function<Data, CommandWithDestination> commandBuilder;
private final boolean notification;
private final Function<Data, CommandWithDestination> commandBuilder;


public ParticipantInvocationImpl(Optional<Predicate<Data>> invocablePredicate, Function<Data, CommandWithDestination> commandBuilder) {
this(invocablePredicate, commandBuilder, false);
}

public ParticipantInvocationImpl(Optional<Predicate<Data>> invocablePredicate, Function<Data, CommandWithDestination> commandBuilder, boolean notification) {
super(invocablePredicate);
this.commandBuilder = commandBuilder;
this.notification = notification;
}

@Override
@Override
public boolean isSuccessfulReply(Message message) {
return CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));
}

@Override
public CommandWithDestination makeCommandToSend(Data data) {
return commandBuilder.apply(data);
public CommandWithDestinationAndType makeCommandToSend(Data data) {
return new CommandWithDestinationAndType(commandBuilder.apply(data), notification);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.messaging.common.Message;

import java.util.*;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

public class ParticipantInvocationStep<Data> implements SagaStep<Data> {
Expand Down Expand Up @@ -47,7 +49,7 @@ public boolean isSuccessfulReply(boolean compensating, Message message) {
@Override
public StepOutcome makeStepOutcome(Data data, boolean compensating) {
return StepOutcome.makeRemoteStepOutcome(getParticipantInvocation(compensating)
.map(x -> x.makeCommandToSend(data))
.map(pi -> pi.makeCommandToSend(data))
.map(Collections::singletonList)
.orElseGet(Collections::emptyList));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,8 @@ public <C extends Command> InvokeParticipantStepBuilder<Data> withCompensation(C
public <C extends Command> InvokeParticipantStepBuilder<Data> withCompensation(Predicate<Data> compensationPredicate, CommandEndpoint<C> commandEndpoint, Function<Data, C> commandProvider) {
return new InvokeParticipantStepBuilder<>(parent).withCompensation(compensationPredicate, commandEndpoint, commandProvider);
}

public InvokeParticipantStepBuilder<Data> notifyParticipant(Function<Data, CommandWithDestination> notificationAction) {
return new InvokeParticipantStepBuilder<>(parent).withNotificationAction(Optional.empty(), notificationAction);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.sagas.orchestration.CommandWithDestinationAndType;

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

public abstract class StepOutcome {

public abstract void visit(Consumer<Optional<RuntimeException>> localConsumer, Consumer<List<CommandWithDestination>> commandsConsumer);
public abstract void visit(Consumer<Optional<RuntimeException>> localConsumer, Consumer<List<CommandWithDestinationAndType>> commandsConsumer);

static class LocalStepOutcome extends StepOutcome {
private Optional<RuntimeException> localOutcome;
Expand All @@ -18,28 +18,28 @@ public LocalStepOutcome(Optional<RuntimeException> localOutcome) {
}

@Override
public void visit(Consumer<Optional<RuntimeException>> localConsumer, Consumer<List<CommandWithDestination>> commandsConsumer) {
public void visit(Consumer<Optional<RuntimeException>> localConsumer, Consumer<List<CommandWithDestinationAndType>> commandsConsumer) {
localConsumer.accept(localOutcome);
}
}

static class RemoteStepOutcome extends StepOutcome {
private List<CommandWithDestination> commandsToSend;
private List<CommandWithDestinationAndType> commandsToSend;

public RemoteStepOutcome(List<CommandWithDestination> commandsToSend) {
public RemoteStepOutcome(List<CommandWithDestinationAndType> commandsToSend) {
this.commandsToSend = commandsToSend;
}

@Override
public void visit(Consumer<Optional<RuntimeException>> localConsumer, Consumer<List<CommandWithDestination>> commandsConsumer) {
public void visit(Consumer<Optional<RuntimeException>> localConsumer, Consumer<List<CommandWithDestinationAndType>> commandsConsumer) {
commandsConsumer.accept(commandsToSend);
}
}

public static StepOutcome makeLocalOutcome(Optional<RuntimeException> localOutcome) {
return new LocalStepOutcome(localOutcome);
}
public static StepOutcome makeRemoteStepOutcome(List<CommandWithDestination> commandsToSend) {
public static StepOutcome makeRemoteStepOutcome(List<CommandWithDestinationAndType> commandsToSend) {
return new RemoteStepOutcome(commandsToSend);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public CommandWithDestination undo1() {
}

public CommandWithDestination do2() {
return new CommandWithDestination("participant2", null, new Do2Command());
return new CommandWithDestination("participant2", null, new ReserveCreditCommand());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void shouldExecuteAllStepsSuccessfully() {
andGiven().
successReply().
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
successReply().
Expand All @@ -34,7 +34,7 @@ public void shouldRollback() {
andGiven().
successReply().
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
failureReply().
Expand All @@ -52,7 +52,7 @@ public void shouldExecuteAllStepsExcept1Successfully() {
given().
saga(new ConditionalSaga(), new ConditionalSagaData(false)).
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
successReply().
Expand All @@ -65,7 +65,7 @@ public void shouldRollbackExcept1() {
given().
saga(new ConditionalSaga(), new ConditionalSagaData(false)).
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
failureReply().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
public class LocalSagaData {

public CommandWithDestination do2() {
return new CommandWithDestination("participant2", null, new Do2Command());
return new CommandWithDestination("participant2", null, new ReserveCreditCommand());
}

public CommandWithDestination undo2() {
return new CommandWithDestination("participant2", null, new Undo2Command());
return new CommandWithDestination("participant2", null, new ReleaseCreditCommand());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

public class LocalSagaTest {
public class NotificationBasedCreateOrderSagaTest {

private LocalSagaSteps steps;

Expand All @@ -21,7 +21,7 @@ public void shouldExecuteAllStepsSuccessfully() {
given().
saga(new LocalSaga(steps), new LocalSagaData()).
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
successReply().
Expand All @@ -34,7 +34,7 @@ public void shouldRollbackFromStep2() {
given().
saga(new LocalSaga(steps), new LocalSagaData()).
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
failureReply().
Expand All @@ -60,12 +60,12 @@ public void shouldHandleFailureOfLastLocalStep() {
given().
saga(new LocalSaga(steps), data).
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
successReply().
expect().
command(new Undo2Command()).
command(new ReleaseCreditCommand()).
to("participant2").
andGiven().
successReply().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

import io.eventuate.tram.commands.common.Command;

public class Do2Command implements Command {
public class ReleaseCreditCommand implements Command {
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

import io.eventuate.tram.commands.common.Command;

public class Undo2Command implements Command {
public class ReserveCreditCommand implements Command {
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void shouldExecuteAllStepsSuccessfully() {
andGiven().
successReply().
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
successReply().
Expand All @@ -61,7 +61,7 @@ public void shouldRollback() {
andGiven().
successReply().
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
failureReply().
Expand All @@ -88,7 +88,7 @@ public void shouldExecuteAllStepsExcept1Successfully() {
given().
saga(new WithHandlersSaga(handlers), new ConditionalSagaData(false)).
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
successReply().
Expand All @@ -108,7 +108,7 @@ public void shouldRollbackExcept1() {
given().
saga(new WithHandlersSaga(handlers), new ConditionalSagaData(false)).
expect().
command(new Do2Command()).
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
failureReply().
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.eventuate.tram.sagas.simpledsl.notifications;

import io.eventuate.tram.commands.common.Command;

public class FulfillOrder implements Command {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.eventuate.tram.sagas.simpledsl.notifications;

import io.eventuate.tram.sagas.orchestration.SagaDefinition;
import io.eventuate.tram.sagas.simpledsl.SimpleSaga;

public class NotificationBasedCreateOrderSaga implements SimpleSaga<NotificationBasedCreateOrderSagaData> {

private final SagaDefinition<NotificationBasedCreateOrderSagaData> sagaDefinition;

public NotificationBasedCreateOrderSaga(NotificationBasedCreateOrderSagaSteps steps) {
this.sagaDefinition =
step()
.invokeLocal(steps::createOrder)
.withCompensation(steps::rejectOrder)
.step()
.invokeParticipant(NotificationBasedCreateOrderSagaData::reserveCredit)
.withCompensation(NotificationBasedCreateOrderSagaData::releaseCredit)
.step()
.invokeParticipant(NotificationBasedCreateOrderSagaData::reserveInventory)
.withCompensationNotification(NotificationBasedCreateOrderSagaData::releaseInventory)
.step()
.invokeLocal(steps::approveOrder)
.step()
.notifyParticipant(NotificationBasedCreateOrderSagaData::fulfillOrder)
.build();
}


@Override
public SagaDefinition<NotificationBasedCreateOrderSagaData> getSagaDefinition() {
return this.sagaDefinition;
}

}
Loading

0 comments on commit ec42ecd

Please sign in to comment.