From 8ad2ee6bc83d1a81c9b0ff34111e153dcebf1a50 Mon Sep 17 00:00:00 2001
From: Purujit Saha
Date: Tue, 1 Oct 2024 16:07:27 +0000
Subject: [PATCH] Revert "Add chunk offset to file id key to make each chunk
 have a unique key (#825)"

This reverts commit a107b50cb2c0fc29249a63bc2751e37b2a777e10.
---
 .../streaming/internal/BlobBuilder.java       |  2 +-
 .../ingest/streaming/internal/Flusher.java    |  5 +--
 .../streaming/internal/ParquetFlusher.java    | 36 ++++---------------
 .../streaming/internal/ParquetRowBuffer.java  |  3 --
 .../net/snowflake/ingest/utils/Constants.java |  1 -
 .../streaming/internal/FlushServiceTest.java  |  4 +--
 .../streaming/internal/RowBufferTest.java     | 28 +++------------
 7 files changed, 15 insertions(+), 64 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
index 30abfac38..edc8fd4c9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
@@ -83,7 +83,7 @@ static <T> Blob constructBlobAndMetadata(
 
       Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
       Flusher.SerializationResult serializedChunk =
-          flusher.serialize(channelsDataPerTable, filePath, curDataSize);
+          flusher.serialize(channelsDataPerTable, filePath);
 
       if (!serializedChunk.channelsMetadataList.isEmpty()) {
         final byte[] compressedChunkData;
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
index eb1d580c5..241defdfc 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
@@ -20,15 +20,12 @@ public interface Flusher<T> {
   /**
    * Serialize buffered rows into the underlying format.
    *
-   * @param fullyQualifiedTableName
    * @param channelsDataPerTable buffered rows
    * @param filePath file path
-   * @param chunkStartOffset
    * @return {@link SerializationResult}
   * @throws IOException
    */
-  SerializationResult serialize(
-      List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
+  SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
       throws IOException;
 
   /** Holds result of the buffered rows conversion: channel metadata and stats. */
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index f8f60cecb..fcdd9cdfc 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -16,7 +16,6 @@
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
 import net.snowflake.ingest.utils.SFException;
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.hadoop.BdecParquetWriter;
 import org.apache.parquet.schema.MessageType;
 
@@ -46,17 +45,13 @@ public ParquetFlusher(
 
   @Override
   public SerializationResult serialize(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
-      String filePath,
-      long chunkStartOffset)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
       throws IOException {
-    return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset);
+    return serializeFromJavaObjects(channelsDataPerTable, filePath);
   }
 
   private SerializationResult serializeFromJavaObjects(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
-      String filePath,
-      long chunkStartOffset)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
       throws IOException {
     List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
     long rowCount = 0L;
@@ -122,7 +117,10 @@ private SerializationResult serializeFromJavaObjects(
     }
 
     Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
-    addFileIdToMetadata(filePath, chunkStartOffset, metadata);
+    // We insert the filename in the file itself as metadata so that streams can work on replicated
+    // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
+    // http://go/streams-on-replicated-mixed-tables
+    metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
     parquetWriter =
         new BdecParquetWriter(
             mergedData,
@@ -146,26 +144,6 @@ private SerializationResult serializeFromJavaObjects(
         chunkMinMaxInsertTimeInMs);
   }
 
-  private static void addFileIdToMetadata(
-      String filePath, long chunkStartOffset, Map<String, String> metadata) {
-    // We insert the filename in the file itself as metadata so that streams can work on replicated
-    // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
-    // http://go/streams-on-replicated-mixed-tables
-    // Using chunk offset as suffix ensures that for interleaved tables, the file
-    // id key is unique for each chunk. Each chunk is logically a separate Parquet file that happens
-    // to be bundled together.
-    if (chunkStartOffset == 0) {
-      metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
-    } else {
-      String shortName = StreamingIngestUtils.getShortname(filePath);
-      final String[] parts = shortName.split("\\.");
-      Preconditions.checkState(parts.length == 2, "Invalid file name format");
-      metadata.put(
-          Constants.PRIMARY_FILE_ID_KEY,
-          String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]));
-    }
-  }
-
   /**
    * Validates that rows count in metadata matches the row count in Parquet footer and the row count
    * written by the parquet writer
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 38f8a3bce..ed19971ab 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -20,11 +20,9 @@
 import java.util.Set;
 import java.util.function.Consumer;
 import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
-import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.connection.TelemetryService;
 import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
-import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -83,7 +81,6 @@ public void setupSchema(List<ColumnMetadata> columns) {
     fieldIndex.clear();
     metadata.clear();
     metadata.put("sfVer", "1,1");
-    metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION);
 
     List<Type> parquetTypes = new ArrayList<>();
     int id = 1;
diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java
index 369a6546a..cb4bacf92 100644
--- a/src/main/java/net/snowflake/ingest/utils/Constants.java
+++ b/src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -35,7 +35,6 @@ public class Constants {
   public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request";
   public static final String PRIMARY_FILE_ID_KEY =
       "primaryFileId"; // Don't change, should match Parquet Scanner
-  public static final String SDK_VERSION_KEY = "sdkVersion";
   public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
   public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
       10L; // Don't change, should match server side
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index b5ed0ba96..cd7354f09 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -145,7 +145,7 @@ ChannelData<T> flushChannel(String name) {
     BlobMetadata buildAndUpload() throws Exception {
       List<List<ChannelData<T>>> blobData = Collections.singletonList(channelData);
       return flushService.buildAndUpload(
-          BlobPath.fileNameWithoutToken("file_name.bdec"),
+          BlobPath.fileNameWithoutToken("file_name"),
           blobData,
           blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName());
     }
@@ -951,7 +951,7 @@ public void testBuildAndUpload() throws Exception {
            blobCaptor.capture(),
            metadataCaptor.capture(),
            ArgumentMatchers.any());
-    Assert.assertEquals("file_name.bdec", nameCaptor.getValue().fileName);
+    Assert.assertEquals("file_name", nameCaptor.getValue().fileName);
 
     ChunkMetadata metadataResult = metadataCaptor.getValue().get(0);
     List<ChannelMetadata> channelMetadataResult = metadataResult.getChannels();
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 393750e25..5e5b96fc3 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -21,7 +21,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.streaming.InsertValidationResponse;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.utils.Constants;
@@ -1912,30 +1911,11 @@ public void testParquetFileNameMetadata() throws IOException {
     data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L));
 
     ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
-    {
-      Flusher.SerializationResult result =
-          flusher.serialize(Collections.singletonList(data), filePath, 0);
+    Flusher.SerializationResult result =
+        flusher.serialize(Collections.singletonList(data), filePath);
 
-      BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-      Assert.assertEquals(
-          "testParquetFileNameMetadata.bdec",
-          reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
-      Assert.assertEquals(
-          RequestBuilder.DEFAULT_VERSION,
-          reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
-    }
-    {
-      Flusher.SerializationResult result =
-          flusher.serialize(Collections.singletonList(data), filePath, 13);
-
-      BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-      Assert.assertEquals(
-          "testParquetFileNameMetadata_13.bdec",
-          reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
-      Assert.assertEquals(
-          RequestBuilder.DEFAULT_VERSION,
-          reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
-    }
+    BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
+    Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
   }
 
   private static Thread getThreadThatWaitsForLockReleaseAndFlushes(