Skip to content

Commit

Permalink
#76 Send one-way commands/notifications to saga participants - reacti…
Browse files Browse the repository at this point in the history
…ve saga
  • Loading branch information
cer committed Aug 11, 2022
1 parent 5cbd33f commit 10c7cd1
Show file tree
Hide file tree
Showing 23 changed files with 758 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,4 @@ public CommandWithDestination undo2() {
return new CommandWithDestination("participant2", null, new ReleaseCreditCommand());
}

@Override
public int hashCode() {
return 1;
}

@Override
public boolean equals(Object obj) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@
import org.junit.Test;

import static io.eventuate.tram.sagas.testing.SagaUnitTestSupport.given;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

public class NotificationBasedCreateOrderSagaTest {
public class LocalSagaTest {

private LocalSagaSteps steps;

@Before
public void setUp() throws Exception {
public void setUp() {
steps = mock(LocalSagaSteps.class);
}

@Test
public void shouldExecuteAllStepsSuccessfully() {
given().
saga(new LocalSaga(steps), new LocalSagaData()).
saga(makeSaga(), new LocalSagaData()).
expect().
command(new ReserveCreditCommand()).
to("participant2").
Expand All @@ -32,7 +33,7 @@ public void shouldExecuteAllStepsSuccessfully() {
@Test
public void shouldRollbackFromStep2() {
given().
saga(new LocalSaga(steps), new LocalSagaData()).
saga(makeSaga(), new LocalSagaData()).
expect().
command(new ReserveCreditCommand()).
to("participant2").
Expand All @@ -49,16 +50,15 @@ public void shouldHandleFailureOfFirstLocalStep() {
RuntimeException expectedCreateException = new RuntimeException("Failed local step");
doThrow(expectedCreateException).when(steps).localStep1(data);
given().
saga(new LocalSaga(steps), data).
saga(makeSaga(), data).
expectException(expectedCreateException)
;
}
@Test
public void shouldHandleFailureOfLastLocalStep() {
LocalSagaData data = new LocalSagaData();
doThrow(new RuntimeException()).when(steps).localStep3(data);
doThrow(new RuntimeException()).when(steps).localStep3(any());
given().
saga(new LocalSaga(steps), data).
saga(makeSaga(), new LocalSagaData()).
expect().
command(new ReserveCreditCommand()).
to("participant2").
Expand All @@ -73,5 +73,9 @@ public void shouldHandleFailureOfLastLocalStep() {
;
}

private LocalSaga makeSaga() {
return new LocalSaga(steps);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ public CommandWithDestination getCommandWithDestination() {
public boolean isNotification() {
return notification;
}

boolean isCommand() {
return !isNotification();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ public class SagaActions<Data> {
private final List<CommandWithDestinationAndType> commands;
private final Optional<Data> updatedSagaData;
private final Optional<String> updatedState;
private boolean endState;
private boolean compensating;
private boolean local;
private final boolean endState;
private final boolean compensating;
private final boolean local;
private Optional<RuntimeException> localException;
private boolean failed;
private final boolean failed;

public SagaActions(List<CommandWithDestinationAndType> commands,
Optional<Data> updatedSagaData,
Expand Down Expand Up @@ -49,15 +49,10 @@ public boolean isCompensating() {
return compensating;
}

public boolean isLocal() {
return local;
public boolean isReplyExpected() {
return (commands.isEmpty() || commands.stream().anyMatch(CommandWithDestinationAndType::isCommand)) && !local;
}

public boolean isAllNotifications() {
return !commands.isEmpty() && commands.stream().allMatch(CommandWithDestinationAndType::isNotification);
}


public boolean isFailed() {
return failed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ public class SagaInstance {
private Boolean compensating = false;
private Boolean failed = false;

@Override
public String toString() {
return "SagaInstance{" +
"sagaType='" + sagaType + '\'' +
", id='" + id + '\'' +
", lastRequestId='" + lastRequestId + '\'' +
", serializedSagaData=" + serializedSagaData +
", stateName='" + stateName + '\'' +
", destinationsAndResources=" + destinationsAndResources +
", endState=" + endState +
", compensating=" + compensating +
", failed=" + failed +
'}';
}

public void setSagaType(String sagaType) {
this.sagaType = sagaType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,10 @@ private void processActions(String sagaType, String sagaId, SagaInstance sagaIns

sagaInstanceRepository.update(sagaInstance);

if (actions.isAllNotifications() || actions.isLocal()) {
actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions);
} else {
if (actions.isReplyExpected()) {
break;
} else {
actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ InvokeReactiveParticipantStepBuilder<Data> withAction(Optional<Predicate<Data>>
return this;
}

InvokeReactiveParticipantStepBuilder<Data> withNotificationAction(Optional<Predicate<Data>> participantInvocationPredicate,
Function<Data, Publisher<CommandWithDestination>> notificationAction) {
this.action = Optional.of(new ReactiveParticipantInvocationImpl<>(participantInvocationPredicate, notificationAction, true));
return this;
}


<C extends Command> InvokeReactiveParticipantStepBuilder<Data> withAction(Optional<Predicate<Data>> participantInvocationPredicate,
CommandEndpoint<C> commandEndpoint,
Function<Data, Publisher<C>> commandProvider) {
Expand All @@ -44,6 +51,11 @@ public InvokeReactiveParticipantStepBuilder<Data> withCompensation(Function<Data
return this;
}

public InvokeReactiveParticipantStepBuilder<Data> withCompensationNotification(Function<Data, Publisher<CommandWithDestination>> compensation) {
this.compensation = Optional.of(new ReactiveParticipantInvocationImpl<>(Optional.empty(), compensation, true));
return this;
}

@Override
public InvokeReactiveParticipantStepBuilder<Data> withCompensation(Predicate<Data> compensationPredicate, Function<Data, Publisher<CommandWithDestination>> compensation) {
this.compensation = Optional.of(new ReactiveParticipantInvocationImpl<>(Optional.of(compensationPredicate), compensation));
Expand Down Expand Up @@ -86,4 +98,5 @@ private void addStep() {
parent.addStep(new ReactiveParticipantInvocationStep<>(action, compensation, actionReplyHandlers, compensationReplyHandlers));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public ReactiveLocalStepBuilder<Data> invokeLocal(Function<Data, Publisher<?>> l
public InvokeReactiveParticipantStepBuilder<Data> invokeParticipant(Function<Data, Publisher<CommandWithDestination>> action) {
return new InvokeReactiveParticipantStepBuilder<>(parent).withAction(Optional.empty(), action);
}
public InvokeReactiveParticipantStepBuilder<Data> notifyParticipant(Function<Data, Publisher<CommandWithDestination>> action) {
return new InvokeReactiveParticipantStepBuilder<>(parent).withNotificationAction(Optional.empty(), action);
}

public InvokeReactiveParticipantStepBuilder<Data> invokeParticipant(Predicate<Data> participantInvocationPredicate,
Function<Data, Publisher<CommandWithDestination>> action) {
Expand Down Expand Up @@ -65,4 +68,5 @@ public <C extends Command> InvokeReactiveParticipantStepBuilder<Data> withCompen
CommandEndpoint<C> commandEndpoint, Function<Data, Publisher<C>> commandProvider) {
return new InvokeReactiveParticipantStepBuilder<>(parent).withCompensation(compensationPredicate, commandEndpoint, commandProvider);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.eventuate.tram.sagas.reactive.simpledsl;

import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;

import static io.eventuate.tram.sagas.reactive.simpledsl.framework.ReactiveSagaUnitTestSupport.given;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

public abstract class AbstractReactiveLocalSagaTest {
protected LocalSagaSteps steps;

@Before
public void setUp() {
steps = mock(LocalSagaSteps.class);

}

@Test
public void shouldRollbackFromStep2() {
doReturn(Mono.empty()).when(steps).localStep1(any());
doReturn(Mono.empty()).when(steps).localStep1Compensation(any());
doReturn(Mono.empty()).when(steps).localStep3(any());

given().
saga(makeSaga(), new LocalSagaData()).
expect().
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
failureReply().
andGiven().
expectRolledBack()
;
}

@Test
public void shouldHandleFailureOfFirstLocalStep() {
LocalSagaData data = new LocalSagaData();
RuntimeException expectedCreateException = new RuntimeException("Failed local step");
doThrow(expectedCreateException).when(steps).localStep1(data);
given().
saga(makeSaga(), data).
expectException(expectedCreateException)
;
}

protected abstract SimpleReactiveSaga<LocalSagaData> makeSaga();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.eventuate.tram.sagas.reactive.simpledsl;

import io.eventuate.tram.commands.consumer.CommandWithDestination;
import reactor.core.publisher.Mono;

public class LocalSagaData {

public Mono<CommandWithDestination> do2() {
return Mono.just(new CommandWithDestination("participant2", null, new ReserveCreditCommand()));
}

public Mono<CommandWithDestination> undo2() {
return Mono.just(new CommandWithDestination("participant2", null, new ReleaseCreditCommand()));
}

public Mono<CommandWithDestination> notify3() {
return Mono.just(new CommandWithDestination("participant3", null, new NotifyCommand()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.eventuate.tram.sagas.reactive.simpledsl;

import reactor.core.publisher.Mono;

public interface LocalSagaSteps {

Mono<Void> localStep1(LocalSagaData data);
Mono<Void> localStep1Compensation(LocalSagaData data);
Mono<Void> localStep3(LocalSagaData data);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.eventuate.tram.sagas.reactive.simpledsl;

import io.eventuate.tram.commands.common.Command;

public class NotifyCommand implements Command {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.eventuate.tram.sagas.reactive.simpledsl;

import io.eventuate.tram.sagas.reactive.orchestration.ReactiveSagaDefinition;

public class ReactiveLocalSaga implements SimpleReactiveSaga<LocalSagaData> {

private ReactiveSagaDefinition<LocalSagaData> sagaDefinition;

public ReactiveLocalSaga(LocalSagaSteps steps) {
this.sagaDefinition =
step()
.invokeLocal(steps::localStep1)
.withCompensation(steps::localStep1Compensation)
.step()
.invokeParticipant(LocalSagaData::do2)
.withCompensation(LocalSagaData::undo2)
.step()
.invokeLocal(steps::localStep3)
.build();
}


@Override
public ReactiveSagaDefinition<LocalSagaData> getSagaDefinition() {
return this.sagaDefinition;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.eventuate.tram.sagas.reactive.simpledsl;

import org.junit.Test;
import reactor.core.publisher.Mono;

import static io.eventuate.tram.sagas.reactive.simpledsl.framework.ReactiveSagaUnitTestSupport.given;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;

public class ReactiveLocalSagaTest extends AbstractReactiveLocalSagaTest {

@Override
protected SimpleReactiveSaga<LocalSagaData> makeSaga() {
return new ReactiveLocalSaga(steps);
}


@Test
public void shouldExecuteAllStepsSuccessfully() {
doReturn(Mono.empty()).when(steps).localStep1(any());
doReturn(Mono.empty()).when(steps).localStep1Compensation(any());
doReturn(Mono.empty()).when(steps).localStep3(any());

given().
saga(makeSaga(), new LocalSagaData()).
expect().
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
successReply()
.expectCompletedSuccessfully()
;

verify(steps).localStep3(any());
}

@Test
public void shouldHandleFailureOfLastLocalStep() {
doReturn(Mono.empty()).when(steps).localStep1(any());
doReturn(Mono.empty()).when(steps).localStep1Compensation(any());
doReturn(Mono.error(new RuntimeException())).when(steps).localStep3(any());

given().
saga(makeSaga(), new LocalSagaData()).
expect().
command(new ReserveCreditCommand()).
to("participant2").
andGiven().
successReply().
expect().
command(new ReleaseCreditCommand()).
to("participant2").
andGiven().
successReply().
expectRolledBack()
;
}
}
Loading

0 comments on commit 10c7cd1

Please sign in to comment.