SNOW-1774276 Update fileId key in metadata for Iceberg mode (#880)
sfc-gh-alhuang authored Oct 30, 2024
1 parent 70d4aa9 · commit abf5a85
Showing 6 changed files with 30 additions and 12 deletions.
@@ -142,7 +142,7 @@ public boolean isEnableNewJsonParsingLogic() {
     return enableNewJsonParsingLogic;
   }
 
-  public boolean getEnableIcebergStreaming() {
+  public boolean isEnableIcebergStreaming() {
     return enableIcebergStreaming;
   }
 
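The rename above matches the common is-prefix convention for boolean getters, which introspection-based tooling relies on to resolve boolean properties. A minimal sketch (class and flag names assumed for illustration, not taken from the commit):

import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

public class AccessorNamingDemo {
  private boolean enableIcebergStreaming = true;

  // An is-prefixed getter is what Introspector expects for a boolean property.
  public boolean isEnableIcebergStreaming() {
    return enableIcebergStreaming;
  }

  public static void main(String[] args) throws Exception {
    BeanInfo info = Introspector.getBeanInfo(AccessorNamingDemo.class, Object.class);
    for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
      // Prints: enableIcebergStreaming -> isEnableIcebergStreaming
      System.out.println(pd.getName() + " -> " + pd.getReadMethod().getName());
    }
  }
}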
@@ -33,6 +33,7 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
   private final Constants.BdecParquetCompression bdecParquetCompression;
   private final ParquetProperties.WriterVersion parquetWriterVersion;
   private final boolean enableDictionaryEncoding;
+  private final boolean enableIcebergStreaming;
 
   /** Construct parquet flusher from its schema. */
   public ParquetFlusher(
@@ -41,13 +42,15 @@ public ParquetFlusher(
       Optional<Integer> maxRowGroups,
       Constants.BdecParquetCompression bdecParquetCompression,
       ParquetProperties.WriterVersion parquetWriterVersion,
-      boolean enableDictionaryEncoding) {
+      boolean enableDictionaryEncoding,
+      boolean enableIcebergStreaming) {
     this.schema = schema;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxRowGroups = maxRowGroups;
     this.bdecParquetCompression = bdecParquetCompression;
     this.parquetWriterVersion = parquetWriterVersion;
     this.enableDictionaryEncoding = enableDictionaryEncoding;
+    this.enableIcebergStreaming = enableIcebergStreaming;
   }
 
   @Override
@@ -125,9 +128,14 @@ private SerializationResult serializeFromJavaObjects(
 
     Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
     // We insert the filename in the file itself as metadata so that streams can work on replicated
-    // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
-    // http://go/streams-on-replicated-mixed-tables
-    metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
+    // tables. For a more detailed discussion on the topic see SNOW-561447,
+    // http://go/streams-on-replicated-mixed-tables, and
+    // http://go/managed-iceberg-replication-change-tracking
+    metadata.put(
+        enableIcebergStreaming
+            ? Constants.ASSIGNED_FULL_FILE_NAME_KEY
+            : Constants.PRIMARY_FILE_ID_KEY,
+        StreamingIngestUtils.getShortname(filePath));
     parquetWriter =
         new SnowflakeParquetWriter(
             mergedData,
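This hunk is the heart of the change: in Iceberg mode the writer records the file name under the new ASSIGNED_FULL_FILE_NAME_KEY ("assignedFullFileName") instead of PRIMARY_FILE_ID_KEY ("primaryFileId"). A minimal sketch of how the resulting footer entry could be inspected with the standard parquet-hadoop API (the file path and flag value are assumptions for illustration):

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FileNameMetadataCheck {
  public static void main(String[] args) throws IOException {
    boolean enableIcebergStreaming = true; // assumed flag value
    try (ParquetFileReader reader =
        ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration()))) {
      // Key-value metadata is stored in the Parquet file footer.
      Map<String, String> kv = reader.getFooter().getFileMetaData().getKeyValueMetaData();
      String key = enableIcebergStreaming ? "assignedFullFileName" : "primaryFileId";
      System.out.println(key + " = " + kv.get(key));
    }
  }
}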
@@ -86,7 +86,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
   public void setupSchema(List<ColumnMetadata> columns) {
     fieldIndex.clear();
     metadata.clear();
-    if (!clientBufferParameters.getEnableIcebergStreaming()) {
+    if (!clientBufferParameters.isEnableIcebergStreaming()) {
       metadata.put("sfVer", "1,1");
     }
     List<Type> parquetTypes = new ArrayList<>();
@@ -106,7 +106,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
         addNonNullableFieldName(column.getInternalName());
       }
 
-      if (!clientBufferParameters.getEnableIcebergStreaming()) {
+      if (!clientBufferParameters.isEnableIcebergStreaming()) {
         /* Streaming to FDN table doesn't support sub-columns, set up the stats here. */
         this.statsMap.put(
             column.getInternalName(),
@@ -190,7 +190,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
      * F6.element: ordinal=6, fieldId=12
      * F7: ordinal=7, fieldId=0
      */
-    if (clientBufferParameters.getEnableIcebergStreaming()) {
+    if (clientBufferParameters.isEnableIcebergStreaming()) {
       for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
         String[] path = columnDescriptor.getPath();
         String columnDotPath = concatDotPath(path);
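The ordinal/fieldId bookkeeping above relies on Parquet field ids carried by the schema. A small sketch (column names and ids assumed for illustration) of how parquet-mr attaches field ids via the Types builder and exposes each leaf column's dot path, analogous to concatDotPath:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class FieldIdDemo {
  public static void main(String[] args) {
    // Build a message type with an explicit field id on each column.
    MessageType schema =
        Types.buildMessage()
            .optional(PrimitiveTypeName.INT32).id(11).named("F5")
            .optional(PrimitiveTypeName.BINARY).id(13).named("F7")
            .named("schema");
    // Each leaf column exposes its path; joining with '.' mirrors concatDotPath.
    schema.getColumns().forEach(c ->
        System.out.println(
            String.join(".", c.getPath()) + " -> fieldId=" + schema.getType(c.getPath()).getId()));
  }
}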
@@ -313,7 +313,7 @@ private float addRow(
       int colIndex = parquetColumn.index;
       ColumnMetadata column = parquetColumn.columnMetadata;
       ParquetBufferValue valueWithSize =
-          (clientBufferParameters.getEnableIcebergStreaming()
+          (clientBufferParameters.isEnableIcebergStreaming()
               ? IcebergParquetValueParser.parseColumnValueToParquet(
                   value,
                   parquetColumn.type,
@@ -356,7 +356,7 @@ private float addRow(
 
     // Increment null count for column and its sub-columns missing in the input map
     for (String columnName : Sets.difference(this.fieldIndex.keySet(), inputColumnNames)) {
-      if (clientBufferParameters.getEnableIcebergStreaming()) {
+      if (clientBufferParameters.isEnableIcebergStreaming()) {
        if (subColumnFinder == null) {
          throw new SFException(ErrorCode.INTERNAL_ERROR, "SubColumnFinder is not initialized.");
        }
@@ -458,6 +458,7 @@ public Flusher<ParquetChunkData> createFlusher() {
         clientBufferParameters.getMaxRowGroups(),
         clientBufferParameters.getBdecParquetCompression(),
         parquetWriterVersion,
-        clientBufferParameters.isEnableDictionaryEncoding());
+        clientBufferParameters.isEnableDictionaryEncoding(),
+        clientBufferParameters.isEnableIcebergStreaming());
   }
 }
src/main/java/net/snowflake/ingest/utils/Constants.java (1 change: 1 addition & 0 deletions)
@@ -36,6 +36,7 @@ public class Constants {
   public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request";
   public static final String PRIMARY_FILE_ID_KEY =
       "primaryFileId"; // Don't change, should match Parquet Scanner
+  public static final String ASSIGNED_FULL_FILE_NAME_KEY = "assignedFullFileName";
   public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
   public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
       10L; // Don't change, should match server side
@@ -136,6 +136,7 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
                 enableIcebergStreaming
                     ? ParquetProperties.WriterVersion.PARQUET_2_0
                     : ParquetProperties.WriterVersion.PARQUET_1_0,
+                enableIcebergStreaming,
                 enableIcebergStreaming))
         .when(channelData)
         .createFlusher();
@@ -2032,7 +2032,14 @@ public void testParquetFileNameMetadata() throws IOException {
     flusher.serialize(Collections.singletonList(data), filePath);
 
     BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-    Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
+    Assert.assertEquals(
+        filePath,
+        reader
+            .getKeyValueMetadata()
+            .get(
+                enableIcebergStreaming
+                    ? Constants.ASSIGNED_FULL_FILE_NAME_KEY
+                    : Constants.PRIMARY_FILE_ID_KEY));
   }
 
   @Test
