diff --git a/CHANGELOG.md b/CHANGELOG.md index 6760331e..1b23b0b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 1.2.2 +* Adding a new property `tolerateStateMismatch` to allow for the connector to continue processing even if the state stored in ClickHouse does not match the current offset in Kafka + +# 1.2.1 +* Adding some additional logging details to help debug issues + # 1.2.0 * Adding a KeyToValue transformation to allow for key to be stored in a separate column in ClickHouse diff --git a/VERSION b/VERSION index 6a5e98a7..cc904638 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.2.1 +v1.2.2 diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java index 2362d057..992918f8 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java @@ -47,6 +47,7 @@ public class ClickHouseSinkConfig { public static final String DB_TOPIC_SPLIT_CHAR = "dbTopicSplitChar"; public static final String KEEPER_ON_CLUSTER = "keeperOnCluster"; public static final String DATE_TIME_FORMAT = "dateTimeFormats"; + public static final String TOLERATE_STATE_MISMATCH = "tolerateStateMismatch"; public static final int MILLI_IN_A_SEC = 1000; private static final String databaseDefault = "default"; @@ -92,6 +93,7 @@ public enum StateStores { private final String keeperOnCluster; private final Map dateTimeFormats; private final String clientVersion; + private final boolean tolerateStateMismatch; public enum InsertFormats { NONE, @@ -263,6 +265,7 @@ public ClickHouseSinkConfig(Map props) { } } this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1"); + this.tolerateStateMismatch = Boolean.parseBoolean(props.getOrDefault(TOLERATE_STATE_MISMATCH, "false")); LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}", hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce); @@ -558,6 +561,16 @@ private static ConfigDef createConfigDef() { ConfigDef.Width.SHORT, "Client version" ); + configDef.define(TOLERATE_STATE_MISMATCH, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + "Tolerate state mismatch. default: false", + group, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Tolerate state mismatch." + ); return configDef; } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeContainer.java b/src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeContainer.java index 3259d67d..03c39b38 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeContainer.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeContainer.java @@ -66,6 +66,9 @@ public RangeState getOverLappingState(RangeContainer rangeContainer) { // ZEROED [10, 20] Actual [0, 10] if (actualMinOffset == 0) return RangeState.ZERO; + // PREVIOUS [10, 20] Actual [5, 8] + if (actualMaxOffset < minOffset) + return RangeState.PREVIOUS; // ERROR [10, 20] Actual [8, 19] return RangeState.ERROR; } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeState.java b/src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeState.java index 43fd2ce9..5add7f51 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeState.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeState.java @@ -1,7 +1,6 @@ package com.clickhouse.kafka.connect.sink.kafka; public enum RangeState { - ZERO(0), //This is for when it seems like the topic has been deleted/recreated SAME(1), PREFIX(2), @@ -9,7 +8,8 @@ public enum RangeState { CONTAINS(4), OVER_LAPPING(5), NEW(6), - ERROR(7); + ERROR(7), + PREVIOUS(8); private int rangeState; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java b/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java index 1608de45..1fe2c463 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java @@ -34,12 +34,6 @@ public class Processing { private ErrorReporter errorReporter = null; - public Processing(StateProvider stateProvider, DBWriter dbWriter) { - this.stateProvider = stateProvider; - this.dbWriter = dbWriter; - this.errorReporter = null; - } - public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter, ClickHouseSinkConfig clickHouseSinkConfig) { this.stateProvider = stateProvider; this.dbWriter = dbWriter; @@ -177,8 +171,8 @@ public void doLogic(List records) throws IOException, ExecutionException doInsert(rightRecords, rightRangeContainer); stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.AFTER_PROCESSING)); break; - case ERROR: - throw new RuntimeException(String.format("State MISMATCH given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]", + default: //case ERROR: + throw new RuntimeException(String.format("ERROR State given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]", records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset())); } break; @@ -211,8 +205,17 @@ public void doLogic(List records) throws IOException, ExecutionException doInsert(rightRecords, rightRangeContainer); stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.AFTER_PROCESSING)); break; - case ERROR: - throw new RuntimeException(String.format("State MISMATCH given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]", + case PREVIOUS: + if (clickHouseSinkConfig.isTolerateStateMismatch()) { + LOGGER.warn("State MISMATCH as batch already processed - skipping [{}] records for topic: [{}], partition: [{}], minOffset: [{}], maxOffset: [{}], storedMinOffset: [{}], storedMaxOffset: [{}]", + records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset()); + } else { + throw new RuntimeException(String.format("State MISMATCH as batch already processed - skipping [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], storedMinOffset: [%s], storedMaxOffset: [%s]", + records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset())); + } + break; + default: //case ERROR: + throw new RuntimeException(String.format("ERROR State given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]", records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset())); } } diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java index 11d5c5cc..bd9fbe47 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java @@ -1,4 +1,5 @@ package com.clickhouse.kafka.connect.sink.processing; +import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -18,6 +19,7 @@ import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Assert; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -58,7 +60,7 @@ public void ProcessAllAtOnceNewTest() throws IOException, ExecutionException, In List records = createRecords("test", 1); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); } @@ -73,7 +75,7 @@ public void ProcessSplitNewTest() throws IOException, ExecutionException, Interr assertEquals(records.size(), recordsHead.size() + recordsTail.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); processing.doLogic(recordsTail); @@ -86,7 +88,7 @@ public void ProcessAllNewTwiceTest() throws IOException, ExecutionException, Int List records = createRecords("test", 1); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); processing.doLogic(records); @@ -102,7 +104,7 @@ public void ProcessAllNewFailedSetStateAfterProcessingTest() throws IOException, //List recordsTail = records.subList(splitPoint, records.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -118,7 +120,7 @@ public void ProcessContainsBeforeProcessingTest() throws IOException, ExecutionE List containsRecords = records.subList(345,850); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -133,7 +135,7 @@ public void ProcessContainsAfterProcessingTest() throws IOException, ExecutionEx List containsRecords = records.subList(345,850); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); processing.doLogic(containsRecords); @@ -148,7 +150,7 @@ public void ProcessOverlappingBeforeProcessingTest() throws IOException, Executi List containsRecords = records.subList(345,850); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); processing.doLogic(containsRecords); @@ -166,7 +168,7 @@ public void ProcessSplitNewWithBeforeProcessingTest() throws IOException, Execut assertEquals(records.size(), recordsHead.size() + recordsTail.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -183,7 +185,7 @@ public void ProcessDeletedTopicBeforeProcessingTest() throws IOException, Execut List containsRecords = records.subList(0,150); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -218,7 +220,7 @@ public void ProcessPartialOverlappingBeforeProcessingTest() throws IOException, List recordsTail = records.subList(splitPointLow, records.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -237,11 +239,30 @@ public void ProcessPartialOverlappingAfterProcessingTest() throws IOException, E List recordsTail = records.subList(splitPointLow, records.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); processing.doLogic(recordsTail); assertEquals(records.size(), dbWriter.recordsInserted()); } + @Test + @DisplayName("ProcessOldRecordsTest") + public void ProcessOldRecordsTest() throws IOException, ExecutionException, InterruptedException { + List records = createRecords("test", 1); + List recordsHead = records.subList(1, 2); + StateProvider stateProvider = new InMemoryState(); + stateProvider.setStateRecord(new StateRecord("test", 1, 5000, 4000, State.AFTER_PROCESSING)); + DBWriter dbWriter = new InMemoryDBWriter(); + Processing processingWithoutConfig = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Assert.assertThrows(RuntimeException.class, () -> processingWithoutConfig.doLogic(recordsHead)); + + HashMap config = new HashMap<>(); + config.put(ClickHouseSinkConfig.TOLERATE_STATE_MISMATCH, "true"); + ClickHouseSinkConfig clickHouseConfig = new ClickHouseSinkConfig(config); + Processing processing = new Processing(stateProvider, dbWriter, null, clickHouseConfig); + processing.doLogic(recordsHead); + assertEquals(0, dbWriter.recordsInserted()); + } + }