From 17de48ff2e14139ecf14643267196e7aa53e0287 Mon Sep 17 00:00:00 2001 From: Purujit Saha Date: Fri, 18 Oct 2024 21:03:08 +0000 Subject: [PATCH 1/4] =?UTF-8?q?Revert=20"Revert=20"Add=20chunk=20offset=20?= =?UTF-8?q?to=20file=20id=20key=20to=20make=20each=20chunk=20have=20a=20un?= =?UTF-8?q?i=E2=80=A6=20(#848)"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 0930648dfaaefa5abb6fe7aa4dcb83423ac1f11e. --- .../streaming/internal/BlobBuilder.java | 2 +- .../ingest/streaming/internal/Flusher.java | 5 +- .../streaming/internal/ParquetFlusher.java | 47 ++++++++++++++----- .../streaming/internal/ParquetRowBuffer.java | 3 ++ .../net/snowflake/ingest/utils/Constants.java | 1 + .../streaming/internal/RowBufferTest.java | 45 +++++++++++++----- 6 files changed, 78 insertions(+), 25 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 060af357f..1aa81e1b1 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -86,7 +86,7 @@ static Blob constructBlobAndMetadata( Flusher flusher = channelsDataPerTable.get(0).createFlusher(); Flusher.SerializationResult serializedChunk = - flusher.serialize(channelsDataPerTable, filePath); + flusher.serialize(channelsDataPerTable, filePath, curDataSize); if (!serializedChunk.channelsMetadataList.isEmpty()) { final byte[] compressedChunkData; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java index 5a426e873..b110ae36a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java @@ -20,12 +20,15 @@ public interface Flusher { /** * Serialize buffered rows into the underlying format. * + * @param fullyQualifiedTableName * @param channelsDataPerTable buffered rows * @param filePath file path + * @param chunkStartOffset * @return {@link SerializationResult} * @throws IOException */ - SerializationResult serialize(List> channelsDataPerTable, String filePath) + SerializationResult serialize( + List> channelsDataPerTable, String filePath, long chunkStartOffset) throws IOException; /** Holds result of the buffered rows conversion: channel metadata and stats. */ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index e6782fecc..10f247453 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -16,6 +16,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; +import org.apache.parquet.Preconditions; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.SnowflakeParquetWriter; import org.apache.parquet.schema.MessageType; @@ -55,13 +56,17 @@ public ParquetFlusher( @Override public SerializationResult serialize( - List> channelsDataPerTable, String filePath) + List> channelsDataPerTable, + String filePath, + long chunkStartOffset) throws IOException { - return serializeFromJavaObjects(channelsDataPerTable, filePath); + return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset); } private SerializationResult serializeFromJavaObjects( - List> channelsDataPerTable, String filePath) + List> channelsDataPerTable, + String filePath, + long chunkStartOffset) throws IOException { List channelsMetadataList = new ArrayList<>(); long rowCount = 0L; @@ -127,15 +132,7 @@ private SerializationResult serializeFromJavaObjects( } Map metadata = channelsDataPerTable.get(0).getVectors().metadata; - // We insert the filename in the file itself as metadata so that streams can work on replicated - // tables. For a more detailed discussion on the topic see SNOW-561447, - // http://go/streams-on-replicated-mixed-tables, and - // http://go/managed-iceberg-replication-change-tracking - metadata.put( - enableIcebergStreaming - ? Constants.ASSIGNED_FULL_FILE_NAME_KEY - : Constants.PRIMARY_FILE_ID_KEY, - StreamingIngestUtils.getShortname(filePath)); + addFileIdToMetadata(filePath, chunkStartOffset, metadata); parquetWriter = new SnowflakeParquetWriter( mergedData, @@ -162,6 +159,32 @@ private SerializationResult serializeFromJavaObjects( parquetWriter.getExtendedMetadataSize()); } + private void addFileIdToMetadata( + String filePath, long chunkStartOffset, Map metadata) { + // We insert the filename in the file itself as metadata so that streams can work on replicated + // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and + // http://go/streams-on-replicated-mixed-tables + // Using chunk offset as suffix ensures that for interleaved tables, the file + // id key is unique for each chunk. Each chunk is logically a separate Parquet file that happens + // to be bundled together. + if (chunkStartOffset == 0) { + metadata.put( + enableIcebergStreaming + ? Constants.ASSIGNED_FULL_FILE_NAME_KEY + : Constants.PRIMARY_FILE_ID_KEY, + StreamingIngestUtils.getShortname(filePath)); + } else { + String shortName = StreamingIngestUtils.getShortname(filePath); + final String[] parts = shortName.split("\\."); + Preconditions.checkState(parts.length == 2, "Invalid file name format"); + metadata.put( + enableIcebergStreaming + ? Constants.ASSIGNED_FULL_FILE_NAME_KEY + : Constants.PRIMARY_FILE_ID_KEY, + String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1])); + } + } + /** * Validates that rows count in metadata matches the row count in Parquet footer and the row count * written by the parquet writer diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index d1d6a886d..000d34493 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -20,9 +20,11 @@ import java.util.Set; import java.util.function.Consumer; import net.snowflake.client.jdbc.internal.google.common.collect.Sets; +import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.connection.TelemetryService; import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.IcebergDataTypeParser; import net.snowflake.ingest.utils.SFException; @@ -89,6 +91,7 @@ public void setupSchema(List columns) { if (!clientBufferParameters.isEnableIcebergStreaming()) { metadata.put("sfVer", "1,1"); } + metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION); List parquetTypes = new ArrayList<>(); int id = 1; diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 436bb580f..2e1cc0d01 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -37,6 +37,7 @@ public class Constants { public static final String PRIMARY_FILE_ID_KEY = "primaryFileId"; // Don't change, should match Parquet Scanner public static final String ASSIGNED_FULL_FILE_NAME_KEY = "assignedFullFileName"; + public static final String SDK_VERSION_KEY = "sdkVersion"; public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST = 10L; // Don't change, should match server side diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 361e587f0..5402aba95 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Constants; @@ -2028,18 +2029,40 @@ public void testParquetFileNameMetadata() throws IOException { data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L)); ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher(); - Flusher.SerializationResult result = - flusher.serialize(Collections.singletonList(data), filePath); + { + Flusher.SerializationResult result = + flusher.serialize(Collections.singletonList(data), filePath, 0); - BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); - Assert.assertEquals( - filePath, - reader - .getKeyValueMetadata() - .get( - enableIcebergStreaming - ? Constants.ASSIGNED_FULL_FILE_NAME_KEY - : Constants.PRIMARY_FILE_ID_KEY)); + BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); + Assert.assertEquals( + "testParquetFileNameMetadata.bdec", + reader + .getKeyValueMetadata() + .get( + enableIcebergStreaming + ? Constants.ASSIGNED_FULL_FILE_NAME_KEY + : Constants.PRIMARY_FILE_ID_KEY)); + Assert.assertEquals( + RequestBuilder.DEFAULT_VERSION, + reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY)); + } + { + Flusher.SerializationResult result = + flusher.serialize(Collections.singletonList(data), filePath, 13); + + BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); + Assert.assertEquals( + "testParquetFileNameMetadata_13.bdec", + reader + .getKeyValueMetadata() + .get( + enableIcebergStreaming + ? Constants.ASSIGNED_FULL_FILE_NAME_KEY + : Constants.PRIMARY_FILE_ID_KEY)); + Assert.assertEquals( + RequestBuilder.DEFAULT_VERSION, + reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY)); + } } @Test From 4008e234b0ac11762a84a63ddb0d245e896bdb1d Mon Sep 17 00:00:00 2001 From: Purujit Saha Date: Thu, 31 Oct 2024 17:52:30 +0000 Subject: [PATCH 2/4] Comments from Hitesh --- .../ingest/streaming/internal/ParquetFlusher.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index 10f247453..d07753e12 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -163,7 +163,8 @@ private void addFileIdToMetadata( String filePath, long chunkStartOffset, Map metadata) { // We insert the filename in the file itself as metadata so that streams can work on replicated // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and - // http://go/streams-on-replicated-mixed-tables + // http://go/streams-on-replicated-mixed-tables, and + // http://go/managed-iceberg-replication-change-tracking // Using chunk offset as suffix ensures that for interleaved tables, the file // id key is unique for each chunk. Each chunk is logically a separate Parquet file that happens // to be bundled together. @@ -174,13 +175,13 @@ private void addFileIdToMetadata( : Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath)); } else { + Preconditions.checkState( + !enableIcebergStreaming, "Iceberg streaming is not supported with non-zero offsets"); String shortName = StreamingIngestUtils.getShortname(filePath); final String[] parts = shortName.split("\\."); Preconditions.checkState(parts.length == 2, "Invalid file name format"); metadata.put( - enableIcebergStreaming - ? Constants.ASSIGNED_FULL_FILE_NAME_KEY - : Constants.PRIMARY_FILE_ID_KEY, + Constants.PRIMARY_FILE_ID_KEY, String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1])); } } From 6dcad1e4905058ff45eb339edce0dbd892447cbc Mon Sep 17 00:00:00 2001 From: Purujit Saha Date: Thu, 31 Oct 2024 18:03:56 +0000 Subject: [PATCH 3/4] Fix test to handle iceberg case --- .../streaming/internal/RowBufferTest.java | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 5402aba95..747773e9e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -2047,21 +2047,31 @@ public void testParquetFileNameMetadata() throws IOException { reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY)); } { - Flusher.SerializationResult result = - flusher.serialize(Collections.singletonList(data), filePath, 13); - - BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); - Assert.assertEquals( - "testParquetFileNameMetadata_13.bdec", - reader - .getKeyValueMetadata() - .get( - enableIcebergStreaming - ? Constants.ASSIGNED_FULL_FILE_NAME_KEY - : Constants.PRIMARY_FILE_ID_KEY)); - Assert.assertEquals( - RequestBuilder.DEFAULT_VERSION, - reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY)); + try { + Flusher.SerializationResult result = + flusher.serialize(Collections.singletonList(data), filePath, 13); + if (enableIcebergStreaming) { + Assert.fail( + "Should have thrown an exception because iceberg streams do not support offsets"); + } + + BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); + Assert.assertEquals( + "testParquetFileNameMetadata_13.bdec", + reader + .getKeyValueMetadata() + .get( + enableIcebergStreaming + ? Constants.ASSIGNED_FULL_FILE_NAME_KEY + : Constants.PRIMARY_FILE_ID_KEY)); + Assert.assertEquals( + RequestBuilder.DEFAULT_VERSION, + reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY)); + } catch (IllegalStateException ex) { + if (!enableIcebergStreaming) { + throw ex; + } + } } } From 52b133572ab2159a4adc407cc8a7e3143314747c Mon Sep 17 00:00:00 2001 From: Purujit Saha Date: Thu, 31 Oct 2024 20:13:34 +0000 Subject: [PATCH 4/4] Deflake a test --- .../snowflake/ingest/streaming/internal/InternalStageTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java index 150ec566a..e9a80c42c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java @@ -487,8 +487,9 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { throw new RuntimeException(e); } }); + workers.shutdown(); - workers.awaitTermination(150, TimeUnit.MILLISECONDS); + Assert.assertTrue(workers.awaitTermination(1, TimeUnit.SECONDS)); Mockito.verify(mockClient).execute(Mockito.any()); }