Put null or empty JSON nodes to DLQ for schema evolution
sfc-gh-wtrefon committed Dec 4, 2024
1 parent 1a437d6 commit 4bb5888
Showing 4 changed files with 59 additions and 30 deletions.
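In short, schema evolution has to infer a column type from the record value; when that value is null, an empty object, or an empty list, no type can be inferred, so the record is now handed to the error reporter (the dead-letter queue path) under the new ERROR_5026. The payloads below are illustrative only, not taken from the commit, and assume a column "new_col" that does not yet exist in the target Iceberg table:

// Illustrative payloads for a not-yet-existing column "new_col":
String inferable   = "{\"new_col\": 42}";   // type can be inferred  -> schema evolution proceeds
String nullValue   = "{\"new_col\": null}"; // cannot infer the type -> ERROR_5026, record follows the DLQ path
String emptyObject = "{\"new_col\": {}}";   // cannot infer the type -> ERROR_5026, record follows the DLQ path
String emptyList   = "{\"new_col\": []}";   // cannot infer the type -> ERROR_5026, record follows the DLQ path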
@@ -340,7 +340,11 @@ public enum SnowflakeErrors {
"Could not allocate thread for file cleaner to start processing in given time. If problem"
+ " persists, please try setting snowflake.snowpipe.use_new_cleaner to false"),
ERROR_5025(
"5025", "Unexpected data type", "Unexpected data type encountered during schema evolution.");
"5025", "Unexpected data type", "Unexpected data type encountered during schema evolution."),
ERROR_5026(
"5026",
"Invalid SinkRecord received",
"Cannot infer type from null or empty object/list during schema evolution.");

// properties

@@ -14,10 +14,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
@@ -44,13 +41,15 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.*;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
@@ -536,17 +535,38 @@ private void handleInsertRowFailure(
SchemaEvolutionTargetItems schemaEvolutionTargetItems =
insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName());
if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
try {
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
} catch (SnowflakeKafkaConnectorException e) {
LOGGER.error(
"Error while performing schema evolution for channel:{}",
this.getChannelNameFormatV1(),
e);
if (Objects.equals(e.getCode(), SnowflakeErrors.ERROR_5026.getCode())) {
handleError(Collections.singletonList(e), kafkaSinkRecord);
} else {
throw e;
}
}

return;
}
}

handleError(
insertErrors.stream()
.map(InsertValidationResponse.InsertError::getException)
.collect(Collectors.toList()),
kafkaSinkRecord);
}

private void handleError(List<Exception> insertErrors, SinkRecord kafkaSinkRecord) {
if (logErrors) {
for (InsertValidationResponse.InsertError insertError : insertErrors) {
LOGGER.error("Insert Row Error message:{}", insertError.getException().getMessage());
for (Exception insertError : insertErrors) {
LOGGER.error("Insert Row Error message:{}", insertError.getMessage());
}
}
if (errorTolerance) {
@@ -563,7 +583,6 @@ private void handleInsertRowFailure(
this.kafkaRecordErrorReporter.reportError(
kafkaSinkRecord,
insertErrors.stream()
.map(InsertValidationResponse.InsertError::getException)
.findFirst()
.orElseThrow(
() ->
@@ -574,20 +593,12 @@
final String errMsg =
String.format(
"Error inserting Records using Streaming API with msg:%s",
insertErrors.get(0).getException().getMessage());
insertErrors.get(0).getMessage());
this.telemetryServiceV2.reportKafkaConnectFatalError(errMsg);
throw new DataException(errMsg, insertErrors.get(0).getException());
throw new DataException(errMsg, insertErrors.get(0));
}
}

private List<String> join(
List<String> nonNullableColumns, List<String> nullValueForNotNullColNames) {
return Lists.newArrayList(
Iterables.concat(
Optional.ofNullable(nonNullableColumns).orElse(ImmutableList.of()),
Optional.ofNullable(nullValueForNotNullColNames).orElse(ImmutableList.of())));
}

// TODO: SNOW-529755 POLL committed offsets in background thread

@Override
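Worth noting: the new catch block above only hands the failing record to handleError; whether it actually lands in a dead-letter queue depends on the connector's error-handling settings, since handleError throws a DataException and fails the task when error tolerance is off. A minimal configuration sketch, assuming the standard Kafka Connect error-handling properties and a hypothetical topic name:

import java.util.HashMap;
import java.util.Map;

// Hypothetical settings enabling DLQ routing for records that fail schema evolution.
Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("errors.tolerance", "all");                         // tolerate bad records instead of failing the task
connectorConfig.put("errors.deadletterqueue.topic.name", "dlq_topic");  // hypothetical DLQ topic name
connectorConfig.put("errors.log.enable", "true");                       // log the error message for each failed record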
@@ -73,7 +73,7 @@ private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebe

// -- parse tree from kafka record payload logic --
private IcebergFieldNode createNode(String name, JsonNode jsonNode) {
String snowflakeType = mapper.mapToColumnTypeFromJson(jsonNode);
String snowflakeType = mapper.mapToColumnTypeFromJson(name, jsonNode);
return new IcebergFieldNode(name, snowflakeType, produceChildren(jsonNode));
}

@@ -1,5 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5026;
import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN;
import static org.apache.kafka.connect.data.Schema.Type.BYTES;
@@ -11,6 +12,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Date;
@@ -72,8 +74,8 @@ String mapToColumnTypeFromIcebergSchema(Type apacheIcebergType) {
*
* <p>Converts Types from: JsonNode -> KafkaKafka -> Snowflake.
*/
String mapToColumnTypeFromJson(JsonNode value) {
Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(value);
String mapToColumnTypeFromJson(String name, JsonNode value) {
Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(name, value);
return mapToColumnTypeFromKafkaSchema(kafkaType, null);
}

@@ -124,15 +126,19 @@ String mapToColumnTypeFromKafkaSchema(Schema.Type kafkaType, String schemaName)
}

/**
* Map the JSON node type to Kafka type
* Map the JSON node type to Kafka type. For null and empty values, we can't infer the type, so we
* throw an exception.
*
* @param name column/field name
* @param value JSON node
* @throws SnowflakeKafkaConnectorException if the value is null, an empty array, or an empty object
* @return Kafka type
*/
Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
if (value == null || value.isNull()) {
return STRING;
} else if (value.isNumber()) {
Schema.Type mapJsonNodeTypeToKafkaType(String name, JsonNode value) {
if (cannotInferType(value)) {
throw ERROR_5026.getException("'" + name + "' field value is null or empty");
}
if (value.isNumber()) {
if (value.isFloat()) {
return FLOAT32;
} else if (value.isDouble()) {
@@ -154,4 +160,12 @@ Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
return null;
}
}

boolean cannotInferType(JsonNode value) {
// cannot infer the type if the value is null, an empty array, or an empty object
return value == null
|| value.isNull()
|| (value.isArray() && value.isEmpty())
|| (value.isObject() && value.isEmpty());
}
}
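For a quick check of which JSON shapes now raise ERROR_5026, here is a standalone Jackson sketch that mirrors the cannotInferType() check added above; the demo class and sample document are illustrative and not part of the connector:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullOrEmptyNodeCheckDemo {

  // Mirrors the cannotInferType() check introduced in this commit.
  static boolean cannotInferType(JsonNode value) {
    return value == null
        || value.isNull()
        || (value.isArray() && value.isEmpty())
        || (value.isObject() && value.isEmpty());
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper om = new ObjectMapper();
    JsonNode record = om.readTree("{\"id\": 1, \"tags\": [], \"details\": {}, \"note\": null}");
    // The column tree factory appears to walk nested nodes one by one (createNode/produceChildren),
    // so empty objects or lists deeper in the payload can also trigger the error.
    record.fields().forEachRemaining(field ->
        System.out.println(field.getKey() + " -> cannotInferType=" + cannotInferType(field.getValue())));
    // Prints:
    // id -> cannotInferType=false
    // tags -> cannotInferType=true
    // details -> cannotInferType=true
    // note -> cannotInferType=true
  }
}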
