Skip to content

Commit

Permalink
eventuate-tram#185 Use Flyway DB eventuate-tram#186 Upgrade to Spring…
Browse files Browse the repository at this point in the history
… Cloud Contract 3.x
  • Loading branch information
cer committed Dec 16, 2022
1 parent 8052e84 commit dcd18bb
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 102 deletions.
8 changes: 8 additions & 0 deletions eventuate-tram-spring-flyway/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
dependencies {
implementation "org.flywaydb:flyway-core:$flywayVersion"
implementation "org.flywaydb:flyway-mysql:$flywayVersion"

implementation "org.springframework.boot:spring-boot-starter:$springBootVersion"
implementation "io.eventuate.common:eventuate-common-jdbc:$eventuateCommonVersion"
implementation "io.eventuate.common:eventuate-common-flyway:$eventuateCommonVersion"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.eventuate.tram.spring.flyway;

import io.eventuate.common.jdbc.OutboxPartitioningSpec;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EventuateTramFlywayMigrationConfiguration {

@Bean
public V1005__MyMigration v1005__myMigration(OutboxPartitioningSpec outboxPartitioningSpec) {
return new V1005__MyMigration(outboxPartitioningSpec);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.eventuate.tram.spring.flyway;

import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.util.FileCopyUtils;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;

public class ScriptExecutor {
public static String readFileToString(String path) {
ResourceLoader resourceLoader = new DefaultResourceLoader();
Resource resource = resourceLoader.getResource(path);
try (Reader reader = new InputStreamReader(resource.getInputStream(), UTF_8)) {
return FileCopyUtils.copyToString(reader);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public void executeScript(Map<String, String> replacements, String script, SqlExecutor sqlExecutor) {

String s = readFileToString(script);

for (Map.Entry<String, String> entry : replacements.entrySet())
s = s.replace("${" + entry.getKey() + "}", entry.getValue());

String[] t = s.split(";");

for (String statement : t) {
if (!statement.startsWith("USE ") && statement.trim().length() > 0) {
System.out.println(statement);
try {
sqlExecutor.execute(statement);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.eventuate.tram.spring.flyway;

import java.sql.SQLException;

interface SqlExecutor {
void execute(String ddl) throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.eventuate.tram.spring.flyway;

import io.eventuate.common.jdbc.OutboxPartitioningSpec;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.springframework.boot.jdbc.DatabaseDriver;

import java.sql.Connection;
import java.util.Collections;

public class V1005__MyMigration extends BaseJavaMigration {

private OutboxPartitioningSpec outboxPartitioningSpec;
private ScriptExecutor scriptExecutor;

public V1005__MyMigration(OutboxPartitioningSpec outboxPartitioningSpec) {
this.outboxPartitioningSpec = outboxPartitioningSpec;
this.scriptExecutor = new ScriptExecutor();
}

@Override
public void migrate(Context context) throws Exception {
System.out.println("Hi-flyway");

Connection connection = context.getConnection();
DatabaseDriver driver = DatabaseDriver.fromJdbcUrl(connection.getMetaData().getURL());
String driverId = driver.getId();

SqlExecutor sqlExecutor = statement -> connection.prepareStatement(statement).execute();

outboxPartitioningSpec.outboxTableSuffixes().forEach(suffix ->
scriptExecutor.executeScript(Collections.singletonMap("EVENTUATE_OUTBOX_SUFFIX", suffix.suffixAsString),
"flyway-templates/" + driverId + "/3.create-message-table.sql", sqlExecutor));
}

}
4 changes: 4 additions & 0 deletions eventuate-tram-spring-logging/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
implementation project(":eventuate-tram-messaging")
implementation "org.springframework.boot:spring-boot-starter:$springBootVersion"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.eventuate.tram.spring.logging;

import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.common.MessageInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingMessageInterceptor implements MessageInterceptor {

private static final Logger logger = LoggerFactory.getLogger("io.eventuate.activity");

@Override
public void postSend(Message message, Exception e) {
logger.info("Message Sent: {}", message);
}

@Override
public void preHandle(String subscriberId, Message message) {
logger.info("message received: {} {}", subscriberId, message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.eventuate.tram.spring.logging;

import io.eventuate.tram.messaging.common.MessageInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LoggingMessageInterceptorAutoConfiguration {
@Bean
public MessageInterceptor messageLoggingInterceptor() {
return new LoggingMessageInterceptor();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.eventuate.tram.spring.logging.LoggingMessageInterceptorAutoConfiguration
28 changes: 5 additions & 23 deletions eventuate-tram-spring-testing-support-cloud-contract/build.gradle
Original file line number Diff line number Diff line change
@@ -1,28 +1,10 @@

buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath "io.spring.gradle:dependency-management-plugin:$springDependencyManagementPluginVersion"
// if using Stub Runner (consumer side) only remove this dependency
classpath "org.springframework.cloud:spring-cloud-contract-gradle-plugin:$springCloudContractDependenciesVersion"
}
}

apply plugin: "io.spring.dependency-management"

dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-contract-dependencies:$springCloudContractDependenciesVersion"
}
}

dependencies {

compile project(":eventuate-tram-messaging")
compile project(":eventuate-tram-commands")
compile "org.springframework.cloud:spring-cloud-starter-contract-verifier:$springCloudContractDependenciesVersion"
compile project(":eventuate-tram-spring-in-memory")
compile "org.springframework.cloud:spring-cloud-starter-contract-stub-runner:$springCloudContractDependenciesVersion"

compile project(":eventuate-tram-testing-support")

compile 'org.springframework.cloud:spring-cloud-starter-contract-verifier'
compile "org.springframework.cloud:spring-cloud-starter-contract-stub-runner"
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@
import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierMessage;
import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierMessaging;

import javax.inject.Inject;
import java.util.HashMap;

public class EventuateContractVerifierMessaging extends ContractVerifierMessaging<Message> {

@Inject
ContractVerifierMessaging contractVerifierMessaging;

public EventuateContractVerifierMessaging(MessageVerifier<Message> exchange) {
public class ContractVerifierEventuateMessaging extends ContractVerifierMessaging<Message> {
public ContractVerifierEventuateMessaging(MessageVerifier<Message> exchange) {
super(exchange);
}

@Override
protected ContractVerifierMessage convert(Message m) {
return m == null ? null : contractVerifierMessaging.create(m.getPayload(), m.getHeaders());
protected ContractVerifierMessage convert(Message receive) {
if (receive == null)
return null;
return new ContractVerifierMessage(receive.getPayload(), new HashMap<>(receive.getHeaders()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@
public class EventuateContractVerifierConfiguration {

@Bean
public MessageVerifier eventuateTramMessageVerifier() {

public EventuateTramMessageVerifier newEventuateTramMessageVerifier() {
return new EventuateTramMessageVerifier();
}

@Bean
public ContractVerifierMessaging<Message> eventuateContractVerifierMessaging(MessageVerifier<Message> exchange) {
return new EventuateContractVerifierMessaging(exchange);
public ContractVerifierMessaging<Message> contractVerifierEventuateMessaging(MessageVerifier<Message> exchange) {
return new ContractVerifierEventuateMessaging(exchange);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import io.eventuate.tram.messaging.producer.MessageBuilder;
import io.eventuate.tram.messaging.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.contract.verifier.converter.YamlContract;
import org.springframework.cloud.contract.verifier.messaging.MessageVerifier;

import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -19,20 +21,11 @@ public class EventuateTramMessageVerifier implements MessageVerifier<Message> {

@Autowired
private MessageProducer messageProducer;

@Autowired
private MessageConsumer messageConsumer;

@Override
public void send(Message message, String destination) {
throw new UnsupportedOperationException();
}

private ConcurrentHashMap<String, LinkedBlockingQueue<Message>> messagesByDestination = new ConcurrentHashMap<>();

public EventuateTramMessageVerifier() {
}

@PostConstruct
public void subscribe() {
messageConsumer.subscribe(getClass().getName(), singleton("*"), m -> {
Expand All @@ -46,32 +39,31 @@ private LinkedBlockingQueue<Message> getForDestination(String destination) {
}

@Override
public <T> void send(T payload, Map<String, Object> headers, String destination) {
String p = (String) payload;
MessageBuilder mb = MessageBuilder.withPayload(p);

headers.forEach((key, value) -> {
mb.withHeader(key, (String) value);
});

messageProducer.send(destination, mb.build());
}

@Override
public Message receive(String destination, long timeout, TimeUnit timeUnit) {
public Message receive(String destination, long timeout, TimeUnit timeUnit, @Nullable YamlContract contract) {
Message m;
try {
m = getForDestination(destination).poll(timeout, timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (m == null)
return null;
return m;
}

@Override
public Message receive(String destination) {
public Message receive(String destination, YamlContract contract) {
return receive(destination, 5, TimeUnit.SECONDS);
}

@Override
public void send(Message message, String destination, @Nullable YamlContract contract) {
messageProducer.send(destination, message);
}

@Override
public <T> void send(T payload, Map<String, Object> headers, String destination, @Nullable YamlContract contract) {
MessageBuilder messageBuilder = MessageBuilder.withPayload(payload.toString());
headers.forEach((name, value) -> messageBuilder.withHeader(name, value.toString()));
messageProducer.send(destination, messageBuilder.build());

}
}
Loading

0 comments on commit dcd18bb

Please sign in to comment.