Skip to content

Commit

Permalink
#82 Support nested sagas- implemented basic functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
cer committed Aug 16, 2022
1 parent 0bfe1df commit 4bcfd99
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandReplyProducer;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;
import io.eventuate.tram.sagas.participant.SagaCommandDispatcherFactory;
import io.micronaut.context.annotation.Factory;
Expand All @@ -14,10 +13,9 @@
public class SagaParticipantFactory {
@Singleton
public SagaCommandDispatcherFactory sagaCommandDispatcherFactory(MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) {
return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping, commandReplyProducer);
return new SagaCommandDispatcherFactory(messageConsumer, sagaLockManager, commandNameMapping, commandReplyProducer);
}


Expand Down
3 changes: 3 additions & 0 deletions eventuate-tram-sagas-orchestration-simple-dsl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ dependencies {
compile project(":eventuate-tram-sagas-orchestration")

testCompile project(":eventuate-tram-sagas-unit-testing-support")
testCompile project(":eventuate-tram-sagas-participant")
testCompile "io.eventuate.tram.core:eventuate-tram-in-memory:$eventuateTramVersion"

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.eventuate.tram.sagas.simpledsl.nested;

import io.eventuate.tram.sagas.orchestration.SagaInstance;
import io.eventuate.tram.sagas.orchestration.SagaInstanceRepository;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class InMemorySagaInstanceRepository implements SagaInstanceRepository {

private ConcurrentHashMap<String, SagaInstance> sagaInstances = new ConcurrentHashMap<>();

@Override
public void save(SagaInstance sagaInstance) {
String id = UUID.randomUUID().toString();
sagaInstance.setId(id);
sagaInstances.put(id, sagaInstance);
}

@Override
public SagaInstance find(String sagaType, String sagaId) {
return sagaInstances.get(sagaId);
}

@Override
public void update(SagaInstance sagaInstance) {
sagaInstances.put(sagaInstance.getId(), sagaInstance);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.eventuate.tram.sagas.simpledsl.nested;

import io.eventuate.tram.commands.common.Command;

public class InnerCommand implements Command {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.eventuate.tram.sagas.simpledsl.nested;

import io.eventuate.tram.commands.consumer.CommandReplyProducer;
import io.eventuate.tram.sagas.orchestration.SagaDefinition;
import io.eventuate.tram.sagas.simpledsl.SimpleSaga;

import static io.eventuate.tram.commands.consumer.CommandHandlerReplyBuilder.withSuccess;

public class InnerSaga implements SimpleSaga<InnerSagaData> {
private final SagaDefinition<InnerSagaData> sagaDefinition;

private final CommandReplyProducer commandReplyProducer;

public InnerSaga(CommandReplyProducer commandReplyProducer) {
this.commandReplyProducer = commandReplyProducer;
this.sagaDefinition = step()
.invokeParticipant(InnerSagaData::innerOperation)
.build()
;
}


@Override
public SagaDefinition<InnerSagaData> getSagaDefinition() {
return this.sagaDefinition;
}

@Override
public void onSagaCompletedSuccessfully(String sagaId, InnerSagaData innerSagaData) {
SimpleSaga.super.onSagaCompletedSuccessfully(sagaId, innerSagaData);
commandReplyProducer.sendReplies(innerSagaData.getCommandReplyToken(), withSuccess());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.eventuate.tram.sagas.simpledsl.nested;

import io.eventuate.tram.commands.consumer.CommandReplyToken;
import io.eventuate.tram.commands.consumer.CommandWithDestination;

public class InnerSagaData {
private CommandReplyToken commandReplyToken;

public InnerSagaData(CommandReplyToken commandReplyToken) {
this.commandReplyToken = commandReplyToken;
}

private InnerSagaData() {
}

public CommandWithDestination innerOperation() {
return new CommandWithDestination("customerService", null, new InnerCommand());
}

public CommandReplyToken getCommandReplyToken() {
return commandReplyToken;
}

public void setCommandReplyToken(CommandReplyToken commandReplyToken) {
this.commandReplyToken = commandReplyToken;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.eventuate.tram.sagas.simpledsl.nested;

import io.eventuate.tram.commands.common.DefaultCommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandDispatcher;
import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandReplyProducer;
import io.eventuate.tram.commands.producer.CommandProducerImpl;
import io.eventuate.tram.consumer.common.DecoratedMessageHandlerFactory;
import io.eventuate.tram.consumer.common.MessageConsumerImpl;
import io.eventuate.tram.inmemory.EventuateTransactionSynchronizationManager;
import io.eventuate.tram.inmemory.InMemoryMessageConsumer;
import io.eventuate.tram.inmemory.InMemoryMessageProducer;
import io.eventuate.tram.messaging.common.ChannelMapping;
import io.eventuate.tram.messaging.common.DefaultChannelMapping;
import io.eventuate.tram.messaging.common.MessageInterceptor;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.messaging.producer.common.MessageProducerImpl;
import io.eventuate.tram.sagas.common.SagaLockManager;
import io.eventuate.tram.sagas.orchestration.SagaCommandProducer;
import io.eventuate.tram.sagas.orchestration.SagaInstanceFactory;
import io.eventuate.tram.sagas.orchestration.SagaInstanceRepository;
import io.eventuate.tram.sagas.orchestration.SagaManagerFactory;
import io.eventuate.tram.sagas.participant.SagaCommandDispatcherFactory;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class NestedSagaTest {
private SagaInstanceFactory sagaInstanceFactory;
private OuterSaga outerSaga = new OuterSaga();
private InnerSaga innerSaga;

private MessageConsumerImpl messageConsumer;
private CommandProducerImpl commandProducer;
private CommandReplyProducer commandReplyProducer;

@Before
public void setup() {

// TODO - copy/paste

InMemoryMessageConsumer inMemoryMessageConsumer = new InMemoryMessageConsumer();
EventuateTransactionSynchronizationManager eventuateTransactionSynchronizationManager = mock(EventuateTransactionSynchronizationManager.class);
when(eventuateTransactionSynchronizationManager.isTransactionActive()).thenReturn(false);

ChannelMapping channelMapping = new DefaultChannelMapping.DefaultChannelMappingBuilder().build();
MessageProducer messageProducer = new MessageProducerImpl(new MessageInterceptor[0], channelMapping,
new InMemoryMessageProducer(inMemoryMessageConsumer, eventuateTransactionSynchronizationManager));

messageConsumer = new MessageConsumerImpl(channelMapping, inMemoryMessageConsumer, new DecoratedMessageHandlerFactory(Collections.emptyList()));

DefaultCommandNameMapping commandNameMapping = new DefaultCommandNameMapping();
commandProducer = new CommandProducerImpl(messageProducer, commandNameMapping);

SagaCommandProducer sagaCommandProducer = new SagaCommandProducer(commandProducer);

SagaLockManager sagaLockManager = mock(SagaLockManager.class);
when(sagaLockManager.unlock(any(), any())).thenReturn(Optional.empty());

SagaInstanceRepository sagaInstanceRepository = new InMemorySagaInstanceRepository();

commandReplyProducer = new CommandReplyProducer(messageProducer);

innerSaga = new InnerSaga(commandReplyProducer);

SagaManagerFactory sagaManagerFactory = new SagaManagerFactory(sagaInstanceRepository, commandProducer, messageConsumer, sagaLockManager, sagaCommandProducer);
sagaInstanceFactory = new SagaInstanceFactory(sagaManagerFactory, Arrays.asList(outerSaga, innerSaga));

ParticipantCommandHandlers participantCommandHandlers = new ParticipantCommandHandlers(sagaInstanceFactory, innerSaga);
CommandHandlers commandHandlers = participantCommandHandlers.commandHandlerDefinitions();
SagaCommandDispatcherFactory commandDispatcherFactory = new SagaCommandDispatcherFactory(messageConsumer, sagaLockManager, commandNameMapping, commandReplyProducer);
CommandDispatcher commandDispatcher = commandDispatcherFactory.make("subscriberId", commandHandlers);
commandDispatcher.initialize();
}

@Test
public void shouldInvokeNestedSaga() throws InterruptedException {
OuterSagaData data = new OuterSagaData();
sagaInstanceFactory.create(outerSaga, data);
TimeUnit.SECONDS.sleep(5);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.eventuate.tram.sagas.simpledsl.nested;

import io.eventuate.tram.sagas.orchestration.SagaDefinition;
import io.eventuate.tram.sagas.simpledsl.SimpleSaga;

public class OuterSaga implements SimpleSaga<OuterSagaData> {
private final SagaDefinition<OuterSagaData> sagaDefinition;

public OuterSaga() {
this.sagaDefinition = step()
.invokeParticipant(OuterSagaData::reserveCredit)
.withCompensation(OuterSagaData::releaseCredit)
.step()
.invokeLocal(OuterSagaData::approveOrder)
.build()
;
}


@Override
public SagaDefinition<OuterSagaData> getSagaDefinition() {
return this.sagaDefinition;
}

@Override
public void onSagaCompletedSuccessfully(String sagaId, OuterSagaData sagaData) {
SimpleSaga.super.onSagaCompletedSuccessfully(sagaId, sagaData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.eventuate.tram.sagas.simpledsl.nested;

import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.sagas.simpledsl.ReleaseCreditCommand;
import io.eventuate.tram.sagas.simpledsl.ReserveCreditCommand;

public class OuterSagaData {
public CommandWithDestination reserveCredit() {
return new CommandWithDestination("customerService", null, new ReserveCreditCommand());
}

public CommandWithDestination releaseCredit() {
return new CommandWithDestination("customerService", null, new ReleaseCreditCommand());
}

public void approveOrder() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.eventuate.tram.sagas.simpledsl.nested;

import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.commands.consumer.CommandReplyToken;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.sagas.orchestration.SagaInstanceFactory;
import io.eventuate.tram.sagas.participant.SagaCommandHandlersBuilder;
import io.eventuate.tram.sagas.simpledsl.ReleaseCreditCommand;
import io.eventuate.tram.sagas.simpledsl.ReserveCreditCommand;

import static io.eventuate.tram.commands.consumer.CommandHandlerReplyBuilder.withSuccess;

public class ParticipantCommandHandlers {

private final SagaInstanceFactory sagaInstanceFactory;
private final InnerSaga innerSaga;

public ParticipantCommandHandlers(SagaInstanceFactory sagaInstanceFactory, InnerSaga innerSaga) {
this.sagaInstanceFactory = sagaInstanceFactory;
this.innerSaga = innerSaga;
}

public CommandHandlers commandHandlerDefinitions() {
return SagaCommandHandlersBuilder
.fromChannel("customerService")
.onMessage(ReserveCreditCommand.class, this::reserveCredit)
.onMessage(ReleaseCreditCommand.class, this::releaseCredit)
.onMessage(InnerCommand.class, this::innerCommand)
// CommandHandler for inner saga - dummy
.build();
}

private Message innerCommand(CommandMessage<InnerCommand> innerCommandCommandMessage) {
return withSuccess();
}

private void reserveCredit(CommandMessage<ReserveCreditCommand> cm, CommandReplyToken commandReplyToken) {
InnerSagaData data = new InnerSagaData(commandReplyToken);
sagaInstanceFactory.create(innerSaga, data);
}

private Message releaseCredit(CommandMessage<ReleaseCreditCommand> releaseCreditCommandCommandMessage) {
return withSuccess();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,20 @@
import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandReplyProducer;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;

public class SagaCommandDispatcherFactory {

private final MessageConsumer messageConsumer;
private final MessageProducer messageProducer;
private final SagaLockManager sagaLockManager;
private final CommandNameMapping commandNameMapping;
private final CommandReplyProducer commandReplyProducer;

public SagaCommandDispatcherFactory(MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) {
CommandNameMapping commandNameMapping,
CommandReplyProducer commandReplyProducer) {
this.messageConsumer = messageConsumer;
this.messageProducer = messageProducer;
this.sagaLockManager = sagaLockManager;
this.commandNameMapping = commandNameMapping;
this.commandReplyProducer = commandReplyProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import io.eventuate.tram.commands.consumer.CommandHandler;
import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.commands.consumer.CommandReplyToken;
import io.eventuate.tram.messaging.common.Message;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -64,6 +66,16 @@ public <C extends Command> SagaCommandHandlerBuilder<C> onMessage(Class<C> comma
return new SagaCommandHandlerBuilder<>(this, h);
}

public <C extends Command> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, BiConsumer<CommandMessage<C>, CommandReplyToken> handler) {
SagaCommandHandler h = new SagaCommandHandler(channel, commandClass,
args -> {
handler.accept(args.getCommandMessage(), args.getCommandReplyToken());
return Collections.emptyList();
});
this.handlers.add(h);
return new SagaCommandHandlerBuilder<>(this, h);
}

public CommandHandlers build() {
return new CommandHandlers(handlers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandReplyProducer;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;
import io.eventuate.tram.sagas.participant.SagaCommandDispatcherFactory;
import io.eventuate.tram.sagas.spring.common.EventuateTramSagaCommonConfiguration;
Expand All @@ -16,11 +15,10 @@
public class SagaParticipantConfiguration {
@Bean
public SagaCommandDispatcherFactory sagaCommandDispatcherFactory(MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping,
CommandReplyProducer commandReplyProducer) {
return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping, commandReplyProducer);
return new SagaCommandDispatcherFactory(messageConsumer, sagaLockManager, commandNameMapping, commandReplyProducer);
}


Expand Down

0 comments on commit 4bcfd99

Please sign in to comment.