
Commit

Merge pull request #275 from ClickHouse/add-query-id
Adding a more comprehensive way of tracking queries across methods
Paultagoras authored Dec 6, 2023
2 parents 2328163 + 32ee7c1 commit a9a8006
Showing 12 changed files with 186 additions and 193 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 1.0.9 2023-12-06
* Added more logging to help debug future issues
* Restored send_progress_in_http_headers flag

## 1.0.8 2023-12-04
* Remove send_progress_in_http_headers flag as it conflicts
* Updated tests
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
v1.0.8
v1.0.9
@@ -133,7 +133,7 @@ public boolean visible(String name, Map<String, Object> parsedConfig) {
public static final class ProxyTypeValidatorAndRecommender implements ConfigDef.Recommender {
@Override
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
return List.of(ClickHouseProxyType.values());
return List.of((Object[]) ClickHouseProxyType.values());
}
@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
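The added (Object[]) cast makes the varargs call unambiguous. Without it, javac warns that an enum array passed to List.of(E...) has an inexact argument type, since it could be read either as a single Object element or as the whole Object[] varargs array. A minimal illustration (an assumption about the compiler behavior, not code from this commit):

    // warns: inexact argument type, one element or the whole varargs array?
    List<Object> ambiguous = List.of(ClickHouseProxyType.values());
    // explicit: the array is the varargs array, so each proxy type is its own element
    List<Object> expanded = List.of((Object[]) ClickHouseProxyType.values());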
@@ -173,6 +173,8 @@ public ClickHouseSinkConfig(Map<String, String> props) {
this.addClickHouseSetting("insert_quorum", "2", false);
this.addClickHouseSetting("input_format_skip_unknown_fields", "1", false);
this.addClickHouseSetting("wait_end_of_query", "1", false);
//We set this so our ResponseSummary has actual data in it
this.addClickHouseSetting("send_progress_in_http_headers", "1", false);

topicToTableMap = new HashMap<>();
String topicToTableMapString = props.getOrDefault(TABLE_MAPPING, "").trim();
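The restored send_progress_in_http_headers setting is what makes the server report progress (the X-ClickHouse-Progress / X-ClickHouse-Summary headers) back over HTTP, so the response summary referenced in the comment above contains real row counts instead of zeros. A minimal sketch of the effect, assuming the clickhouse-java client API (ClickHouseClient, ClickHouseResponseSummary) and an illustrative table name, not code from this commit:

    // import com.clickhouse.client.*;
    ClickHouseNode server = ClickHouseNode.of("http://localhost:8123/default");
    try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
         ClickHouseResponse response = client.read(server)
                 .set("send_progress_in_http_headers", 1) // without this the summary is mostly empty
                 .query("INSERT INTO t SELECT number FROM system.numbers LIMIT 10")
                 .executeAndWait()) {
        ClickHouseResponseSummary summary = response.getSummary();
        System.out.println("written rows: " + summary.getWrittenRows());
    }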
240 changes: 80 additions & 160 deletions src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java

Large diffs are not rendered by default.
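ClickHouseWriter (collapsed above) is where the QueryIdentifier introduced by this PR gets attached to the actual insert. The essential idea, sketched against the clickhouse-java ClickHouseRequest.query(sql, queryId) overload; an illustration under that assumption, not the commit's code:

    // `request` is a prepared ClickHouseRequest for the target table and `queryId`
    // is the QueryIdentifier built in Processing.doInsert for this batch.
    request.query(insertSql, queryId.getQueryId()) // the UUID becomes the server-side query_id
           .executeAndWait();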

@@ -3,6 +3,7 @@
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.util.QueryIdentifier;

import java.io.IOException;
import java.util.List;
@@ -13,7 +14,7 @@ public interface DBWriter {

public boolean start(ClickHouseSinkConfig csc);
public void stop();
public void doInsert(List<Record> records) throws IOException, ExecutionException, InterruptedException;
public void doInsert(List<Record> records, ErrorReporter errorReporter) throws IOException, ExecutionException, InterruptedException;
public void doInsert(List<Record> records, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException;
public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporter errorReporter) throws IOException, ExecutionException, InterruptedException;
public long recordsInserted();
}
@@ -3,6 +3,7 @@
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.util.QueryIdentifier;

import java.util.ArrayList;
import java.util.HashMap;
@@ -30,13 +31,13 @@ public void stop() {
}

@Override
public void doInsert(List<Record> records) {
public void doInsert(List<Record> records, QueryIdentifier queryId) {
records.stream().forEach( r -> this.recordMap.put(r.getRecordOffsetContainer().getOffset(), r) );
}

@Override
public void doInsert(List<Record> records, ErrorReporter errorReporter) {
doInsert(records);
public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporter errorReporter) {
doInsert(records, queryId);
}

@Override
@@ -9,6 +9,7 @@
import com.clickhouse.kafka.connect.sink.state.State;
import com.clickhouse.kafka.connect.sink.state.StateProvider;
import com.clickhouse.kafka.connect.sink.state.StateRecord;
import com.clickhouse.kafka.connect.util.QueryIdentifier;
import com.clickhouse.kafka.connect.util.Utils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
@@ -18,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

@@ -43,9 +45,23 @@ public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter
* the logic is only for topic partition scoop
*
* @param records
* @param rangeContainer
*/
private void doInsert(List<Record> records) throws IOException, ExecutionException, InterruptedException {
dbWriter.doInsert(records, errorReporter);
private void doInsert(List<Record> records, RangeContainer rangeContainer) {
if (records == null || records.isEmpty()) {
LOGGER.info("doInsert - No records to insert.");
return;
}
QueryIdentifier queryId = new QueryIdentifier(records.get(0).getRecordOffsetContainer().getTopic(), records.get(0).getRecordOffsetContainer().getPartition(),
rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(),
UUID.randomUUID().toString());

try {
LOGGER.info("doInsert - Records: [{}] - {}", records.size(), queryId);
dbWriter.doInsert(records, queryId, errorReporter);
} catch (Exception e) {
throw new RuntimeException(queryId.toString(), e);//This way the queryId will propagate
}
}
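Because the wrapping RuntimeException uses queryId.toString() as its message, a failed batch now surfaces in logs and stack traces with its full coordinates, for example (illustrative values):

    Topic: [events], Partition: [3], MinOffset: [120], MaxOffset: [149], (QueryId: [c9a3f1e2-7d2b-4b7e-9f6a-1f2d3c4b5a6e])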


@@ -86,7 +102,8 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
String topic = record.getRecordOffsetContainer().getTopic();
int partition = record.getRecordOffsetContainer().getPartition();
RangeContainer rangeContainer = extractRange(records, topic, partition);
LOGGER.info(String.format("doLogic Topic [%s] Partition [%d] offset [%d:%d]", topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset()));
LOGGER.info("doLogic - Topic: [{}], Partition: [{}], MinOffset: [{}], MaxOffset: [{}], Records: [{}]",
topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), records.size());
// State Actual
// [10 , 19] [10, 19] ==> same
// [10 , 19] [10, 30] ==> overlapping [10,19], [20, 30]
@@ -100,7 +117,7 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
case NONE:
// this is the first time we see this topic and partition; or we had a previous failure setting the state.
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.BEFORE_PROCESSING));
doInsert(records);
doInsert(records, rangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case BEFORE_PROCESSING:
@@ -113,12 +130,12 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
case ZERO: // Reset if we're at a 0 state
LOGGER.warn(String.format("The topic seems to be deleted. Resetting state for topic [%s] partition [%s].", topic, partition));
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.BEFORE_PROCESSING));//RESET
doInsert(records);
doInsert(records, rangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case SAME: // Dedupe in clickhouse will fix it
case NEW:
doInsert(trimmedRecords);
doInsert(trimmedRecords, rangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case CONTAINS: // The state contains the given records
@@ -131,14 +148,14 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
case OVER_LAPPING:
// spit it to 2 inserts
List<List<Record>> rightAndLeft = splitRecordsByOffset(trimmedRecords, stateRecord.getMaxOffset(), stateRecord.getMinOffset());
doInsert(rightAndLeft.get(0));
doInsert(rightAndLeft.get(0), stateRecord.getRangeContainer());
stateProvider.setStateRecord(new StateRecord(
topic, partition, stateRecord.getRangeContainer().getMaxOffset(),
stateRecord.getRangeContainer().getMinOffset(), State.AFTER_PROCESSING));
List<Record> rightRecords = rightAndLeft.get(1);
RangeContainer rightRangeContainer = extractRange(rightRecords, topic, partition);
stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.BEFORE_PROCESSING));
doInsert(rightRecords);
doInsert(rightRecords, rightRangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case ERROR:
@@ -157,12 +174,12 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
case ZERO:
LOGGER.warn(String.format("It seems you deleted the topic - resetting state for topic [%s] partition [%s].", topic, partition));
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.BEFORE_PROCESSING));
doInsert(records);
doInsert(records, rangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case NEW:
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.BEFORE_PROCESSING));
doInsert(trimmedRecords);
doInsert(trimmedRecords, rangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rangeContainer.getMaxOffset(), rangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case OVER_LAPPING:
@@ -171,7 +188,7 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
List<Record> rightRecords = rightAndLeft.get(1);
RangeContainer rightRangeContainer = extractRange(rightRecords, topic, partition);
stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.BEFORE_PROCESSING));
doInsert(rightRecords);
doInsert(rightRecords, rightRangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case ERROR:
@@ -0,0 +1,43 @@
package com.clickhouse.kafka.connect.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryIdentifier {

private static final Logger LOGGER = LoggerFactory.getLogger(QueryIdentifier.class);
private final String topic;
private final int partition;
private final long minOffset;
private final long maxOffset;
private final String queryId;

public QueryIdentifier(String topic, int partition, long minOffset, long maxOffset, String queryId) {
this.topic = topic;
this.partition = partition;
this.minOffset = minOffset;
this.maxOffset = maxOffset;
this.queryId = queryId;
}

public String toString() {
return String.format("Topic: [%s], Partition: [%s], MinOffset: [%s], MaxOffset: [%s], (QueryId: [%s])",
topic, partition, minOffset, maxOffset, queryId);
}

public String getQueryId() {
return queryId;
}
public String getTopic() {
return topic;
}
public int getPartition() {
return partition;
}
public long getMinOffset() {
return minOffset;
}
public long getMaxOffset() {
return maxOffset;
}
}
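Since the UUID carried by QueryIdentifier doubles as the ClickHouse query_id (assuming it is passed through to the server by the ClickHouseWriter change above), a batch seen in the connector log can be traced server-side. A minimal sketch with illustrative names:

    // Build the identifier the same way Processing.doInsert does, then log it with the batch.
    QueryIdentifier queryId = new QueryIdentifier("events", 0, 100, 199, UUID.randomUUID().toString());
    LOGGER.info("doInsert - Records: [{}] - {}", 100, queryId);
    // Server side, look the insert up by its query_id:
    //   SELECT event_time, query_duration_ms, written_rows, exception
    //   FROM system.query_log
    //   WHERE query_id = '<UUID logged above>' AND type != 'QueryStart';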
16 changes: 6 additions & 10 deletions src/main/java/com/clickhouse/kafka/connect/util/Utils.java
@@ -58,15 +58,6 @@ public static Exception getRootCause(Exception e, Boolean prioritizeClickHouseEx

public static void handleException(Exception e, boolean errorsTolerance, Collection<SinkRecord> records) {
LOGGER.warn("Deciding how to handle exception: {}", e.getLocalizedMessage());
if (records != null && !records.isEmpty()) {
LOGGER.warn("Number of total records: {}", records.size());
Map<String, List<SinkRecord>> dataRecords = records.stream().collect(Collectors.groupingBy((r) -> r.topic() + "-" + r.kafkaPartition()));
for (String topicAndPartition : dataRecords.keySet()) {
LOGGER.warn("Number of records in [{}] : {}", topicAndPartition, dataRecords.get(topicAndPartition).size());
List<SinkRecord> recordsByTopicAndPartition = dataRecords.get(topicAndPartition);
LOGGER.warn("Exception context: topic: [{}], partition: [{}], offsets: [{}]", recordsByTopicAndPartition.get(0).topic(), recordsByTopicAndPartition.get(0).kafkaPartition(), getOffsets(records));
}
}

//Let's check if we have a ClickHouseException to reference the error code
//https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
@@ -121,7 +112,12 @@ public static void handleException(Exception e, boolean errorsTolerance, Collect
LOGGER.warn("Errors tolerance is enabled, ignoring exception: {}", e.getLocalizedMessage());
} else {
LOGGER.error("Errors tolerance is disabled, wrapping exception: {}", e.getLocalizedMessage());
throw new RuntimeException(e);
if (records != null) {
throw new RuntimeException(String.format("Number of records: %d", records.size()), e);
} else {
throw new RuntimeException("Records was null", e);
}

}
}
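Whether the wrapped exception actually fails the task depends on the errorsTolerance flag above, which maps to Kafka Connect's standard error-handling properties. A sketch of opting into tolerant mode, using illustrative values and the stock Kafka Connect property names (not defined in this diff):

    // import java.util.HashMap; import java.util.Map;
    Map<String, String> props = new HashMap<>();
    props.put("errors.tolerance", "all");                      // log and skip failing batches instead of killing the task
    props.put("errors.deadletterqueue.topic.name", "chc_dlq"); // optional: route bad records to a DLQ topic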

@@ -14,7 +14,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseContainer;

import java.util.ArrayList;
import java.util.Arrays;
@@ -6,6 +6,7 @@
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.helper.SchemaTestData;
import com.clickhouse.kafka.connect.util.Utils;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import org.apache.kafka.connect.errors.DataException;
@@ -225,7 +226,11 @@ public void detectUnsupportedDataConversions() {

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
assertThrows(DataException.class, () -> chst.put(sr), "Did not detect wrong date conversion ");
try {
chst.put(sr);
} catch (RuntimeException e) {
assertTrue(Utils.getRootCause(e) instanceof DataException, "Did not detect wrong date conversion ");
}
chst.stop();
}
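The looser assertion here (and the matching change in ClickHouseSinkTaskWithSchemaTest below) follows from the new wrapping in Processing.doInsert: the DataException is now nested inside a RuntimeException carrying the QueryIdentifier, so the test unwraps it with Utils.getRootCause before checking its type.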

@@ -5,6 +5,7 @@
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.helper.SchemaTestData;
import com.clickhouse.kafka.connect.util.Utils;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
@@ -14,8 +15,7 @@

import java.util.*;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.*;

public class ClickHouseSinkTaskWithSchemaTest {

@@ -213,7 +213,11 @@ public void detectUnsupportedDataConversions() {

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
assertThrows(DataException.class, () -> chst.put(sr), "Did not detect wrong date conversion ");
try {
chst.put(sr);
} catch (RuntimeException e) {
assertTrue(Utils.getRootCause(e) instanceof DataException, "Did not detect wrong date conversion ");
}
chst.stop();
}

