From eedde63495b64a18e98b8a59cbb52c3b5654225a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?=
Date: Wed, 4 Dec 2024 16:33:56 +0100
Subject: [PATCH] SNOW-1831191 Measure the time of schema evolution

---
 .../streaming/iceberg/IcebergIngestionIT.java |   5 +
 .../IcebergIngestionSchemaEvolutionIT.java    |   5 -
 ...IcebergSchemaEvolutionPerformanceTest.java | 140 ++++++++++++++++++
 3 files changed, 145 insertions(+), 5 deletions(-)
 create mode 100644 src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergSchemaEvolutionPerformanceTest.java

diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
index 5b2587e2b..912027b19 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
@@ -132,4 +132,9 @@ protected List<RecordWithMetadata<PrimitiveJsonRecord>> selectAllFromRecordConte
       selectAllComplexJsonRecordFromRecordContent() {
     return select(tableName, selectAllSortByOffset, ComplexJsonRecord::fromRecordContentColumn);
   }
+
+  protected void insertWithRetry(String record, int offset, boolean withSchema) {
+    service.insert(Collections.singletonList(createKafkaRecord(record, offset, withSchema)));
+    service.insert(Collections.singletonList(createKafkaRecord(record, offset, withSchema)));
+  }
 }
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
index 338bc4130..c2fde9ffd 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
@@ -169,11 +169,6 @@ public void alterStructure_noSchema() throws Exception {
     assertEquals(columns.size(), 2);
   }
 
-  private void insertWithRetry(String record, int offset, boolean withSchema) {
-    service.insert(Collections.singletonList(createKafkaRecord(record, offset, withSchema)));
-    service.insert(Collections.singletonList(createKafkaRecord(record, offset, withSchema)));
-  }
-
   private void assertRecordsInTable() {
     List<RecordWithMetadata<PrimitiveJsonRecord>> recordsWithMetadata =
         selectAllSchematizedRecords();
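Note: insertWithRetry is moved up from IcebergIngestionSchemaEvolutionIT into the shared IcebergIngestionIT base class so the new performance test below can reuse it. It deliberately inserts the same record twice at the same offset: when a payload introduces a column or field the table does not yet have, the first insert is expected to trigger schema evolution (after which the channel is reopened and the row is not ingested), and the second call re-sends the record. A generalized form could look like the sketch below; the extra attempts parameter is illustrative only and not part of this patch:

    // Sketch only (assumes the same service/createKafkaRecord fixtures as IcebergIngestionIT):
    protected void insertWithRetry(String record, int offset, boolean withSchema, int attempts) {
      for (int i = 0; i < attempts; i++) {
        service.insert(Collections.singletonList(createKafkaRecord(record, offset, withSchema)));
      }
    }
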
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergSchemaEvolutionPerformanceTest.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergSchemaEvolutionPerformanceTest.java
new file mode 100644
index 000000000..2f6a7a304
--- /dev/null
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergSchemaEvolutionPerformanceTest.java
@@ -0,0 +1,140 @@
+package com.snowflake.kafka.connector.streaming.iceberg;
+
+import static com.snowflake.kafka.connector.streaming.iceberg.TestJsons.STRING_PAYLOAD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.snowflake.kafka.connector.internal.DescribeTableRow;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringJoiner;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+public class IcebergSchemaEvolutionPerformanceTest extends IcebergIngestionIT {
+
+  @Override
+  protected void createIcebergTable() {
+    createIcebergTable(tableName);
+  }
+
+  @Override
+  protected Boolean isSchemaEvolutionEnabled() {
+    return true;
+  }
+
+  @Test
+  @Disabled
+  void performanceNoSchemaEvolution() throws Exception {
+    String payload = "{" + STRING_PAYLOAD + "}";
+    int numberOfRecords = 333;
+    long start = System.currentTimeMillis();
+
+    for (int offset = 0; offset < numberOfRecords; offset++) {
+      insertWithRetry(payload, offset, false);
+    }
+    long end = System.currentTimeMillis();
+    // Baseline: total ingestion time without any schema evolution, for manual comparison.
+    long time = end - start;
+
+    List<DescribeTableRow> columns = describeTable(tableName);
+    assertEquals(2, columns.size());
+    waitForOffset(numberOfRecords);
+  }
+
+  @Test
+  @Disabled
+  void testPerformanceInDepth() throws Exception {
+    // given
+    // number of objects nested in one another; this is the maximum nesting limit of the ingest-sdk
+    int depth = 333;
+    ArrayList<String> payloads = preparePayloadsForNestedObjectEvolution(depth);
+    // when
+    long start = System.currentTimeMillis();
+    int offset = 0;
+    for (String payload : payloads) {
+      insertWithRetry(payload, offset, false);
+      offset++;
+    }
+    long end = System.currentTimeMillis();
+    long time = end - start;
+
+    // The average time is about 800 ms per single schema evolution. Assume we can't go above
+    // 1200 ms per evolution.
+    assertTrue(time / depth < 1200);
+
+    List<DescribeTableRow> columns = describeTable(tableName);
+    assertEquals(2, columns.size());
+    waitForOffset(depth);
+  }
+
+  @Test
+  @Disabled
+  void testPerformanceWhenAddingNewColumns() throws Exception {
+    // given
+    // number of columns to add, one new column per record
+    int columnQuantity = 333;
+    ArrayList<String> payloads = preparePayloadWithNewColumns(columnQuantity);
+    // when
+    long start = System.currentTimeMillis();
+    int offset = 0;
+    for (String payload : payloads) {
+      insertWithRetry(payload, offset, false);
+      offset++;
+    }
+    long end = System.currentTimeMillis();
+    long time = end - start;
+
+    // The average time is about 800 ms per single schema evolution. Assume we can't go above
+    // 1200 ms per evolution.
+    assertTrue(time / columnQuantity < 1200);
+
+    List<DescribeTableRow> columns = describeTable(tableName);
+    assertEquals(columnQuantity + 1, columns.size());
+    waitForOffset(columnQuantity);
+  }
+
+  /** Each subsequent record adds one more column. */
+  private ArrayList<String> preparePayloadWithNewColumns(int columnQuantity) {
+    ArrayList<String> payloads = new ArrayList<>();
+    StringJoiner joiner = new StringJoiner(",");
+
+    for (int level = 0; level < columnQuantity; level++) {
+      String objectName = "object" + level;
+      joiner.add("\"" + objectName + "\" : \"text\"");
+      payloads.add(toValidPayloadNewColumns(joiner));
+    }
+    return payloads;
+  }
+
+  private String toValidPayloadNewColumns(StringJoiner joiner) {
+    return "{" + joiner.toString() + "}";
+  }
+
+  /**
+   * Each subsequent payload nests one more object: { "object0": { "description": "text",
+   * "object1": { "description": "text", ... "objectN": { "description": "text" } ... } } }
+   */
+  private ArrayList<String> preparePayloadsForNestedObjectEvolution(int depth) {
+    ArrayList<String> payloads = new ArrayList<>();
+    StringJoiner joiner = new StringJoiner(",");
+
+    for (int level = 0; level < depth; level++) {
+      String objectName = "object" + level;
+      joiner.add("\"" + objectName + "\" : { \"description\": \"text\"");
+      payloads.add(toValidNestedPayloadJson(level, joiner));
+    }
+
+    return payloads;
+  }
+
+  private String toValidNestedPayloadJson(int depth, StringJoiner joiner) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+    sb.append(joiner.toString());
+    // one closing brace per nesting level, plus the final one for the root object
+    for (int i = 0; i <= depth; i++) {
+      sb.append("}");
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+}
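
For reference, the payload shape driving the nested-object test can be previewed with a small standalone sketch that mirrors the private helpers above (the class name NestedPayloadDemo and the depth of 3 are illustrative only):

    import java.util.ArrayList;
    import java.util.StringJoiner;

    public class NestedPayloadDemo {
      public static void main(String[] args) {
        ArrayList<String> payloads = new ArrayList<>();
        StringJoiner joiner = new StringJoiner(",");
        for (int level = 0; level < 3; level++) {
          // each iteration opens one more nested object...
          joiner.add("\"object" + level + "\" : { \"description\": \"text\"");
          StringBuilder sb = new StringBuilder("{").append(joiner.toString());
          // ...and the matching closing braces are appended per payload
          for (int i = 0; i <= level; i++) {
            sb.append("}");
          }
          payloads.add(sb.append("}").toString());
        }
        payloads.forEach(System.out::println);
        // Output:
        // {"object0" : { "description": "text"}}
        // {"object0" : { "description": "text","object1" : { "description": "text"}}}
        // {"object0" : { "description": "text","object1" : { "description": "text","object2" : { "description": "text"}}}}
      }
    }

Because payload N differs from payload N-1 by exactly one new nested object, each insert forces exactly one schema evolution, which is what makes time / depth a meaningful per-evolution average.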