From 4165b5aecbe2cfc2973ca63f221b76b681513b34 Mon Sep 17 00:00:00 2001
From: Purujit Saha
Date: Wed, 11 Sep 2024 22:17:16 +0000
Subject: [PATCH] Add chunk index to file id key to make each chunk have a
 unique key

---
 .../ingest/streaming/internal/BlobBuilder.java     |  3 ++-
 .../ingest/streaming/internal/Flusher.java         |  4 +++-
 .../ingest/streaming/internal/ParquetFlusher.java  | 13 +++++++++----
 .../ingest/streaming/internal/RowBufferTest.java   |  6 ++++--
 4 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
index 8e65ed677..2fb96e6d3 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
@@ -78,13 +78,14 @@ static <T> Blob constructBlobAndMetadata(
     CRC32 crc = new CRC32();
 
     // TODO: channels with different schema can't be combined even if they belongs to same table
+    int chunkIndex = 0;
     for (List<ChannelData<T>> channelsDataPerTable : blobData) {
       ChannelFlushContext firstChannelFlushContext =
           channelsDataPerTable.get(0).getChannelContext();
 
       Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
       Flusher.SerializationResult serializedChunk =
-          flusher.serialize(channelsDataPerTable, filePath);
+          flusher.serialize(channelsDataPerTable, filePath, chunkIndex++);
 
       if (!serializedChunk.channelsMetadataList.isEmpty()) {
         final byte[] compressedChunkData;
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
index 3bb339975..d4910a156 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
@@ -22,10 +22,12 @@ public interface Flusher<T> {
    *
    * @param channelsDataPerTable buffered rows
    * @param filePath file path
+   * @param chunkIndex index of the chunk within the containing blob
    * @return {@link SerializationResult}
    * @throws IOException
    */
-  SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
+  SerializationResult serialize(
+      List<ChannelData<T>> channelsDataPerTable, String filePath, int chunkIndex)
       throws IOException;
 
   /** Holds result of the buffered rows conversion: channel metadata and stats. */
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index d338a6a7b..09109a844 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -48,12 +48,12 @@ public ParquetFlusher(
 
   @Override
   public SerializationResult serialize(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath, int chunkIndex)
       throws IOException {
     if (enableParquetInternalBuffering) {
       return serializeFromParquetWriteBuffers(channelsDataPerTable, filePath);
     }
-    return serializeFromJavaObjects(channelsDataPerTable, filePath);
+    return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkIndex);
   }
 
   private SerializationResult serializeFromParquetWriteBuffers(
@@ -142,7 +142,7 @@ private SerializationResult serializeFromParquetWriteBuffers(
   }
 
   private SerializationResult serializeFromJavaObjects(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath, int chunkIndex)
       throws IOException {
     List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
     long rowCount = 0L;
@@ -215,7 +215,12 @@ private SerializationResult serializeFromJavaObjects(
     // We insert the filename in the file itself as metadata so that streams can work on replicated
     // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
     // http://go/streams-on-replicated-mixed-tables
-    metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
+    // Using the chunk index as a suffix ensures that, for interleaved tables, the file id key
+    // is unique for each chunk. Each chunk is logically a separate Parquet file that just
+    // happens to be bundled with the others into a single blob.
+    metadata.put(
+        Constants.PRIMARY_FILE_ID_KEY,
+        String.format("%s-%s", StreamingIngestUtils.getShortname(filePath), chunkIndex));
     parquetWriter =
         new BdecParquetWriter(
             mergedData,
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 3f8e927a4..2f3d75528 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -1796,10 +1796,12 @@ public void testParquetFileNameMetadata() throws IOException {
 
     ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
     Flusher.SerializationResult result =
-        flusher.serialize(Collections.singletonList(data), filePath);
+        flusher.serialize(Collections.singletonList(data), filePath, 0);
 
     BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-    Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
+    Assert.assertEquals(
+        String.format("%s-%s", filePath, 0),
+        reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
   }
 
   private static Thread getThreadThatWaitsForLockReleaseAndFlushes(
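
Note: the patch above makes PRIMARY_FILE_ID_KEY unique per chunk by appending the chunk index
to the blob file's short name. The snippet below is a minimal standalone sketch of that key
scheme; ChunkKeyExample and its getShortname stub are hypothetical stand-ins for
StreamingIngestUtils.getShortname, not code from this repository.

    import java.util.ArrayList;
    import java.util.List;

    public class ChunkKeyExample {
      // Hypothetical stand-in for StreamingIngestUtils.getShortname: keep only the file name.
      static String getShortname(String filePath) {
        int idx = filePath.lastIndexOf('/');
        return idx < 0 ? filePath : filePath.substring(idx + 1);
      }

      public static void main(String[] args) {
        String filePath = "2024/9/11/some_blob.bdec";
        List<String> keys = new ArrayList<>();
        // Mirrors the loop in BlobBuilder.constructBlobAndMetadata: chunkIndex is
        // incremented once per table chunk and appended to the file id key.
        for (int chunkIndex = 0; chunkIndex < 3; chunkIndex++) {
          keys.add(String.format("%s-%s", getShortname(filePath), chunkIndex));
        }
        // Prints [some_blob.bdec-0, some_blob.bdec-1, some_blob.bdec-2]; before this
        // patch, all three chunks would have carried the same key.
        System.out.println(keys);
      }
    }

For a single-chunk blob the only visible change is the "-0" suffix, which the updated
RowBufferTest assertion covers.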