diff --git a/docker-compose-cdc-unified-polling-mysql.yml b/docker-compose-cdc-unified-polling-mysql.yml index 6273ab95..e1dc8cf6 100644 --- a/docker-compose-cdc-unified-polling-mysql.yml +++ b/docker-compose-cdc-unified-polling-mysql.yml @@ -13,18 +13,31 @@ services: EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181 SPRING_PROFILES_ACTIVE: ${SPRING_PROFILES_ACTIVE} - EVENTUATE_CDC_READER_MYSQLREADER_TYPE: polling - EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEURL: jdbc:mysql://mysql:3306/eventuate - EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEUSERNAME: mysqluser - EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEPASSWORD: mysqlpw - EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEDRIVERCLASSNAME: com.mysql.cj.jdbc.Driver - EVENTUATE_CDC_READER_MYSQLREADER_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/1 - EVENTUATE_CDC_READER_MYSQLREADER_OUTBOXID: 1 - EVENTUATE_CDC_READER_MYSQLREADER_POLLINGPARALLELCHANNELNAMES: parallel_channel_1 + EVENTUATE_CDC_READER_MYSQLREADER1_TYPE: polling + EVENTUATE_CDC_READER_MYSQLREADER1_DATASOURCEURL: jdbc:mysql://mysql:3306/eventuate + EVENTUATE_CDC_READER_MYSQLREADER1_DATASOURCEUSERNAME: mysqluser + EVENTUATE_CDC_READER_MYSQLREADER1_DATASOURCEPASSWORD: mysqlpw + EVENTUATE_CDC_READER_MYSQLREADER1_DATASOURCEDRIVERCLASSNAME: com.mysql.cj.jdbc.Driver + EVENTUATE_CDC_READER_MYSQLREADER1_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/1 + EVENTUATE_CDC_READER_MYSQLREADER1_OUTBOXID: 1 + + EVENTUATE_CDC_READER_MYSQLREADER2_TYPE: polling + EVENTUATE_CDC_READER_MYSQLREADER2_DATASOURCEURL: jdbc:mysql://mysql:3306/eventuate + EVENTUATE_CDC_READER_MYSQLREADER2_DATASOURCEUSERNAME: mysqluser + EVENTUATE_CDC_READER_MYSQLREADER2_DATASOURCEPASSWORD: mysqlpw + EVENTUATE_CDC_READER_MYSQLREADER2_DATASOURCEDRIVERCLASSNAME: com.mysql.cj.jdbc.Driver + EVENTUATE_CDC_READER_MYSQLREADER2_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/1 + EVENTUATE_CDC_READER_MYSQLREADER2_OUTBOXID: 1 + EVENTUATE_CDC_READER_MYSQLREADER2_POLLINGPARALLELCHANNELNAMES: parallel_channel_1 EVENTUATE_CDC_PIPELINE_P1_TYPE: eventuate-local - EVENTUATE_CDC_PIPELINE_P1_READER: MYSQLREADER + EVENTUATE_CDC_PIPELINE_P1_READER: MYSQLREADER2 - EVENTUATE_CDC_PIPELINE_P4_TYPE: eventuate-tram - EVENTUATE_CDC_PIPELINE_P4_READER: MYSQLREADER + EVENTUATE_CDC_PIPELINE_P2_TYPE: eventuate-tram + EVENTUATE_CDC_PIPELINE_P2_READER: MYSQLREADER1 + EVENTUATE_CDC_PIPELINE_P3_TYPE: eventuate-local + EVENTUATE_CDC_PIPELINE_P3_READER: MYSQLREADER1 + + EVENTUATE_CDC_PIPELINE_P4_TYPE: eventuate-tram + EVENTUATE_CDC_PIPELINE_P4_READER: MYSQLREADER2 diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProvider.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProvider.java index 2a438b97..00ffa1ad 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProvider.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProvider.java @@ -8,10 +8,10 @@ import io.eventuate.local.unified.cdc.pipeline.common.properties.RawUnifiedCdcProperties; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; public class PipelineConfigPropertiesProvider { @@ -26,22 +26,22 @@ public PipelineConfigPropertiesProvider(RawUnifiedCdcProperties rawUnifiedCdcPro this.cdcPipelineReaderFactories = cdcPipelineReaderFactories; } - public Optional> pipelineReaderProperties() { + public Optional> pipelineReaderProperties() { if (rawUnifiedCdcProperties.isReaderPropertiesDeclared()) { - return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getReader(), this::createCdcPipelineReaderProperties)); + return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getReader(), this::createCdcPipelineReaderProperties, CdcPipelineReaderProperties::getReaderName)); } else return Optional.empty(); } - public Optional> pipelineProperties() { + public Optional> pipelineProperties() { if (rawUnifiedCdcProperties.isPipelinePropertiesDeclared()) { - return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getPipeline(), this::createPipelineProperties)); + return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getPipeline(), this::createPipelineProperties, CdcPipelineProperties::getName)); } else return Optional.empty(); } - private List makeFromProperties(Map> properties, BiFunction, T> creator) { - return properties.entrySet().stream().map(entry -> creator.apply(entry.getKey(), entry.getValue())).collect(Collectors.toList()); + private Map makeFromProperties(Map> properties, BiFunction, T> creator, Function nameGetter) { + return properties.entrySet().stream().map(entry -> creator.apply(entry.getKey(), entry.getValue())).collect(Collectors.toMap(nameGetter, x -> x)); } private CdcPipelineReaderProperties createCdcPipelineReaderProperties(String name, Map properties) { @@ -90,7 +90,7 @@ private CdcPipelineProperties createPipelineProperties(String name, Map getPollingParallelChannels() { - return pollingParallelChannels; + return pollingParallelChannels == null ? emptySet() : pollingParallelChannels; } } diff --git a/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProviderTest.java b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProviderTest.java index fa9adb59..6b77f162 100644 --- a/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProviderTest.java +++ b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProviderTest.java @@ -22,9 +22,10 @@ import java.util.Collection; import java.util.HashSet; -import java.util.List; +import java.util.Map; import java.util.Set; +import static java.util.Collections.emptySet; import static org.junit.Assert.assertEquals; @@ -62,18 +63,23 @@ public PollingCdcPipelineReaderFactory pollingCdcPipelineReaderFactory(Connectio @Test public void shouldProvideReaderProperties() { - List readers = pipelineConfigPropertiesProvider.pipelineReaderProperties().get(); - assertEquals(1, readers.size()); - PollingPipelineReaderProperties reader = (PollingPipelineReaderProperties) readers.get(0); + Map readers = pipelineConfigPropertiesProvider.pipelineReaderProperties().get(); + assertEquals(2, readers.size()); + + PollingPipelineReaderProperties mysqlreader1 = (PollingPipelineReaderProperties) readers.get("mysqlreader1"); Set expectedChannels = new HashSet<>(); expectedChannels.add("parallel_channel_1"); expectedChannels.add("parallel_channel_2"); - assertEquals(expectedChannels, reader.getPollingParallelChannels()); + assertEquals(expectedChannels, mysqlreader1.getPollingParallelChannels()); + + PollingPipelineReaderProperties mysqlreader2 = (PollingPipelineReaderProperties) readers.get("mysqlreader2"); + assertEquals(emptySet(), mysqlreader2.getPollingParallelChannels()); + } @Test public void shouldProvidePipelineProperties() { - List pipelines = pipelineConfigPropertiesProvider.pipelineProperties().get(); + Map pipelines = pipelineConfigPropertiesProvider.pipelineProperties().get(); assertEquals(2, pipelines.size()); } } \ No newline at end of file diff --git a/eventuate-local-java-cdc-connector-unified/src/test/resources/sample-pipeline-config.properties b/eventuate-local-java-cdc-connector-unified/src/test/resources/sample-pipeline-config.properties index 27b09cec..6e3f5eb5 100644 --- a/eventuate-local-java-cdc-connector-unified/src/test/resources/sample-pipeline-config.properties +++ b/eventuate-local-java-cdc-connector-unified/src/test/resources/sample-pipeline-config.properties @@ -1,14 +1,22 @@ -eventuate.cdc.reader.mysqlreader.type: polling -eventuate.cdc.reader.mysqlreader.datasourceurl: jdbc:mysql://mysql:3306/eventuate -eventuate.cdc.reader.mysqlreader.datasourceusername: mysqluser -eventuate.cdc.reader.mysqlreader.datasourcepassword: mysqlpw -eventuate.cdc.reader.mysqlreader.datasourcedriverclassname: com.mysql.cj.jdbc.driver -eventuate.cdc.reader.mysqlreader.leadershiplockpath: /eventuatelocal/cdc/leader/pipeline/1 -eventuate.cdc.reader.mysqlreader.outboxid: 1 -eventuate.cdc.reader.mysqlreader.pollingparallelchannelnames: parallel_channel_1,parallel_channel_2 +eventuate.cdc.reader.mysqlreader1.type: polling +eventuate.cdc.reader.mysqlreader1.datasourceurl: jdbc:mysql://mysql:3306/eventuate +eventuate.cdc.reader.mysqlreader1.datasourceusername: mysqluser +eventuate.cdc.reader.mysqlreader1.datasourcepassword: mysqlpw +eventuate.cdc.reader.mysqlreader1.datasourcedriverclassname: com.mysql.cj.jdbc.driver +eventuate.cdc.reader.mysqlreader1.leadershiplockpath: /eventuatelocal/cdc/leader/pipeline/1 +eventuate.cdc.reader.mysqlreader1.outboxid: 1 +eventuate.cdc.reader.mysqlreader1.pollingparallelchannelnames: parallel_channel_1,parallel_channel_2 + +eventuate.cdc.reader.mysqlreader2.type: polling +eventuate.cdc.reader.mysqlreader2.datasourceurl: jdbc:mysql://mysql:3306/eventuate +eventuate.cdc.reader.mysqlreader2.datasourceusername: mysqluser +eventuate.cdc.reader.mysqlreader2.datasourcepassword: mysqlpw +eventuate.cdc.reader.mysqlreader2.datasourcedriverclassname: com.mysql.cj.jdbc.driver +eventuate.cdc.reader.mysqlreader2.leadershiplockpath: /eventuatelocal/cdc/leader/pipeline/1 +eventuate.cdc.reader.mysqlreader2.outboxid: 1 eventuate.cdc.pipeline.p1.type: eventuate-local -eventuate.cdc.pipeline.p1.reader: mysqlreader +eventuate.cdc.pipeline.p1.reader: mysqlreader2 eventuate.cdc.pipeline.p4.type: eventuate-tram -eventuate.cdc.pipeline.p4.reader: mysqlreader +eventuate.cdc.pipeline.p4.reader: mysqlreader1