Skip to content

Commit

Permalink
Add chunk index to file id key to make each chunk have a unique key
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-psaha committed Sep 11, 2024
1 parent 3734061 commit 4165b5a
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,14 @@ static <T> Blob constructBlobAndMetadata(
CRC32 crc = new CRC32();

// TODO: channels with different schema can't be combined even if they belong to the same table
int chunkIndex = 0;
for (List<ChannelData<T>> channelsDataPerTable : blobData) {
ChannelFlushContext firstChannelFlushContext =
channelsDataPerTable.get(0).getChannelContext();

Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
Flusher.SerializationResult serializedChunk =
flusher.serialize(channelsDataPerTable, filePath);
flusher.serialize(channelsDataPerTable, filePath, chunkIndex++);

if (!serializedChunk.channelsMetadataList.isEmpty()) {
final byte[] compressedChunkData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ public interface Flusher<T> {
*
* @param channelsDataPerTable buffered rows
* @param filePath file path
* @param chunkIndex index of this chunk within the blob; used to make per-chunk metadata unique
* @return {@link SerializationResult}
* @throws IOException
*/
SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
SerializationResult serialize(
List<ChannelData<T>> channelsDataPerTable, String filePath, int chunkIndex)
throws IOException;

/** Holds result of the buffered rows conversion: channel metadata and stats. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public ParquetFlusher(

@Override
public SerializationResult serialize(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath, int chunkIndex)
throws IOException {
if (enableParquetInternalBuffering) {
return serializeFromParquetWriteBuffers(channelsDataPerTable, filePath);
}
return serializeFromJavaObjects(channelsDataPerTable, filePath);
return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkIndex);
}

private SerializationResult serializeFromParquetWriteBuffers(
Expand Down Expand Up @@ -142,7 +142,7 @@ private SerializationResult serializeFromParquetWriteBuffers(
}

private SerializationResult serializeFromJavaObjects(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath, int chunkIndex)
throws IOException {
List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
long rowCount = 0L;
Expand Down Expand Up @@ -215,7 +215,12 @@ private SerializationResult serializeFromJavaObjects(
// We insert the filename in the file itself as metadata so that streams can work on replicated
// mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
// http://go/streams-on-replicated-mixed-tables
metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
// Using chunk index as suffix ensures that for interleaved tables, the file
// id key is unique for each chunk. Each chunk is logically a separate Parquet file that happens
// to be bundled together.
metadata.put(
Constants.PRIMARY_FILE_ID_KEY,
String.format("%s-%s", StreamingIngestUtils.getShortname(filePath), 0));
parquetWriter =
new BdecParquetWriter(
mergedData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1796,10 +1796,12 @@ public void testParquetFileNameMetadata() throws IOException {

ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath);
flusher.serialize(Collections.singletonList(data), filePath, 0);

BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
Assert.assertEquals(
String.format("%s-%s", filePath, 0),
reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
}

private static Thread getThreadThatWaitsForLockReleaseAndFlushes(
Expand Down

0 comments on commit 4165b5a

Please sign in to comment.