Re-merge "Add chunk offset to file id key to make each chunk have a unique key" from pull 825 #865

Merged · 4 commits · Oct 31, 2024
Changes from 1 commit
@@ -86,7 +86,7 @@ static <T> Blob constructBlobAndMetadata(

Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
Flusher.SerializationResult serializedChunk =
flusher.serialize(channelsDataPerTable, filePath);
flusher.serialize(channelsDataPerTable, filePath, curDataSize);

if (!serializedChunk.channelsMetadataList.isEmpty()) {
final byte[] compressedChunkData;
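
The call site above now forwards curDataSize as the new third argument, presumably the running size of the chunks already serialized into the blob, so each chunk gets a distinct start offset. A minimal sketch of that pattern (illustrative only, not the SDK's exact accounting; it assumes SerializationResult.chunkData is the ByteArrayOutputStream the test below reads from):

// Illustrative sketch only: keep a running offset so every chunk in one blob
// is passed a unique chunkStartOffset, which then feeds the file id key suffix.
static <T> void serializeChunks(
    java.util.List<java.util.List<ChannelData<T>>> chunksInBlob, String filePath)
    throws java.io.IOException {
  long curDataSize = 0;
  for (java.util.List<ChannelData<T>> channelsDataPerTable : chunksInBlob) {
    Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
    Flusher.SerializationResult serializedChunk =
        flusher.serialize(channelsDataPerTable, filePath, curDataSize);
    curDataSize += serializedChunk.chunkData.size(); // next chunk starts after this one
  }
}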
@@ -20,12 +20,15 @@ public interface Flusher<T> {
/**
* Serialize buffered rows into the underlying format.
*
* @param fullyQualifiedTableName
* @param channelsDataPerTable buffered rows
* @param filePath file path
* @param chunkStartOffset
* @return {@link SerializationResult}
* @throws IOException
*/
SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
SerializationResult serialize(
List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
throws IOException;

/** Holds result of the buffered rows conversion: channel metadata and stats. */
@@ -16,6 +16,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.SnowflakeParquetWriter;
import org.apache.parquet.schema.MessageType;
@@ -55,13 +56,17 @@ public ParquetFlusher(

@Override
public SerializationResult serialize(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
String filePath,
long chunkStartOffset)
throws IOException {
return serializeFromJavaObjects(channelsDataPerTable, filePath);
return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset);
}

private SerializationResult serializeFromJavaObjects(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
String filePath,
long chunkStartOffset)
throws IOException {
List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
long rowCount = 0L;
@@ -127,15 +132,7 @@ private SerializationResult serializeFromJavaObjects(
}

Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
// We insert the filename in the file itself as metadata so that streams can work on replicated
// tables. For a more detailed discussion on the topic see SNOW-561447,
// http://go/streams-on-replicated-mixed-tables, and
// http://go/managed-iceberg-replication-change-tracking
metadata.put(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY,
StreamingIngestUtils.getShortname(filePath));
addFileIdToMetadata(filePath, chunkStartOffset, metadata);
parquetWriter =
new SnowflakeParquetWriter(
mergedData,
@@ -162,6 +159,32 @@ private SerializationResult serializeFromJavaObjects(
parquetWriter.getExtendedMetadataSize());
}

private void addFileIdToMetadata(
String filePath, long chunkStartOffset, Map<String, String> metadata) {
// We insert the filename in the file itself as metadata so that streams can work on replicated
// mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
// http://go/streams-on-replicated-mixed-tables
Collaborator: nit: pl add the other golink here too from line 133 in old code
Contributor Author: Done.

// Using chunk offset as suffix ensures that for interleaved tables, the file
// id key is unique for each chunk. Each chunk is logically a separate Parquet file that happens
// to be bundled together.
if (chunkStartOffset == 0) {
metadata.put(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY,
StreamingIngestUtils.getShortname(filePath));
} else {
String shortName = StreamingIngestUtils.getShortname(filePath);
Collaborator: lets also assert / validate that enableIcebergStreaming is false when we're in the else path?
Contributor Author: Done.

final String[] parts = shortName.split("\\.");
Preconditions.checkState(parts.length == 2, "Invalid file name format");
metadata.put(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY,
String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]));
}
}

/**
* Validates that rows count in metadata matches the row count in Parquet footer and the row count
* written by the parquet writer
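
The effect of addFileIdToMetadata on the file id value can be summarized with a small stand-alone helper that mirrors the split/format logic above (illustrative only; the method name is made up): with offset 0 the short file name is kept as-is, while "testParquetFileNameMetadata.bdec" at offset 13 becomes "testParquetFileNameMetadata_13.bdec", matching the assertions in the updated test at the bottom of this PR.

// Illustrative helper mirroring addFileIdToMetadata's suffix logic (not SDK code).
static String fileIdForChunk(String shortName, long chunkStartOffset) {
  if (chunkStartOffset == 0) {
    // First chunk keeps the plain short file name, preserving the pre-change behavior.
    return shortName;
  }
  String[] parts = shortName.split("\\.");
  if (parts.length != 2) {
    throw new IllegalStateException("Invalid file name format: " + shortName);
  }
  // e.g. fileIdForChunk("testParquetFileNameMetadata.bdec", 13)
  //      -> "testParquetFileNameMetadata_13.bdec"
  return String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]);
}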
@@ -20,9 +20,11 @@
import java.util.Set;
import java.util.function.Consumer;
import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.IcebergDataTypeParser;
import net.snowflake.ingest.utils.SFException;
Expand Down Expand Up @@ -89,6 +91,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
if (!clientBufferParameters.isEnableIcebergStreaming()) {
metadata.put("sfVer", "1,1");
}
metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION);
Collaborator: nice, Alec and I had discussed doing precisely this just yesterday! We're going to have multiple SDK languages in a few months, should we have a slightly more verbose key-value pair here, such as: userAgent=RequestBuilder.USER_AGENT ?
Collaborator: User-Agent is a very HTTP-ish thing, we could name the key createdBy instead.
Contributor Author: This current key-value is what XP expects. Let's iterate on it later?

List<Type> parquetTypes = new ArrayList<>();
int id = 1;

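
The new sdkVersion entry lands in the Parquet footer's key-value metadata alongside the file id key; the updated test at the bottom of this PR reads it back roughly like this (a sketch based on that test; BdecParquetReader and the constants come from the SDK):

// Sketch based on the test below: the footer metadata carries the SDK version string.
BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
String sdkVersion = reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY);
// Expected to equal RequestBuilder.DEFAULT_VERSION for files written by this SDK build.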
1 change: 1 addition & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -37,6 +37,7 @@ public class Constants {
public static final String PRIMARY_FILE_ID_KEY =
"primaryFileId"; // Don't change, should match Parquet Scanner
public static final String ASSIGNED_FULL_FILE_NAME_KEY = "assignedFullFileName";
public static final String SDK_VERSION_KEY = "sdkVersion";
public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
10L; // Don't change, should match server side
@@ -27,6 +27,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Constants;
@@ -2028,18 +2029,40 @@ public void testParquetFileNameMetadata() throws IOException {
data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L));

ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath);
{
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath, 0);

BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(
filePath,
reader
.getKeyValueMetadata()
.get(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY));
BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(
"testParquetFileNameMetadata.bdec",
reader
.getKeyValueMetadata()
.get(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY));
Assert.assertEquals(
RequestBuilder.DEFAULT_VERSION,
reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
}
{
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath, 13);

BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(
"testParquetFileNameMetadata_13.bdec",
reader
.getKeyValueMetadata()
.get(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY));
Assert.assertEquals(
RequestBuilder.DEFAULT_VERSION,
reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
}
}

@Test