Revert "Add chunk offset to file id key to make each chunk have a uni… #848

Merged
merged 1 commit into from
Oct 1, 2024
Merged
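This PR reverts the change that suffixed the PRIMARY_FILE_ID_KEY ("primaryFileId") footer entry with the chunk start offset; after the revert, every chunk again records the blob's plain short file name. Below is a minimal, self-contained sketch of the two naming behaviors, reconstructed from the removed addFileIdToMetadata helper and the reverted test expectations further down; the class and method names in the sketch are illustrative, not the SDK's.

import java.util.HashMap;
import java.util.Map;

public class FileIdKeySketch {
  static final String PRIMARY_FILE_ID_KEY = "primaryFileId";

  // Reverted behavior: suffix the short file name with the chunk start offset so each
  // chunk of an interleaved blob carries a unique file id, e.g. "blob.bdec" at offset 13
  // becomes "blob_13.bdec"; offset 0 keeps the plain name.
  static String fileIdWithChunkOffset(String shortName, long chunkStartOffset) {
    if (chunkStartOffset == 0) {
      return shortName;
    }
    String[] parts = shortName.split("\\.");
    if (parts.length != 2) {
      throw new IllegalStateException("Invalid file name format");
    }
    return String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]);
  }

  public static void main(String[] args) {
    // Restored behavior (this revert): every chunk writes the same, unsuffixed short name.
    Map<String, String> restored = new HashMap<>();
    restored.put(PRIMARY_FILE_ID_KEY, "blob.bdec");

    // Reverted behavior, for comparison: prints "blob_13.bdec".
    System.out.println(fileIdWithChunkOffset("blob.bdec", 13));
  }
}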
@@ -83,7 +83,7 @@ static <T> Blob constructBlobAndMetadata(
 
       Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
       Flusher.SerializationResult serializedChunk =
-          flusher.serialize(channelsDataPerTable, filePath, curDataSize);
+          flusher.serialize(channelsDataPerTable, filePath);
 
       if (!serializedChunk.channelsMetadataList.isEmpty()) {
         final byte[] compressedChunkData;
@@ -20,15 +20,12 @@ public interface Flusher<T> {
   /**
    * Serialize buffered rows into the underlying format.
    *
-   * @param fullyQualifiedTableName
    * @param channelsDataPerTable buffered rows
    * @param filePath file path
-   * @param chunkStartOffset
    * @return {@link SerializationResult}
    * @throws IOException
    */
-  SerializationResult serialize(
-      List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
+  SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
       throws IOException;
 
   /** Holds result of the buffered rows conversion: channel metadata and stats. */
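As a quick reference, here is a condensed, illustrative view of the restored contract, pieced together only from the call sites in this PR (serializedChunk.channelsMetadataList in the blob builder hunk above and result.chunkData in the test hunk below); the stand-in types and the field types chosen here are assumptions, not the SDK's actual class layout.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

// Stand-ins so the sketch is self-contained; the real types live in the SDK.
class ChannelData<T> {}
class ChannelMetadata {}

interface Flusher<T> {
  // Restored two-argument signature: the chunk start offset is no longer passed in.
  SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
      throws IOException;

  class SerializationResult {
    // Fields referenced by the call sites in this PR; their types are assumed here.
    List<ChannelMetadata> channelsMetadataList;
    ByteArrayOutputStream chunkData;
  }
}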
@@ -16,7 +16,6 @@
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
 import net.snowflake.ingest.utils.SFException;
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.hadoop.BdecParquetWriter;
 import org.apache.parquet.schema.MessageType;
 
@@ -46,17 +45,13 @@ public ParquetFlusher(
 
   @Override
   public SerializationResult serialize(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
-      String filePath,
-      long chunkStartOffset)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
       throws IOException {
-    return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset);
+    return serializeFromJavaObjects(channelsDataPerTable, filePath);
   }
 
   private SerializationResult serializeFromJavaObjects(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
-      String filePath,
-      long chunkStartOffset)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
       throws IOException {
     List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
     long rowCount = 0L;
@@ -122,7 +117,10 @@ private SerializationResult serializeFromJavaObjects(
       }
 
       Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
-      addFileIdToMetadata(filePath, chunkStartOffset, metadata);
+      // We insert the filename in the file itself as metadata so that streams can work on replicated
+      // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
+      // http://go/streams-on-replicated-mixed-tables
+      metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
       parquetWriter =
           new BdecParquetWriter(
               mergedData,
@@ -146,26 +144,6 @@ private SerializationResult serializeFromJavaObjects(
         chunkMinMaxInsertTimeInMs);
   }
 
-  private static void addFileIdToMetadata(
-      String filePath, long chunkStartOffset, Map<String, String> metadata) {
-    // We insert the filename in the file itself as metadata so that streams can work on replicated
-    // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
-    // http://go/streams-on-replicated-mixed-tables
-    // Using chunk offset as suffix ensures that for interleaved tables, the file
-    // id key is unique for each chunk. Each chunk is logically a separate Parquet file that happens
-    // to be bundled together.
-    if (chunkStartOffset == 0) {
-      metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
-    } else {
-      String shortName = StreamingIngestUtils.getShortname(filePath);
-      final String[] parts = shortName.split("\\.");
-      Preconditions.checkState(parts.length == 2, "Invalid file name format");
-      metadata.put(
-          Constants.PRIMARY_FILE_ID_KEY,
-          String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]));
-    }
-  }
-
   /**
    * Validates that rows count in metadata matches the row count in Parquet footer and the row count
    * written by the parquet writer
@@ -20,11 +20,9 @@
 import java.util.Set;
 import java.util.function.Consumer;
 import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
-import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.connection.TelemetryService;
 import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
-import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -83,7 +81,6 @@ public void setupSchema(List<ColumnMetadata> columns) {
     fieldIndex.clear();
     metadata.clear();
     metadata.put("sfVer", "1,1");
-    metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION);
     List<Type> parquetTypes = new ArrayList<>();
     int id = 1;
 
src/main/java/net/snowflake/ingest/utils/Constants.java (1 change: 0 additions & 1 deletion)
@@ -35,7 +35,6 @@ public class Constants {
   public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request";
   public static final String PRIMARY_FILE_ID_KEY =
       "primaryFileId"; // Don't change, should match Parquet Scanner
-  public static final String SDK_VERSION_KEY = "sdkVersion";
   public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
   public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
       10L; // Don't change, should match server side
@@ -145,7 +145,7 @@ ChannelData<T> flushChannel(String name) {
     BlobMetadata buildAndUpload() throws Exception {
       List<List<ChannelData<T>>> blobData = Collections.singletonList(channelData);
       return flushService.buildAndUpload(
-          BlobPath.fileNameWithoutToken("file_name.bdec"),
+          BlobPath.fileNameWithoutToken("file_name"),
           blobData,
           blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName());
     }
@@ -951,7 +951,7 @@ public void testBuildAndUpload() throws Exception {
             blobCaptor.capture(),
             metadataCaptor.capture(),
             ArgumentMatchers.any());
-    Assert.assertEquals("file_name.bdec", nameCaptor.getValue().fileName);
+    Assert.assertEquals("file_name", nameCaptor.getValue().fileName);
 
     ChunkMetadata metadataResult = metadataCaptor.getValue().get(0);
     List<ChannelMetadata> channelMetadataResult = metadataResult.getChannels();
@@ -21,7 +21,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.streaming.InsertValidationResponse;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.utils.Constants;
@@ -1912,30 +1911,11 @@ public void testParquetFileNameMetadata() throws IOException {
     data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L));
 
     ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
-    {
-      Flusher.SerializationResult result =
-          flusher.serialize(Collections.singletonList(data), filePath, 0);
+    Flusher.SerializationResult result =
+        flusher.serialize(Collections.singletonList(data), filePath);
 
-      BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-      Assert.assertEquals(
-          "testParquetFileNameMetadata.bdec",
-          reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
-      Assert.assertEquals(
-          RequestBuilder.DEFAULT_VERSION,
-          reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
-    }
-    {
-      Flusher.SerializationResult result =
-          flusher.serialize(Collections.singletonList(data), filePath, 13);
-
-      BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-      Assert.assertEquals(
-          "testParquetFileNameMetadata_13.bdec",
-          reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
-      Assert.assertEquals(
-          RequestBuilder.DEFAULT_VERSION,
-          reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
-    }
+    BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
+    Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
   }
 
   private static Thread getThreadThatWaitsForLockReleaseAndFlushes(
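The updated test reads the footer back with the SDK's internal BdecParquetReader. For anyone wanting to check the same footer entry on a written Parquet file without the SDK's test helpers, here is a hedged sketch using stock parquet-hadoop; the file path, the presence of parquet-hadoop and a Hadoop Configuration on the classpath, and the file being plain (unencrypted) Parquet are assumptions of this example, not something the PR sets up.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class PrimaryFileIdCheck {
  public static void main(String[] args) throws Exception {
    // Hypothetical path to a Parquet/bdec file produced by the SDK.
    Path path = new Path(args[0]);
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      Map<String, String> keyValueMetadata =
          reader.getFooter().getFileMetaData().getKeyValueMetaData();
      // After this revert, the value is the blob's short file name for every chunk,
      // with no "_<chunkStartOffset>" suffix appended.
      System.out.println("primaryFileId = " + keyValueMetadata.get("primaryFileId"));
    }
  }
}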