Skip to content

Commit

Permalink
#136 Enhanced EventuateCdcContainer
Browse files Browse the repository at this point in the history
  • Loading branch information
cer committed Dec 12, 2022
1 parent 9a36359 commit 5b0da6f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
1 change: 1 addition & 0 deletions eventuate-cdc-testcontainers/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
dependencies {
implementation "io.eventuate.common:eventuate-common-testcontainers:$eventuateCommonVersion"
implementation "io.eventuate.messaging.kafka:eventuate-messaging-kafka-testcontainers:$eventuateMessagingKafkaVersion"
implementation "org.springframework.boot:spring-boot-starter-test:$springBootVersion"

implementation "org.testcontainers:testcontainers:$testContainersVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package io.eventuate.cdc.testcontainers;

import io.eventuate.common.testcontainers.ContainerUtil;
import io.eventuate.common.testcontainers.EventuateDatabaseContainer;
import io.eventuate.common.testcontainers.EventuateGenericContainer;
import io.eventuate.common.testcontainers.PropertyProvidingContainer;
import org.testcontainers.containers.GenericContainer;
import io.eventuate.messaging.kafka.testcontainers.EventuateKafkaCluster;
import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;

import java.nio.file.Path;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class EventuateCdcContainer extends GenericContainer<EventuateCdcContainer> implements PropertyProvidingContainer {
public class EventuateCdcContainer extends EventuateGenericContainer<EventuateCdcContainer> implements PropertyProvidingContainer {

private int pipelineIdx;

public EventuateCdcContainer() {
super(ContainerUtil.findImage("eventuateio/eventuate-cdc-service", "eventuate.cdc.version.properties"));
Expand All @@ -22,10 +28,67 @@ public EventuateCdcContainer(Path path) {
}

private void withConfiguration() {
waitingFor(Wait.forHealthcheck());
}

@Override
public void registerProperties(BiConsumer<String, Supplier<Object>> registry) {

}

public EventuateCdcContainer withKafkaCluster(EventuateKafkaCluster eventuateKafkaCluster) {
withNetwork(eventuateKafkaCluster.network);
withEnv("EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING", eventuateKafkaCluster.zookeeper.getConnectionString());
withEnv("EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS", eventuateKafkaCluster.kafka.getConnectionString());
return this;
}
public EventuateCdcContainer withTramPipeline(EventuateDatabaseContainer<?> database) {
newPipeline();

withEnv("EVENTUATE_CDC_READER_READERX_DATASOURCEURL", database.getJdbcUrl());

withEnv("EVENTUATE_CDC_READER_READERX_TYPE", database.getCdcReaderType());
withEnv("EVENTUATE_CDC_READER_READERX_MONITORINGSCHEMA", database.getMonitoringSchema());
withEnv("EVENTUATE_CDC_READER_READERX_DATASOURCEUSERNAME", database.getAdminCredentials().userName);
withEnv("EVENTUATE_CDC_READER_READERX_DATASOURCEPASSWORD", database.getAdminCredentials().password);
withEnv("EVENTUATE_CDC_READER_READERX_DATASOURCEDRIVERCLASSNAME", database.getDriverClassName());
withEnv("EVENTUATE_CDC_READER_READERX_LEADERSHIPLOCKPATH", () -> String.format("/eventuate/cdc/leader/%s", database.getContainerId()));
withEnv("EVENTUATE_CDC_READER_READERX_CDCDBUSERNAME", database.getAdminCredentials().userName);
withEnv("EVENTUATE_CDC_READER_READERX_CDCDBPASSWORD", database.getAdminCredentials().password);
withEnv("EVENTUATE_CDC_READER_READERX_READOLDDEBEZIUMDBOFFSETSTORAGETOPIC", "false");
withEnv("EVENTUATE_CDC_READER_READERX_MYSQLBINLOGCLIENTUNIQUEID", Integer.toString(pipelineIdx));
withEnv("EVENTUATE_CDC_READER_READERX_OFFSETSTOREKEY", database::getContainerId);
withEnv("EVENTUATE_CDC_READER_READERX_OFFSETSTORAGETOPICNAME", "db.history.common");
withEnv("EVENTUATE_CDC_READER_READERX_OUTBOXID", Integer.toString(pipelineIdx));

withEnv("EVENTUATE_CDC_PIPELINE_PIPELINEX_TYPE", "eventuate-tram");
withEnv("EVENTUATE_CDC_PIPELINE_PIPELINEX_READER", "reader" + pipelineIdx);
withEnv("EVENTUATE_CDC_PIPELINE_PIPELINEX_EVENTUATEDATABASESCHEMA", database.getDatabaseName());
return this;
}

private void newPipeline() {
pipelineIdx++;
}

@Override
public EventuateCdcContainer withEnv(String name, String value) {
return super.withEnv(replaceName(name), value);
}

@Override
public EventuateCdcContainer withEnv(String name, Supplier<String> valueSupplier) {
return super.withEnv(replaceName(name), valueSupplier);
}

@NotNull
private String replaceName(String name) {
return name.replace("_READERX_", String.format("_READER%s_", pipelineIdx))
.replace("_PIPELINEX_", String.format("_PIPELINE%s_", pipelineIdx));
}

@Override
protected int getPort() {
return 8080;
}
}

0 comments on commit 5b0da6f

Please sign in to comment.