Commit

self review improvements
sfc-gh-bzabek committed Nov 29, 2024
1 parent 5d5df77 commit 3666b25
Showing 11 changed files with 94 additions and 88 deletions.
@@ -116,15 +116,15 @@ public interface SnowflakeConnectionService {
* Alter iceberg table to modify columns datatype
*
* @param tableName the name of the table
* @param columnInfosMap the mapping from the columnNames to their infos
* @param columnInfosMap the mapping from the columnNames to their columnInfos
*/
void alterColumnsDataTypeIcebergTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

/**
* Alter iceberg table to add columns according to a map from columnNames to their types
*
* @param tableName the name of the table
* @param columnInfosMap the mapping from the columnNames to their infos
* @param columnInfosMap the mapping from the columnNames to their columnInfos
*/
void appendColumnsToIcebergTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

@@ -553,18 +553,6 @@ public void appendColumnsToIcebergTable(
appendColumnsToTable(tableName, columnInfosMap, true);
}

private void executeStatement(String tableName, String query) {
try {
LOGGER.info("Trying to run query: {}", query);
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
stmt.execute();
stmt.close();
} catch (SQLException e) {
throw SnowflakeErrors.ERROR_2015.getException(e);
}
}

private void appendColumnsToTable(
String tableName, Map<String, ColumnInfos> columnInfosMap, boolean isIcebergTable) {
checkConnection();
@@ -600,6 +588,18 @@ private void appendColumnsToTable(
LOGGER.info(logColumn.toString(), tableName);
}

private void executeStatement(String tableName, String query) {
try {
LOGGER.info("Trying to run query: {}", query);
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
stmt.execute();
stmt.close();
} catch (SQLException e) {
throw SnowflakeErrors.ERROR_2015.getException(e);
}
}

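The relocated executeStatement helper binds the table name as the single parameter of a prepared statement. A hedged sketch of a call site follows; the DDL text and the IDENTIFIER(?) binding are assumptions made for illustration, not the query this connector actually builds.

// Hypothetical query; executeStatement fills the single '?' with tableName via stmt.setString(1, tableName).
String query = "ALTER ICEBERG TABLE identifier(?) ADD COLUMN EXTRA_FIELD VARCHAR";
executeStatement("KAFKA_ICEBERG_TABLE", query);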
/**
* Alter table to drop non-nullability of a list of columns
*
@@ -339,7 +339,8 @@ public enum SnowflakeErrors {
"Timeout while waiting for file cleaner to start",
"Could not allocate thread for file cleaner to start processing in given time. If problem"
+ " persists, please try setting snowflake.snowpipe.use_new_cleaner to false"),
;
ERROR_5025(
"5025", "Unexpected data type", "Unexpected data type encountered during schema evolution.");

// properties

@@ -23,20 +23,25 @@ public IcebergColumnTreeFactory() {
}

IcebergColumnTree fromIcebergSchema(IcebergColumnSchema columnSchema) {
LOGGER.debug("Attempting to resolve schema from schema stored in a channel");
LOGGER.debug(
"Attempting to parse schema from schema stored in a channel for column: "
+ columnSchema.getColumnName());
IcebergFieldNode rootNode =
createNode(columnSchema.getColumnName().toUpperCase(), columnSchema.getSchema());
return new IcebergColumnTree(rootNode);
}

IcebergColumnTree fromJson(IcebergColumnJsonValuePair pair) {
LOGGER.debug("Attempting to resolve schema from records payload");
LOGGER.debug(
"Attempting to parse schema from the record's payload for column: " + pair.getColumnName());
IcebergFieldNode rootNode = createNode(pair.getColumnName().toUpperCase(), pair.getJsonNode());
return new IcebergColumnTree(rootNode);
}

IcebergColumnTree fromConnectSchema(Field kafkaConnectField) {
LOGGER.debug("Attempting to resolve schema from schema attached to a record");
LOGGER.debug(
"Attempting to parse schema from schema attached to a record for column: "
+ kafkaConnectField.name());
IcebergFieldNode rootNode =
createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema());
return new IcebergColumnTree(rootNode);
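Taken together, the three factory entry points cover the three schema sources handled elsewhere in this change. A short sketch, with the argument variables assumed rather than taken from the diff:

IcebergColumnTreeFactory factory = new IcebergColumnTreeFactory();
IcebergColumnTree fromChannel = factory.fromIcebergSchema(icebergColumnSchema); // schema read back from the channel
IcebergColumnTree fromSchema = factory.fromConnectSchema(kafkaConnectField); // record carries a Connect schema
IcebergColumnTree fromPayload = factory.fromJson(columnJsonValuePair); // schemaless record, parsed from its JSON payload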
@@ -15,7 +15,7 @@ String buildType(IcebergColumnTree columnTree) {
* @param sb StringBuilder
* @param parentType Snowflake Iceberg table compatible type. ROOT_NODE_TYPE is a special case,
* here we never generate column name for it.
* @return field name + data type
* @return SQL type of the column
*/
private StringBuilder buildType(StringBuilder sb, IcebergFieldNode fieldNode, String parentType) {
if (parentType.equals("ARRAY")
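For orientation, the type strings this builder is expected to produce can be read off the parseFromJsonArguments test data later in this commit; a few of those input/output pairs, repeated here as comments:

// {"testColumnName": 1}                  -> LONG
// {"testColumnName": {"k1": 1, "k2": 2}} -> OBJECT(k1 LONG, k2 LONG)
// {"testColumnName": [1, 2, 3]}          -> ARRAY(LONG)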
@@ -10,6 +10,7 @@
import static org.apache.kafka.connect.data.Schema.Type.STRUCT;

import com.fasterxml.jackson.databind.JsonNode;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Date;
@@ -20,8 +21,6 @@

class IcebergColumnTypeMapper {

static IcebergColumnTypeMapper INSTANCE = new IcebergColumnTypeMapper();

/**
* See <a href="https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types">Data types for
* Apache Iceberg™ tables</a>
@@ -62,8 +61,8 @@ String mapToColumnTypeFromIcebergSchema(Type apacheIcebergType) {
case MAP:
return "MAP";
default:
throw new IllegalArgumentException(
"Fail unsupported datatype! - " + apacheIcebergType.typeId());
throw SnowflakeErrors.ERROR_5025.getException(
"Data type: " + apacheIcebergType.typeId().name());
}
}

@@ -10,17 +10,17 @@
import net.snowflake.ingest.streaming.internal.ColumnProperties;
import org.apache.iceberg.types.Type;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IcebergTableSchemaResolver {

private final IcebergColumnTreeFactory treeFactory = new IcebergColumnTreeFactory();
private final IcebergColumnTypeMapper mapper = IcebergColumnTypeMapper.INSTANCE;

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableSchemaResolver.class);
private final IcebergColumnTreeFactory treeFactory;

public IcebergTableSchemaResolver() {
this.treeFactory = new IcebergColumnTreeFactory();
}

/**
* Retrieve IcebergSchema stored in a channel, then parse it into a tree. Filter out columns that
@@ -29,17 +29,13 @@ class IcebergTableSchemaResolver {
public List<IcebergColumnTree> resolveIcebergSchemaFromChannel(
Map<String, ColumnProperties> tableSchemaFromChannel, Set<String> columnsToEvolve) {

List<IcebergColumnSchema> apacheIcebergColumnSchemas =
tableSchemaFromChannel.entrySet().stream()
.filter(
(schemaFromChannelEntry) -> {
String columnNameFromChannel = schemaFromChannelEntry.getKey();
return columnsToEvolve.contains(columnNameFromChannel);
})
.map(this::mapApacheSchemaFromChannel)
.collect(Collectors.toList());

return apacheIcebergColumnSchemas.stream()
return tableSchemaFromChannel.entrySet().stream()
.filter(
(schemaFromChannelEntry) -> {
String columnNameFromChannel = schemaFromChannelEntry.getKey();
return columnsToEvolve.contains(columnNameFromChannel);
})
.map(this::mapIcebergSchemaFromChannel)
.map(treeFactory::fromIcebergSchema)
.collect(Collectors.toList());
}
@@ -50,13 +46,18 @@ public List<IcebergColumnTree> resolveIcebergSchemaFromRecord(
return ImmutableList.of();
}
if (hasSchema(record)) {
LOGGER.debug(
"Schema found. Evolving columns based on the record schema, columns: " + columnsToEvolve);
return getTableSchemaFromRecordSchema(record, columnsToEvolve);
} else {
LOGGER.debug(
"Schema NOT found. Evolving columns based on the record's payload, columns: "
+ columnsToEvolve);
return getTableSchemaFromJson(record, columnsToEvolve);
}
}

private IcebergColumnSchema mapApacheSchemaFromChannel(
private IcebergColumnSchema mapIcebergSchemaFromChannel(
Map.Entry<String, ColumnProperties> schemaFromChannelEntry) {

ColumnProperties columnProperty = schemaFromChannelEntry.getValue();
@@ -106,34 +107,28 @@ private List<IcebergColumnTree> getTableSchemaFromJson(
private List<IcebergColumnTree> getTableSchemaFromRecordSchema(
SinkRecord record, Set<String> columnsToEvolve) {

Schema schema = record.valueSchema();

if (schema != null && schema.fields() != null) {
ArrayList<Field> foundColumns = new ArrayList<>();

for (Field field : schema.fields()) {
if (columnsToEvolve.contains(field.name().toUpperCase())) {
foundColumns.add(field);
}
}

if (foundColumns.size() < columnsToEvolve.size()) {
List<String> notFoundColumns =
columnsToEvolve.stream()
.filter(
columnToEvolve ->
schema.fields().stream()
.noneMatch(f -> f.name().equalsIgnoreCase(columnToEvolve)))
.collect(Collectors.toList());

throw SnowflakeErrors.ERROR_5022.getException(
"Columns not found in schema: "
+ notFoundColumns
+ ", schemaColumns: "
+ schema.fields().stream().map(Field::name).collect(Collectors.toList()));
}
return foundColumns.stream().map(treeFactory::fromConnectSchema).collect(Collectors.toList());
List<Field> schemaColumns = record.valueSchema().fields();
List<Field> foundColumns =
schemaColumns.stream()
.filter(
schemaColumnName -> columnsToEvolve.contains(schemaColumnName.name().toUpperCase()))
.collect(Collectors.toList());

if (foundColumns.size() < columnsToEvolve.size()) {
List<String> notFoundColumns =
schemaColumns.stream()
.map(Field::name)
.filter(schemaColumnName -> !columnsToEvolve.contains(schemaColumnName.toUpperCase()))
.collect(Collectors.toList());

throw SnowflakeErrors.ERROR_5022.getException(
"Columns not found in schema: "
+ notFoundColumns
+ ", schemaColumns: "
+ schemaColumns.stream().map(Field::name).collect(Collectors.toList())
+ ", foundColumns: "
+ foundColumns.stream().map(Field::name).collect(Collectors.toList()));
}
return ImmutableList.of();
return foundColumns.stream().map(treeFactory::fromConnectSchema).collect(Collectors.toList());
}
}
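A hedged usage sketch of the resolver's record path (sinkRecord and the column name are illustrative): when the record carries a value schema the trees are built from record.valueSchema().fields(), otherwise from its JSON payload.

IcebergTableSchemaResolver resolver = new IcebergTableSchemaResolver();
Set<String> columnsToEvolve = Collections.singleton("EXTRA_FIELD"); // upper case, since the resolver compares upper-cased names
List<IcebergColumnTree> trees = resolver.resolveIcebergSchemaFromRecord(sinkRecord, columnsToEvolve);
// A requested column missing from the record's schema surfaces as SnowflakeErrors.ERROR_5022.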
@@ -19,7 +19,6 @@
import com.snowflake.kafka.connector.internal.streaming.StreamingBufferThreshold;
import com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergSchemaEvolutionService;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeSchemaEvolutionService;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordServiceFactory;
@@ -138,7 +137,7 @@ private SnowflakeSinkServiceV2 streamingSinkService(
/*enableSchematization=*/ false,
/*closeChannelsInParallel=*/ true,
topicPartitionChannelMap,
new IcebergSchemaEvolutionService(mockConnectionService));
new SnowflakeSchemaEvolutionService(mockConnectionService));
}

private SnowflakeStreamingIngestClient defaultStreamingIngestClient() {
@@ -16,7 +16,7 @@

class IcebergColumnTypeMapperTest {

private final IcebergColumnTypeMapper mapper = IcebergColumnTypeMapper.INSTANCE;
private final IcebergColumnTypeMapper mapper = new IcebergColumnTypeMapper();

@ParameterizedTest(name = "should map Kafka type {0} to Snowflake column type {2}")
@MethodSource("kafkaTypesToMap")
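With ERROR_5025 in place, an Iceberg type that falls through the switch now surfaces as the connector's own exception instead of an IllegalArgumentException. A hedged AssertJ-style sketch; whether this test class exercises that path is an assumption:

// unsupportedIcebergType stands in for an Iceberg Type with no Snowflake mapping.
assertThatThrownBy(() -> mapper.mapToColumnTypeFromIcebergSchema(unsupportedIcebergType))
    .isInstanceOf(SnowflakeKafkaConnectorException.class);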
@@ -89,7 +89,7 @@ static Stream<Arguments> icebergSchemas() {
+ " VARCHAR(16777216)))"));
}

@ParameterizedTest
@ParameterizedTest(name = "{0}")
@MethodSource("parseFromJsonArguments")
void parseFromJsonRecordSchema(String jsonString, String expectedType) {
// given
@@ -108,8 +107,7 @@ static Stream<Arguments> parseFromJsonArguments() {
return Stream.of(
arguments("{\"testColumnName\" : 1 }", "LONG"),
arguments(
"{ \"testColumnName\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}",
"OBJECT(k1 LONG, k2 LONG)"),
"{ \"testColumnName\": { \"k1\" : 1, \"k2\" : 2 } }", "OBJECT(k1 LONG, k2 LONG)"),
arguments(
"{ \"testColumnName\": {"
+ "\"k1\" : { \"nested_key1\" : 1},"
@@ -133,20 +132,20 @@ static Stream<Arguments> parseFromJsonArguments() {
+ " OBJECT(brand"
+ " VARCHAR)))"),
arguments(
" { \"testColumnName\": [\n"
+ " {\n"
+ " \"id\": 0,\n"
+ " \"name\": \"Sandoval Hodges\"\n"
+ " },\n"
+ " {\n"
+ " \"id\": 1,\n"
+ " \"name\": \"Ramirez Brooks\"\n"
+ " },\n"
+ " {\n"
+ " \"id\": 2,\n"
+ " \"name\": \"Vivian Whitfield\"\n"
+ " }\n"
+ " ] } \n",
" { \"testColumnName\": ["
+ " {"
+ " \"id\": 0,"
+ " \"name\": \"Sandoval Hodges\""
+ " },"
+ " {"
+ " \"id\": 1,"
+ " \"name\": \"Ramirez Brooks\""
+ " },"
+ " {"
+ " \"id\": 2,"
+ " \"name\": \"Vivian Whitfield\""
+ " }"
+ " ] } ",
"ARRAY(OBJECT(name VARCHAR, id LONG))"),
// array
arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)"),