apply changes suggested in a review
sfc-gh-bzabek committed Dec 2, 2024
1 parent 3666b25 commit f918bd0
Showing 8 changed files with 53 additions and 48 deletions.
@@ -30,6 +30,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import net.snowflake.client.jdbc.SnowflakeConnectionV1;
 import net.snowflake.client.jdbc.SnowflakeDriver;
 import net.snowflake.client.jdbc.cloud.storage.StageInfo;
@@ -515,15 +516,18 @@ public void alterColumnsDataTypeIcebergTable(
   private String generateAlterSetDataTypeQuery(Map<String, ColumnInfos> columnsToModify) {
     StringBuilder setDataTypeQuery = new StringBuilder("alter iceberg ");
     setDataTypeQuery.append("table identifier(?) alter column ");
-    for (Map.Entry<String, ColumnInfos> column : columnsToModify.entrySet()) {
-      String columnName = column.getKey();
-      String dataType = column.getValue().getColumnType();
-
-      setDataTypeQuery.append(columnName).append(" set data type ").append(dataType).append(", ");
-    }
-    // remove last comma and whitespace
-    setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1);
-    setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1);
+    String columnsPart =
+        columnsToModify.entrySet().stream()
+            .map(
+                column -> {
+                  String columnName = column.getKey();
+                  String dataType = column.getValue().getColumnType();
+                  return columnName + " set data type " + dataType;
+                })
+            .collect(Collectors.joining(", "));
+
+    setDataTypeQuery.append(columnsPart);
 
     return setDataTypeQuery.toString();
   }
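The replacement works because Collectors.joining(", ") places the separator only between elements, so no trailing ", " has to be trimmed afterwards. A minimal standalone sketch of the same pattern (plain String values stand in for ColumnInfos; all names here are illustrative):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class JoiningSketch {
  public static void main(String[] args) {
    // LinkedHashMap keeps column order deterministic for the demo.
    Map<String, String> columnsToModify = new LinkedHashMap<>();
    columnsToModify.put("COL_A", "VARCHAR(16777216)");
    columnsToModify.put("COL_B", "NUMBER(19,0)");

    String columnsPart =
        columnsToModify.entrySet().stream()
            .map(e -> e.getKey() + " set data type " + e.getValue())
            .collect(Collectors.joining(", "));

    // Prints: alter iceberg table identifier(?) alter column COL_A set data type
    // VARCHAR(16777216), COL_B set data type NUMBER(19,0)
    System.out.println("alter iceberg table identifier(?) alter column " + columnsPart);
  }
}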
@@ -13,6 +13,7 @@ public interface SchemaEvolutionService {
    * @param targetItems target items for schema evolution such as table name, columns to drop
    *     nullability, and columns to add
    * @param record the sink record that contains the schema and actual data
+   * @param existingSchema schema stored in a channel
    */
   void evolveSchemaIfNeeded(
       SchemaEvolutionTargetItems targetItems,
@@ -2,6 +2,8 @@
 
 public class IcebergColumnTreeTypeBuilder {
 
+  private static final String ROOT_NODE_TYPE = "ROOT_NODE";
+
   /** Returns data type of the column */
   String buildType(IcebergColumnTree columnTree) {
     StringBuilder sb = new StringBuilder();
@@ -53,6 +55,4 @@ private void removeLastSeparator(StringBuilder sb) {
     sb.deleteCharAt(sb.length() - 1);
     sb.deleteCharAt(sb.length() - 1);
   }
-
-  private static final String ROOT_NODE_TYPE = "ROOT_NODE";
 }
@@ -6,9 +6,7 @@
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnInfos;
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionService;
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionTargetItems;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 import net.snowflake.ingest.streaming.internal.ColumnProperties;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -150,12 +148,20 @@ private void mergeChangesIntoExistingColumns(
       List<IcebergColumnTree> alreadyExistingColumns, List<IcebergColumnTree> modifiedColumns) {
     alreadyExistingColumns.forEach(
         existingColumn -> {
-          IcebergColumnTree mewVersion =
+          List<IcebergColumnTree> modifiedColumnMatchingExisting =
               modifiedColumns.stream()
                   .filter(c -> c.getColumnName().equals(existingColumn.getColumnName()))
-                  .collect(Collectors.toList())
-                  .get(0);
-          mergeTreeService.merge(existingColumn, mewVersion);
+                  .collect(Collectors.toList());
+          if (modifiedColumnMatchingExisting.size() != 1) {
+            LOGGER.warn(
+                "Skipping schema evolution of a column {}. Incorrect number of new versions of the"
+                    + " column: {}",
+                existingColumn.getColumnName(),
+                modifiedColumnMatchingExisting.stream()
+                    .map(IcebergColumnTree::getColumnName)
+                    .collect(Collectors.toList()));
+          }
+          mergeTreeService.merge(existingColumn, modifiedColumnMatchingExisting.get(0));
         });
   }
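The new version collects every match first so it can detect a missing or duplicated counterpart instead of calling get(0) on an arbitrary stream result. A stripped-down sketch of that guard (String stands in for IcebergColumnTree, and plain equality for the getColumnName() comparison):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MatchGuardSketch {
  public static void main(String[] args) {
    List<String> modifiedColumns = Arrays.asList("NAME", "AGE", "NAME");
    String existingColumn = "NAME";

    // Collect all candidates rather than blindly taking the first one.
    List<String> matching =
        modifiedColumns.stream()
            .filter(c -> c.equals(existingColumn))
            .collect(Collectors.toList());

    if (matching.size() != 1) {
      // Mirrors the LOGGER.warn branch above: zero or multiple candidates
      // indicate an inconsistent set of schema changes for this column.
      System.out.println("Unexpected number of matches: " + matching);
    }
  }
}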

@@ -47,12 +47,12 @@ public List<IcebergColumnTree> resolveIcebergSchemaFromRecord(
     }
     if (hasSchema(record)) {
       LOGGER.debug(
-          "Schema found. Evolve columns basing on a record schema, column: " + columnsToEvolve);
+          "Schema found. Evolve columns basing on a record's schema, column: {}", columnsToEvolve);
       return getTableSchemaFromRecordSchema(record, columnsToEvolve);
     } else {
       LOGGER.debug(
-          "Schema NOT found. Evolve columns basing on a records payload, columns: "
-              + columnsToEvolve);
+          "Schema NOT found. Evolve columns basing on a record's payload, columns: {}",
+          columnsToEvolve);
       return getTableSchemaFromJson(record, columnsToEvolve);
     }
   }
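Switching from string concatenation to {} placeholders defers message formatting until the log level is actually enabled. A small SLF4J-style illustration of the difference (the connector's own LOGGER wrapper may differ, but the diff shows it accepts placeholder arguments):

import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingSketch.class);

  void log(Set<String> columnsToEvolve) {
    // Eager: the message string is built even when DEBUG is disabled.
    LOGGER.debug("Evolve columns: " + columnsToEvolve);
    // Lazy: formatting happens only if DEBUG is actually enabled.
    LOGGER.debug("Evolve columns: {}", columnsToEvolve);
  }
}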
@@ -77,7 +77,7 @@ private static String getIcebergSchema(ColumnProperties columnProperties) {
       return (String) field.get(columnProperties);
     } catch (IllegalAccessException | NoSuchFieldException e) {
       throw new IllegalStateException(
-          "Couldn't set iceberg by accessing private field: " + "isIceberg", e);
+          "Couldn't set iceberg by accessing private field: isIceberg", e);
     }
   }
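getIcebergSchema reads a private field reflectively because ColumnProperties does not expose it publicly. The general pattern, as a self-contained sketch (the field and class names here are illustrative, not the connector's):

import java.lang.reflect.Field;

public class ReflectionSketch {
  static Object readPrivateField(Object target, String fieldName) {
    try {
      // getDeclaredField sees private fields declared directly on the class.
      Field field = target.getClass().getDeclaredField(fieldName);
      field.setAccessible(true); // lift the private-access restriction
      return field.get(target);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new IllegalStateException("Couldn't read private field: " + fieldName, e);
    }
  }
}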

@@ -102,7 +102,7 @@ private List<IcebergColumnTree> getTableSchemaFromJson(
    * Given a SinkRecord, get the schema information from it
    *
    * @param record the sink record that contains the schema and actual data
-   * @return list of column represantation in a form of tree
+   * @return list of column representation in a form of tree
    */
   private List<IcebergColumnTree> getTableSchemaFromRecordSchema(
       SinkRecord record, Set<String> columnsToEvolve) {
@@ -93,7 +93,7 @@ static Stream<Arguments> icebergSchemas() {
   @MethodSource("parseFromJsonArguments")
   void parseFromJsonRecordSchema(String jsonString, String expectedType) {
     // given
-    SinkRecord record = createKafkaRecord(jsonString, false);
+    SinkRecord record = createKafkaRecord(jsonString);
     JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
     IcebergColumnJsonValuePair columnValuePair =
         IcebergColumnJsonValuePair.from(recordNode.fields().next());
@@ -161,15 +161,15 @@ void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expectedResult) {
     IcebergColumnTree alreadyExistingTree = treeFactory.fromIcebergSchema(apacheSchema);
 
     // tree parsed from a record
-    SinkRecord record = createKafkaRecord(recordJson, false);
+    SinkRecord record = createKafkaRecord(recordJson);
     JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
     IcebergColumnJsonValuePair columnValuePair =
         IcebergColumnJsonValuePair.from(recordNode.fields().next());
 
     IcebergColumnTree modifiedTree = treeFactory.fromJson(columnValuePair);
-    // then
+    // when
     mergeTreeService.merge(alreadyExistingTree, modifiedTree);
 
+    // then
     String expected = expectedResult.replaceAll("/ +/g", " ");
     Assertions.assertEquals(expected, typeBuilder.buildType(alreadyExistingTree));
     Assertions.assertEquals("TESTSTRUCT", alreadyExistingTree.getColumnName());
@@ -220,11 +220,10 @@ static Stream<Arguments> mergeTestArguments() {
             "ARRAY(OBJECT(primitive BOOLEAN, new_field LONG))"));
   }
 
-  protected SinkRecord createKafkaRecord(String jsonString, boolean withSchema) {
+  protected SinkRecord createKafkaRecord(String jsonString) {
     int offset = 0;
     JsonConverter converter = new JsonConverter();
-    converter.configure(
-        Collections.singletonMap("schemas.enable", Boolean.toString(withSchema)), false);
+    converter.configure(Collections.singletonMap("schemas.enable", Boolean.toString(false)), false);
     SchemaAndValue inputValue =
         converter.toConnectData("TOPIC_NAME", jsonString.getBytes(StandardCharsets.UTF_8));
     Headers headers = new ConnectHeaders();
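With the withSchema flag gone, createKafkaRecord always builds schemaless records. For reference, a sketch of how Kafka Connect's public JsonConverter API behaves with schemas.enable=false (topic name and payload here are illustrative):

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class ConverterSketch {
  public static void main(String[] args) {
    JsonConverter converter = new JsonConverter();
    // With schemas.enable=false the payload is taken as-is, without the
    // {"schema": ..., "payload": ...} envelope.
    converter.configure(Collections.singletonMap("schemas.enable", "false"), false);
    SchemaAndValue value =
        converter.toConnectData("TOPIC_NAME", "{\"k1\": 1}".getBytes(StandardCharsets.UTF_8));
    System.out.println(value.value()); // value.schema() is null for schemaless JSON
  }
}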
@@ -24,6 +24,13 @@
 
 public class IcebergIngestionSchemaEvolutionIT extends IcebergIngestionIT {
 
+  private static final String RECORD_METADATA_TYPE =
+      "OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition NUMBER(10,0), key"
+          + " VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id NUMBER(10,0),"
+          + " CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0),"
+          + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216),"
+          + " VARCHAR(16777216)))";
+
   @Override
   protected Boolean isSchemaEvolutionEnabled() {
     return true;

/** Verify a scenario when structure is enriched with another field. */
@Test
@Disabled
// @Disabled
public void alterStructure_noSchema() throws Exception {
// k1, k2
String testStruct1 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2 } }";
@@ -194,7 +201,7 @@ private void assertRecordsInTable() {
   }
 
   @Test
-  @Disabled
+  // @Disabled
   public void testComplexRecordEvolution_withSchema() throws Exception {
     insertWithRetry(complexJsonWithSchemaExample, 0, true);
     waitForOffset(1);
@@ -233,7 +240,7 @@ public void testComplexRecordEvolution_withSchema() throws Exception {
   }
 
   @Test
-  @Disabled
+  // @Disabled
   public void testComplexRecordEvolution() throws Exception {
     insertWithRetry(complexJsonPayloadExample, 0, false);
     waitForOffset(1);
@@ -272,7 +279,7 @@ public void testComplexRecordEvolution() throws Exception {
   /** Test just for a scenario when we see a record for the first time. */
   @ParameterizedTest
   @MethodSource("schemasAndPayloads_brandNewColumns")
-  @Disabled
+  // @Disabled
   public void addBrandNewColumns_withSchema(
       String payloadWithSchema, String expectedColumnName, String expectedType) throws Exception {
     // when
@@ -303,7 +310,7 @@ private static Stream<Arguments> schemasAndPayloads_brandNewColumns() {
 
   @ParameterizedTest
   @MethodSource("primitiveEvolutionDataSource")
-  @Disabled
+  // @Disabled
   public void testEvolutionOfPrimitives_withSchema(
       String singleBooleanField,
       String booleanAndInt,

@ParameterizedTest
@MethodSource("testEvolutionOfComplexTypes_dataSource")
@Disabled
// @Disabled
public void testEvolutionOfComplexTypes_withSchema(
String objectVarchar,
String objectWithNestedObject,
@@ -476,11 +483,4 @@ private static Stream<Arguments> testEvolutionOfComplexTypes_dataSource() {
             twoObjectsExtendedWithMapAndArrayPayload(),
             false));
   }
-
-  private static final String RECORD_METADATA_TYPE =
-      "OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition NUMBER(10,0), key"
-          + " VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id NUMBER(10,0),"
-          + " CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0),"
-          + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216),"
-          + " VARCHAR(16777216)))";
 }
@@ -6,7 +6,7 @@
  */
 class TestJsons {
 
-  public static String nestedObjectWithSchema() {
+  static String nestedObjectWithSchema() {
     return "{"
         + "  \"schema\": {"
         + "    \"type\": \"struct\","
@@ -501,9 +501,4 @@ static String twoObjectsExtendedWithMapAndArrayPayload() {
   private static final String SCHEMA_BEGINNING =
       "{ \"schema\": { \"type\": \"struct\", \"fields\": [";
   private static final String SCHEMA_END = "]},";
-
-  private static final String OBJECT_SCHEMA_BEGINNING =
-      "{\"field\": \"object\", \"type\": \"struct\", \"fields\": [";
-
-  private static final String OBJECT_SCHEMA_END = "]}";
 }
