diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
index 8e65ed677..939e6d255 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
@@ -84,7 +84,7 @@ static <T> Blob constructBlobAndMetadata(
 
       Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
       Flusher.SerializationResult serializedChunk =
-          flusher.serialize(channelsDataPerTable, filePath);
+          flusher.serialize(channelsDataPerTable, filePath, curDataSize);
 
       if (!serializedChunk.channelsMetadataList.isEmpty()) {
         final byte[] compressedChunkData;
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
index 3bb339975..060816fa8 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
@@ -20,12 +20,15 @@ public interface Flusher<T> {
   /**
    * Serialize buffered rows into the underlying format.
    *
    * @param channelsDataPerTable buffered rows
    * @param filePath file path
+   * @param chunkStartOffset start offset of this chunk within the blob file; used to derive a
+   *     per-chunk primary file id
    * @return {@link SerializationResult}
    * @throws IOException
    */
-  SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
+  SerializationResult serialize(
+      List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
       throws IOException;
 
   /** Holds result of the buffered rows conversion: channel metadata and stats. */
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index ddfca4a42..5e4781f44 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -41,13 +41,13 @@ public ParquetFlusher(
 
   @Override
   public SerializationResult serialize(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath, long chunkStartOffset)
       throws IOException {
-    return serializeFromJavaObjects(channelsDataPerTable, filePath);
+    return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset);
   }
 
   private SerializationResult serializeFromJavaObjects(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath, long chunkStartOffset)
       throws IOException {
     List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
     long rowCount = 0L;
@@ -116,7 +116,14 @@ private SerializationResult serializeFromJavaObjects(
     // We insert the filename in the file itself as metadata so that streams can work on replicated
     // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
     // http://go/streams-on-replicated-mixed-tables
-    metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
+    // Using the chunk start offset as a suffix ensures that, for interleaved tables, the file id
+    // key is unique for each chunk. Each chunk is logically a separate Parquet file that happens
+    // to be bundled together with others.
+    String shortName = StreamingIngestUtils.getShortname(filePath);
+    final String[] parts = shortName.split("\\.");
+    metadata.put(
+        Constants.PRIMARY_FILE_ID_KEY,
+        String.format("%s%s.%s", parts[0], chunkStartOffset, parts[1]));
     parquetWriter =
         new BdecParquetWriter(
             mergedData,
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 5ada286e5..bcd01aea3 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -17,9 +17,11 @@ import java.util.Set;
 import java.util.function.Consumer;
 import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
+import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.connection.TelemetryService;
 import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
 import org.apache.parquet.schema.MessageType;
@@ -74,6 +76,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
     fieldIndex.clear();
     metadata.clear();
     metadata.put("sfVer", "1,1");
+    metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION);
     List<Type> parquetTypes = new ArrayList<>();
     int id = 1;
     for (ColumnMetadata column : columns) {
diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java
index 3d09d9a2a..534a1815e 100644
--- a/src/main/java/net/snowflake/ingest/utils/Constants.java
+++ b/src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -35,6 +35,7 @@ public class Constants {
   public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request";
   public static final String PRIMARY_FILE_ID_KEY =
       "primaryFileId"; // Don't change, should match Parquet Scanner
+  public static final String SDK_VERSION_KEY = "sdkVersion";
   public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
   public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
       10L; // Don't change, should match server side
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index f0a867075..0e9b3bc6d 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -19,6 +19,8 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+
+import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.streaming.InsertValidationResponse;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.utils.Constants;
@@ -1789,10 +1791,15 @@ public void testParquetFileNameMetadata() throws IOException {
     ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
     Flusher.SerializationResult result =
-        flusher.serialize(Collections.singletonList(data), filePath);
+        flusher.serialize(Collections.singletonList(data), filePath, 13);
 
     BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-    Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
+    Assert.assertEquals(
+ "testParquetFileNameMetadata13.bdec", + reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY)); + Assert.assertEquals(RequestBuilder.DEFAULT_VERSION, reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY)); } private static Thread getThreadThatWaitsForLockReleaseAndFlushes(