diff --git a/eventuate-local-java-cdc-connector-db-log-common/src/main/java/io/eventuate/local/db/log/common/DbLogClient.java b/eventuate-local-java-cdc-connector-db-log-common/src/main/java/io/eventuate/local/db/log/common/DbLogClient.java index 9cfa1619..6c5f3dd5 100644 --- a/eventuate-local-java-cdc-connector-db-log-common/src/main/java/io/eventuate/local/db/log/common/DbLogClient.java +++ b/eventuate-local-java-cdc-connector-db-log-common/src/main/java/io/eventuate/local/db/log/common/DbLogClient.java @@ -2,8 +2,6 @@ import io.eventuate.common.eventuate.local.BinlogFileOffset; import io.eventuate.common.jdbc.EventuateSchema; -import io.eventuate.common.jdbc.JdbcUrl; -import io.eventuate.common.jdbc.JdbcUrlParser; import io.eventuate.local.common.BinlogEntryReader; import io.eventuate.local.common.CdcMonitoringDao; import io.micrometer.core.instrument.MeterRegistry; @@ -15,11 +13,9 @@ public abstract class DbLogClient extends BinlogEntryReader { protected String dbUserName; protected String dbPassword; - protected String host; - protected int port; protected DbLogMetrics dbLogMetrics; - protected String dataSourceUrl; - private boolean checkEntriesForDuplicates; + protected String dataSourceUrl; + private boolean checkEntriesForDuplicates; protected volatile boolean connected; protected CdcMonitoringDao cdcMonitoringDao; @@ -53,10 +49,6 @@ public DbLogClient(MeterRegistry meterRegistry, this.dbUserName = dbUserName; this.dbPassword = dbPassword; this.dataSourceUrl = dataSourceUrl; - - JdbcUrl jdbcUrl = JdbcUrlParser.parse(dataSourceUrl); - host = jdbcUrl.getHost(); - port = jdbcUrl.getPort(); } public boolean isConnected() { diff --git a/eventuate-local-java-cdc-connector-mysql-binlog/src/main/java/io/eventuate/local/mysql/binlog/MySqlBinaryLogClient.java b/eventuate-local-java-cdc-connector-mysql-binlog/src/main/java/io/eventuate/local/mysql/binlog/MySqlBinaryLogClient.java index b2862273..30c27af9 100644 --- a/eventuate-local-java-cdc-connector-mysql-binlog/src/main/java/io/eventuate/local/mysql/binlog/MySqlBinaryLogClient.java +++ b/eventuate-local-java-cdc-connector-mysql-binlog/src/main/java/io/eventuate/local/mysql/binlog/MySqlBinaryLogClient.java @@ -8,6 +8,8 @@ import com.google.common.collect.ImmutableSet; import io.eventuate.common.eventuate.local.BinlogFileOffset; import io.eventuate.common.jdbc.EventuateSchema; +import io.eventuate.common.jdbc.JdbcUrl; +import io.eventuate.common.jdbc.JdbcUrlParser; import io.eventuate.common.jdbc.SchemaAndTable; import io.eventuate.local.common.BinlogEntry; import io.eventuate.local.common.BinlogEntryHandler; @@ -62,6 +64,8 @@ public class MySqlBinaryLogClient extends DbLogClient { private AtomicLong timeOfLatestMessage = new AtomicLong();; private Timer messagePublishingTimer; private BinaryLogClient.EventListener eventListener; + private final String host; + private final int port; public MySqlBinaryLogClient(MeterRegistry meterRegistry, String dbUserName, @@ -98,6 +102,10 @@ public MySqlBinaryLogClient(MeterRegistry meterRegistry, this.offsetStore = offsetStore; this.debeziumBinlogOffsetKafkaStore = debeziumBinlogOffsetKafkaStore; + JdbcUrl jdbcUrl = JdbcUrlParser.parse(dataSourceUrl); + host = jdbcUrl.getHost(); + port = jdbcUrl.getPort(); + timestampExtractor = new MySqlBinlogCdcMonitoringTimestampExtractor(dataSource); mySqlBinlogEntryExtractor = new MySqlBinlogEntryExtractor(dataSource); tableMapper = new TableMapper();