Skip to content

Commit

Permalink
#128 Enhance PollingDao to use a thread-per-channel
Browse files Browse the repository at this point in the history
Fixed NPE when no polling channels configured
  • Loading branch information
cer committed Aug 3, 2022
1 parent edf8e47 commit ff16fab
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 36 deletions.
35 changes: 24 additions & 11 deletions docker-compose-cdc-unified-polling-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,31 @@ services:
EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
SPRING_PROFILES_ACTIVE: ${SPRING_PROFILES_ACTIVE}

EVENTUATE_CDC_READER_MYSQLREADER_TYPE: polling
EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEURL: jdbc:mysql://mysql:3306/eventuate
EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEUSERNAME: mysqluser
EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEPASSWORD: mysqlpw
EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEDRIVERCLASSNAME: com.mysql.cj.jdbc.Driver
EVENTUATE_CDC_READER_MYSQLREADER_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/1
EVENTUATE_CDC_READER_MYSQLREADER_OUTBOXID: 1
EVENTUATE_CDC_READER_MYSQLREADER_POLLINGPARALLELCHANNELNAMES: parallel_channel_1
EVENTUATE_CDC_READER_MYSQLREADER1_TYPE: polling
EVENTUATE_CDC_READER_MYSQLREADER1_DATASOURCEURL: jdbc:mysql://mysql:3306/eventuate
EVENTUATE_CDC_READER_MYSQLREADER1_DATASOURCEUSERNAME: mysqluser
EVENTUATE_CDC_READER_MYSQLREADER1_DATASOURCEPASSWORD: mysqlpw
EVENTUATE_CDC_READER_MYSQLREADER1_DATASOURCEDRIVERCLASSNAME: com.mysql.cj.jdbc.Driver
EVENTUATE_CDC_READER_MYSQLREADER1_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/1
EVENTUATE_CDC_READER_MYSQLREADER1_OUTBOXID: 1

EVENTUATE_CDC_READER_MYSQLREADER2_TYPE: polling
EVENTUATE_CDC_READER_MYSQLREADER2_DATASOURCEURL: jdbc:mysql://mysql:3306/eventuate
EVENTUATE_CDC_READER_MYSQLREADER2_DATASOURCEUSERNAME: mysqluser
EVENTUATE_CDC_READER_MYSQLREADER2_DATASOURCEPASSWORD: mysqlpw
EVENTUATE_CDC_READER_MYSQLREADER2_DATASOURCEDRIVERCLASSNAME: com.mysql.cj.jdbc.Driver
EVENTUATE_CDC_READER_MYSQLREADER2_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/1
EVENTUATE_CDC_READER_MYSQLREADER2_OUTBOXID: 1
EVENTUATE_CDC_READER_MYSQLREADER2_POLLINGPARALLELCHANNELNAMES: parallel_channel_1

EVENTUATE_CDC_PIPELINE_P1_TYPE: eventuate-local
EVENTUATE_CDC_PIPELINE_P1_READER: MYSQLREADER
EVENTUATE_CDC_PIPELINE_P1_READER: MYSQLREADER2

EVENTUATE_CDC_PIPELINE_P4_TYPE: eventuate-tram
EVENTUATE_CDC_PIPELINE_P4_READER: MYSQLREADER
EVENTUATE_CDC_PIPELINE_P2_TYPE: eventuate-tram
EVENTUATE_CDC_PIPELINE_P2_READER: MYSQLREADER1

EVENTUATE_CDC_PIPELINE_P3_TYPE: eventuate-local
EVENTUATE_CDC_PIPELINE_P3_READER: MYSQLREADER1

EVENTUATE_CDC_PIPELINE_P4_TYPE: eventuate-tram
EVENTUATE_CDC_PIPELINE_P4_READER: MYSQLREADER2
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import io.eventuate.local.unified.cdc.pipeline.common.properties.RawUnifiedCdcProperties;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PipelineConfigPropertiesProvider {
Expand All @@ -26,22 +26,22 @@ public PipelineConfigPropertiesProvider(RawUnifiedCdcProperties rawUnifiedCdcPro
this.cdcPipelineReaderFactories = cdcPipelineReaderFactories;
}

public Optional<List<CdcPipelineReaderProperties>> pipelineReaderProperties() {
public Optional<Map<String, CdcPipelineReaderProperties>> pipelineReaderProperties() {
if (rawUnifiedCdcProperties.isReaderPropertiesDeclared()) {
return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getReader(), this::createCdcPipelineReaderProperties));
return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getReader(), this::createCdcPipelineReaderProperties, CdcPipelineReaderProperties::getReaderName));
} else
return Optional.empty();
}

public Optional<List<CdcPipelineProperties>> pipelineProperties() {
public Optional<Map<String, CdcPipelineProperties>> pipelineProperties() {
if (rawUnifiedCdcProperties.isPipelinePropertiesDeclared()) {
return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getPipeline(), this::createPipelineProperties));
return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getPipeline(), this::createPipelineProperties, CdcPipelineProperties::getName));
} else
return Optional.empty();
}

private <T> List<T> makeFromProperties(Map<String, Map<String, Object>> properties, BiFunction<String, Map<String, Object>, T> creator) {
return properties.entrySet().stream().map(entry -> creator.apply(entry.getKey(), entry.getValue())).collect(Collectors.toList());
private <T> Map<String, T> makeFromProperties(Map<String, Map<String, Object>> properties, BiFunction<String, Map<String, Object>, T> creator, Function<T, String> nameGetter) {
return properties.entrySet().stream().map(entry -> creator.apply(entry.getKey(), entry.getValue())).collect(Collectors.toMap(nameGetter, x -> x));
}

private CdcPipelineReaderProperties createCdcPipelineReaderProperties(String name, Map<String, Object> properties) {
Expand Down Expand Up @@ -90,7 +90,7 @@ private CdcPipelineProperties createPipelineProperties(String name, Map<String,
.convertMapToPropertyClass(properties, CdcPipelineProperties.class);

cdcPipelineProperties.validate();

cdcPipelineProperties.setName(name);
return cdcPipelineProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
public class CdcPipelineProperties implements ValidatableProperties {
private String type;

private String name;

private String reader;
private String eventuateDatabaseSchema = null;
private String sourceTableName = null;
Expand Down Expand Up @@ -53,4 +55,12 @@ public String getSourceTableName() {
public void setSourceTableName(String sourceTableName) {
this.sourceTableName = sourceTableName;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;

public class PollingPipelineReaderProperties extends CdcPipelineReaderProperties {
private Integer pollingIntervalInMilliseconds = 500;
private Integer maxEventsPerPolling = 1000;
Expand Down Expand Up @@ -58,6 +60,6 @@ public void setPollingParallelChannelNames(String pollingParallelChannelNames) {
}

public Set<String> getPollingParallelChannels() {
return pollingParallelChannels;
return pollingParallelChannels == null ? emptySet() : pollingParallelChannels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptySet;
import static org.junit.Assert.assertEquals;


Expand Down Expand Up @@ -62,18 +63,23 @@ public PollingCdcPipelineReaderFactory pollingCdcPipelineReaderFactory(Connectio

@Test
public void shouldProvideReaderProperties() {
List<CdcPipelineReaderProperties> readers = pipelineConfigPropertiesProvider.pipelineReaderProperties().get();
assertEquals(1, readers.size());
PollingPipelineReaderProperties reader = (PollingPipelineReaderProperties) readers.get(0);
Map<String, CdcPipelineReaderProperties> readers = pipelineConfigPropertiesProvider.pipelineReaderProperties().get();
assertEquals(2, readers.size());

PollingPipelineReaderProperties mysqlreader1 = (PollingPipelineReaderProperties) readers.get("mysqlreader1");
Set<String> expectedChannels = new HashSet<>();
expectedChannels.add("parallel_channel_1");
expectedChannels.add("parallel_channel_2");
assertEquals(expectedChannels, reader.getPollingParallelChannels());
assertEquals(expectedChannels, mysqlreader1.getPollingParallelChannels());

PollingPipelineReaderProperties mysqlreader2 = (PollingPipelineReaderProperties) readers.get("mysqlreader2");
assertEquals(emptySet(), mysqlreader2.getPollingParallelChannels());

}

@Test
public void shouldProvidePipelineProperties() {
List<CdcPipelineProperties> pipelines = pipelineConfigPropertiesProvider.pipelineProperties().get();
Map<String, CdcPipelineProperties> pipelines = pipelineConfigPropertiesProvider.pipelineProperties().get();
assertEquals(2, pipelines.size());
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
eventuate.cdc.reader.mysqlreader.type: polling
eventuate.cdc.reader.mysqlreader.datasourceurl: jdbc:mysql://mysql:3306/eventuate
eventuate.cdc.reader.mysqlreader.datasourceusername: mysqluser
eventuate.cdc.reader.mysqlreader.datasourcepassword: mysqlpw
eventuate.cdc.reader.mysqlreader.datasourcedriverclassname: com.mysql.cj.jdbc.driver
eventuate.cdc.reader.mysqlreader.leadershiplockpath: /eventuatelocal/cdc/leader/pipeline/1
eventuate.cdc.reader.mysqlreader.outboxid: 1
eventuate.cdc.reader.mysqlreader.pollingparallelchannelnames: parallel_channel_1,parallel_channel_2
eventuate.cdc.reader.mysqlreader1.type: polling
eventuate.cdc.reader.mysqlreader1.datasourceurl: jdbc:mysql://mysql:3306/eventuate
eventuate.cdc.reader.mysqlreader1.datasourceusername: mysqluser
eventuate.cdc.reader.mysqlreader1.datasourcepassword: mysqlpw
eventuate.cdc.reader.mysqlreader1.datasourcedriverclassname: com.mysql.cj.jdbc.driver
eventuate.cdc.reader.mysqlreader1.leadershiplockpath: /eventuatelocal/cdc/leader/pipeline/1
eventuate.cdc.reader.mysqlreader1.outboxid: 1
eventuate.cdc.reader.mysqlreader1.pollingparallelchannelnames: parallel_channel_1,parallel_channel_2

eventuate.cdc.reader.mysqlreader2.type: polling
eventuate.cdc.reader.mysqlreader2.datasourceurl: jdbc:mysql://mysql:3306/eventuate
eventuate.cdc.reader.mysqlreader2.datasourceusername: mysqluser
eventuate.cdc.reader.mysqlreader2.datasourcepassword: mysqlpw
eventuate.cdc.reader.mysqlreader2.datasourcedriverclassname: com.mysql.cj.jdbc.driver
eventuate.cdc.reader.mysqlreader2.leadershiplockpath: /eventuatelocal/cdc/leader/pipeline/1
eventuate.cdc.reader.mysqlreader2.outboxid: 1

eventuate.cdc.pipeline.p1.type: eventuate-local
eventuate.cdc.pipeline.p1.reader: mysqlreader
eventuate.cdc.pipeline.p1.reader: mysqlreader2

eventuate.cdc.pipeline.p4.type: eventuate-tram
eventuate.cdc.pipeline.p4.reader: mysqlreader
eventuate.cdc.pipeline.p4.reader: mysqlreader1

0 comments on commit ff16fab

Please sign in to comment.