From abf5a85ec4c80fea4db2fcf662a9b79d7716fd68 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 30 Oct 2024 13:42:23 -0700 Subject: [PATCH] SNOW-1774276 Update fileId key in metadata for Iceberg mode (#880) --- .../internal/ClientBufferParameters.java | 2 +- .../streaming/internal/ParquetFlusher.java | 16 ++++++++++++---- .../streaming/internal/ParquetRowBuffer.java | 13 +++++++------ .../net/snowflake/ingest/utils/Constants.java | 1 + .../streaming/internal/BlobBuilderTest.java | 1 + .../ingest/streaming/internal/RowBufferTest.java | 9 ++++++++- 6 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index d1a93f9b3..db2d7c841 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -142,7 +142,7 @@ public boolean isEnableNewJsonParsingLogic() { return enableNewJsonParsingLogic; } - public boolean getEnableIcebergStreaming() { + public boolean isEnableIcebergStreaming() { return enableIcebergStreaming; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index e7272d94a..e6782fecc 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -33,6 +33,7 @@ public class ParquetFlusher implements Flusher { private final Constants.BdecParquetCompression bdecParquetCompression; private final ParquetProperties.WriterVersion parquetWriterVersion; private final boolean enableDictionaryEncoding; + private final boolean enableIcebergStreaming; /** Construct parquet flusher from its schema. */ public ParquetFlusher( @@ -41,13 +42,15 @@ public ParquetFlusher( Optional maxRowGroups, Constants.BdecParquetCompression bdecParquetCompression, ParquetProperties.WriterVersion parquetWriterVersion, - boolean enableDictionaryEncoding) { + boolean enableDictionaryEncoding, + boolean enableIcebergStreaming) { this.schema = schema; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxRowGroups = maxRowGroups; this.bdecParquetCompression = bdecParquetCompression; this.parquetWriterVersion = parquetWriterVersion; this.enableDictionaryEncoding = enableDictionaryEncoding; + this.enableIcebergStreaming = enableIcebergStreaming; } @Override @@ -125,9 +128,14 @@ private SerializationResult serializeFromJavaObjects( Map metadata = channelsDataPerTable.get(0).getVectors().metadata; // We insert the filename in the file itself as metadata so that streams can work on replicated - // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and - // http://go/streams-on-replicated-mixed-tables - metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath)); + // tables. For a more detailed discussion on the topic see SNOW-561447, + // http://go/streams-on-replicated-mixed-tables, and + // http://go/managed-iceberg-replication-change-tracking + metadata.put( + enableIcebergStreaming + ? Constants.ASSIGNED_FULL_FILE_NAME_KEY + : Constants.PRIMARY_FILE_ID_KEY, + StreamingIngestUtils.getShortname(filePath)); parquetWriter = new SnowflakeParquetWriter( mergedData, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 12a6cc875..d1d6a886d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -86,7 +86,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer { public void setupSchema(List columns) { fieldIndex.clear(); metadata.clear(); - if (!clientBufferParameters.getEnableIcebergStreaming()) { + if (!clientBufferParameters.isEnableIcebergStreaming()) { metadata.put("sfVer", "1,1"); } List parquetTypes = new ArrayList<>(); @@ -106,7 +106,7 @@ public void setupSchema(List columns) { addNonNullableFieldName(column.getInternalName()); } - if (!clientBufferParameters.getEnableIcebergStreaming()) { + if (!clientBufferParameters.isEnableIcebergStreaming()) { /* Streaming to FDN table doesn't support sub-columns, set up the stats here. */ this.statsMap.put( column.getInternalName(), @@ -190,7 +190,7 @@ public void setupSchema(List columns) { * F6.element: ordinal=6, fieldId=12 * F7: ordinal=7, fieldId=0 */ - if (clientBufferParameters.getEnableIcebergStreaming()) { + if (clientBufferParameters.isEnableIcebergStreaming()) { for (ColumnDescriptor columnDescriptor : schema.getColumns()) { String[] path = columnDescriptor.getPath(); String columnDotPath = concatDotPath(path); @@ -313,7 +313,7 @@ private float addRow( int colIndex = parquetColumn.index; ColumnMetadata column = parquetColumn.columnMetadata; ParquetBufferValue valueWithSize = - (clientBufferParameters.getEnableIcebergStreaming() + (clientBufferParameters.isEnableIcebergStreaming() ? IcebergParquetValueParser.parseColumnValueToParquet( value, parquetColumn.type, @@ -356,7 +356,7 @@ private float addRow( // Increment null count for column and its sub-columns missing in the input map for (String columnName : Sets.difference(this.fieldIndex.keySet(), inputColumnNames)) { - if (clientBufferParameters.getEnableIcebergStreaming()) { + if (clientBufferParameters.isEnableIcebergStreaming()) { if (subColumnFinder == null) { throw new SFException(ErrorCode.INTERNAL_ERROR, "SubColumnFinder is not initialized."); } @@ -458,6 +458,7 @@ public Flusher createFlusher() { clientBufferParameters.getMaxRowGroups(), clientBufferParameters.getBdecParquetCompression(), parquetWriterVersion, - clientBufferParameters.isEnableDictionaryEncoding()); + clientBufferParameters.isEnableDictionaryEncoding(), + clientBufferParameters.isEnableIcebergStreaming()); } } diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 754c81cff..436bb580f 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -36,6 +36,7 @@ public class Constants { public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request"; public static final String PRIMARY_FILE_ID_KEY = "primaryFileId"; // Don't change, should match Parquet Scanner + public static final String ASSIGNED_FULL_FILE_NAME_KEY = "assignedFullFileName"; public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST = 10L; // Don't change, should match server side diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 974a32cbf..503b67c6a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -136,6 +136,7 @@ private List> createChannelDataPerTable(int metada enableIcebergStreaming ? ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0, + enableIcebergStreaming, enableIcebergStreaming)) .when(channelData) .createFlusher(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 4a0e7d4c5..361e587f0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -2032,7 +2032,14 @@ public void testParquetFileNameMetadata() throws IOException { flusher.serialize(Collections.singletonList(data), filePath); BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); - Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY)); + Assert.assertEquals( + filePath, + reader + .getKeyValueMetadata() + .get( + enableIcebergStreaming + ? Constants.ASSIGNED_FULL_FILE_NAME_KEY + : Constants.PRIMARY_FILE_ID_KEY)); } @Test