Skip to content

Commit

Permalink
Revert "Revert "Add chunk offset to file id key to make each chunk ha…
Browse files Browse the repository at this point in the history
…ve a uni… (#848)"

This reverts commit 0930648.
  • Loading branch information
sfc-gh-psaha committed Oct 31, 2024
1 parent e5c3316 commit 17de48f
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static <T> Blob constructBlobAndMetadata(

Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
Flusher.SerializationResult serializedChunk =
flusher.serialize(channelsDataPerTable, filePath);
flusher.serialize(channelsDataPerTable, filePath, curDataSize);

if (!serializedChunk.channelsMetadataList.isEmpty()) {
final byte[] compressedChunkData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ public interface Flusher<T> {
/**
* Serialize buffered rows into the underlying format.
*
* @param fullyQualifiedTableName
* @param channelsDataPerTable buffered rows
* @param filePath file path
* @param chunkStartOffset
* @return {@link SerializationResult}
* @throws IOException
*/
SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
SerializationResult serialize(
List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
throws IOException;

/** Holds result of the buffered rows conversion: channel metadata and stats. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.SnowflakeParquetWriter;
import org.apache.parquet.schema.MessageType;
Expand Down Expand Up @@ -55,13 +56,17 @@ public ParquetFlusher(

@Override
public SerializationResult serialize(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
String filePath,
long chunkStartOffset)
throws IOException {
return serializeFromJavaObjects(channelsDataPerTable, filePath);
return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset);
}

private SerializationResult serializeFromJavaObjects(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
String filePath,
long chunkStartOffset)
throws IOException {
List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
long rowCount = 0L;
Expand Down Expand Up @@ -127,15 +132,7 @@ private SerializationResult serializeFromJavaObjects(
}

Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
// We insert the filename in the file itself as metadata so that streams can work on replicated
// tables. For a more detailed discussion on the topic see SNOW-561447,
// http://go/streams-on-replicated-mixed-tables, and
// http://go/managed-iceberg-replication-change-tracking
metadata.put(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY,
StreamingIngestUtils.getShortname(filePath));
addFileIdToMetadata(filePath, chunkStartOffset, metadata);
parquetWriter =
new SnowflakeParquetWriter(
mergedData,
Expand All @@ -162,6 +159,32 @@ private SerializationResult serializeFromJavaObjects(
parquetWriter.getExtendedMetadataSize());
}

private void addFileIdToMetadata(
String filePath, long chunkStartOffset, Map<String, String> metadata) {
// We insert the filename in the file itself as metadata so that streams can work on replicated
// mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
// http://go/streams-on-replicated-mixed-tables
// Using chunk offset as suffix ensures that for interleaved tables, the file
// id key is unique for each chunk. Each chunk is logically a separate Parquet file that happens
// to be bundled together.
if (chunkStartOffset == 0) {
metadata.put(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY,
StreamingIngestUtils.getShortname(filePath));
} else {
String shortName = StreamingIngestUtils.getShortname(filePath);
final String[] parts = shortName.split("\\.");
Preconditions.checkState(parts.length == 2, "Invalid file name format");
metadata.put(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY,
String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]));
}
}

/**
* Validates that rows count in metadata matches the row count in Parquet footer and the row count
* written by the parquet writer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import java.util.Set;
import java.util.function.Consumer;
import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.IcebergDataTypeParser;
import net.snowflake.ingest.utils.SFException;
Expand Down Expand Up @@ -89,6 +91,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
if (!clientBufferParameters.isEnableIcebergStreaming()) {
metadata.put("sfVer", "1,1");
}
metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION);
List<Type> parquetTypes = new ArrayList<>();
int id = 1;

Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class Constants {
public static final String PRIMARY_FILE_ID_KEY =
"primaryFileId"; // Don't change, should match Parquet Scanner
public static final String ASSIGNED_FULL_FILE_NAME_KEY = "assignedFullFileName";
public static final String SDK_VERSION_KEY = "sdkVersion";
public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
10L; // Don't change, should match server side
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Constants;
Expand Down Expand Up @@ -2028,18 +2029,40 @@ public void testParquetFileNameMetadata() throws IOException {
data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L));

ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath);
{
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath, 0);

BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(
filePath,
reader
.getKeyValueMetadata()
.get(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY));
BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(
"testParquetFileNameMetadata.bdec",
reader
.getKeyValueMetadata()
.get(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY));
Assert.assertEquals(
RequestBuilder.DEFAULT_VERSION,
reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
}
{
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath, 13);

BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(
"testParquetFileNameMetadata_13.bdec",
reader
.getKeyValueMetadata()
.get(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY));
Assert.assertEquals(
RequestBuilder.DEFAULT_VERSION,
reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
}
}

@Test
Expand Down

0 comments on commit 17de48f

Please sign in to comment.