Fix tests and formatting
sfc-gh-psaha committed Sep 16, 2024
1 parent 6a46ded commit 3f6a5c0
Showing 4 changed files with 17 additions and 10 deletions.
@@ -21,14 +21,14 @@ public interface Flusher<T> {
    * Serialize buffered rows into the underlying format.
    *
    * @param fullyQualifiedTableName
-   * @param channelsDataPerTable buffered rows
-   * @param filePath file path
+   * @param channelsDataPerTable buffered rows
+   * @param filePath file path
    * @param chunkStartOffset
    * @return {@link SerializationResult}
    * @throws IOException
    */
   SerializationResult serialize(
-      List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
+      List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
       throws IOException;

   /** Holds result of the buffered rows conversion: channel metadata and stats. */
@@ -15,6 +15,7 @@
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
 import net.snowflake.ingest.utils.SFException;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.hadoop.BdecParquetWriter;
 import org.apache.parquet.schema.MessageType;

@@ -41,13 +42,17 @@ public ParquetFlusher(

   @Override
   public SerializationResult serialize(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath, long chunkStartOffset)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
+      String filePath,
+      long chunkStartOffset)
       throws IOException {
     return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset);
   }

   private SerializationResult serializeFromJavaObjects(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath, long chunkStartOffset)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
+      String filePath,
+      long chunkStartOffset)
       throws IOException {
     List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
     long rowCount = 0L;
@@ -121,6 +126,7 @@ private SerializationResult serializeFromJavaObjects(
     // to be bundled together.
     String shortName = StreamingIngestUtils.getShortname(filePath);
     final String[] parts = shortName.split("\\.");
+    Preconditions.checkState(parts.length == 2, "Invalid file name format");
     metadata.put(
         Constants.PRIMARY_FILE_ID_KEY,
         String.format("%s%s.%s", parts[0], chunkStartOffset, parts[1]));
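Taken together, the new Preconditions check and the PRIMARY_FILE_ID_KEY format amount to: split the short file name on its single dot, require exactly two parts, and splice the chunk start offset in front of the extension. A minimal standalone sketch (class and method names here are illustrative; only the split/check/format logic mirrors the hunk above):

public class PrimaryFileIdSketch {
  // Stand-in for the hunk above; IllegalStateException plays the role of
  // org.apache.parquet.Preconditions.checkState(...).
  static String primaryFileId(String shortName, long chunkStartOffset) {
    final String[] parts = shortName.split("\\.");
    if (parts.length != 2) {
      throw new IllegalStateException("Invalid file name format");
    }
    return String.format("%s%s.%s", parts[0], chunkStartOffset, parts[1]);
  }

  public static void main(String[] args) {
    // Consistent with the expectation asserted in the updated test further down,
    // assuming a file named "testParquetFileNameMetadata.bdec" and offset 13:
    System.out.println(primaryFileId("testParquetFileNameMetadata.bdec", 13L));
    // -> testParquetFileNameMetadata13.bdec
  }
}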
@@ -140,7 +140,7 @@ ChannelData<T> flushChannel(String name) {
   BlobMetadata buildAndUpload() throws Exception {
     List<List<ChannelData<T>>> blobData = Collections.singletonList(channelData);
     return flushService.buildAndUpload(
-        BlobPath.fileNameWithoutToken("file_name"),
+        BlobPath.fileNameWithoutToken("file_name.bdec"),
         blobData,
         blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName());
   }
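The fixture rename from "file_name" to "file_name.bdec" follows from the new check in ParquetFlusher: a name with no extension splits into a single part and would now trip the precondition. A quick illustration (hypothetical class name, plain Java):

public class FileNameSplitDemo {
  public static void main(String[] args) {
    System.out.println("file_name".split("\\.").length);      // 1 -> checkState would fail
    System.out.println("file_name.bdec".split("\\.").length); // 2 -> passes
  }
}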
@@ -940,7 +940,7 @@ public void testBuildAndUpload() throws Exception {
         blobCaptor.capture(),
         metadataCaptor.capture(),
         ArgumentMatchers.any());
-    Assert.assertEquals("file_name", nameCaptor.getValue().fileName);
+    Assert.assertEquals("file_name.bdec", nameCaptor.getValue().fileName);

     ChunkMetadata metadataResult = metadataCaptor.getValue().get(0);
     List<ChannelMetadata> channelMetadataResult = metadataResult.getChannels();
@@ -19,7 +19,6 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
-
 import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.streaming.InsertValidationResponse;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
@@ -1795,9 +1794,11 @@ public void testParquetFileNameMetadata() throws IOException {

     BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
     Assert.assertEquals(
-        "testParquetFileNameMetadata13.bdec",
+        "testParquetFileNameMetadata13.bdec",
         reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
-    Assert.assertEquals(RequestBuilder.DEFAULT_VERSION, reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
+    Assert.assertEquals(
+        RequestBuilder.DEFAULT_VERSION,
+        reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
   }

   private static Thread getThreadThatWaitsForLockReleaseAndFlushes(
