
fix ep tmp
sfc-gh-alhuang committed Sep 19, 2024
1 parent bc52b6b commit f167a17
Showing 10 changed files with 95 additions and 23 deletions.
@@ -658,7 +658,10 @@ static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> co
return epInfo;
}

static EpInfo buildEPInfoFromBlocksMetadata(List<BlockMetaData> blocksMetadata) {
static EpInfo buildEPInfoFromParquetWriterResults(
List<BlockMetaData> blocksMetadata,
Map<String, Integer> ndvStats,
Map<String, Integer> maxLengthStats) {
if (blocksMetadata.isEmpty()) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "No blocks metadata found");
}
@@ -690,7 +693,9 @@ static EpInfo buildEPInfoFromBlocksMetadata(List<BlockMetaData> blocksMetadata)
new FileColumnProperties(
columnOrdinal,
columnChunkMetaData.getPrimitiveType().getId().intValue(),
mergedStatistics.get(subColumnName));
mergedStatistics.get(subColumnName),
ndvStats.getOrDefault(subColumnName, 0),
maxLengthStats.getOrDefault(subColumnName, 0));
epInfo.getColumnEps().put(subColumnName, dto);
}

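For orientation, a minimal self-contained sketch of the lookup pattern used above — illustrative class and method names only, not the SDK's actual EP types: sub-columns with no recorded NDV or max-length value fall back to 0 via getOrDefault.

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins; the real code fills FileColumnProperties entries inside EpInfo.
class ColumnEpSketch {
  final int distinctValues;
  final int maxLength;

  ColumnEpSketch(int distinctValues, int maxLength) {
    this.distinctValues = distinctValues;
    this.maxLength = maxLength;
  }
}

class EpInfoSketch {
  // A sub-column missing from either stats map is reported with 0.
  static Map<String, ColumnEpSketch> buildColumnEps(
      Iterable<String> subColumnNames,
      Map<String, Integer> ndvStats,
      Map<String, Integer> maxLengthStats) {
    Map<String, ColumnEpSketch> eps = new HashMap<>();
    for (String name : subColumnNames) {
      eps.put(
          name,
          new ColumnEpSketch(
              ndvStats.getOrDefault(name, 0), maxLengthStats.getOrDefault(name, 0)));
    }
    return eps;
  }
}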
@@ -135,8 +135,10 @@ static <T> Blob constructBlobAndMetadata(
.setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
.setEpInfo(
isIceberg
? AbstractRowBuffer.buildEPInfoFromBlocksMetadata(
serializedChunk.blocksMetadata)
? AbstractRowBuffer.buildEPInfoFromParquetWriterResults(
serializedChunk.blocksMetadata,
serializedChunk.ndvStats,
serializedChunk.maxLengthStats)
: AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -200,4 +200,14 @@ Long getFirstInsertTimeInMs() {
Long getLastInsertTimeInMs() {
return this.lastInsertTimeInMs;
}

// @JsonProperty("major_vers")
// Integer getMajorVersion() {
// return PARQUET_MAJOR_VERSION;
// }
//
// @JsonProperty("minor_vers")
// Integer getMinorVersion() {
// return PARQUET_MINOR_VERSION;
// }
}
@@ -5,7 +5,6 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;
import static net.snowflake.ingest.utils.Constants.EP_NDV_UNKNOWN;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -99,11 +98,12 @@ class FileColumnProperties {
this.setDistinctValues(stats.getDistinctValues());
}

FileColumnProperties(int columnOrdinal, int fieldId, Statistics<?> statistics) {
FileColumnProperties(
int columnOrdinal, int fieldId, Statistics<?> statistics, int ndv, int maxLength) {
this.setColumnOrdinal(columnOrdinal);
this.setFieldId(fieldId);
this.setNullCount(statistics.getNumNulls());
this.setDistinctValues(EP_NDV_UNKNOWN);
this.setDistinctValues(ndv);
this.setCollation(null);
this.setMaxStrNonCollated(null);
this.setMinStrNonCollated(null);
@@ -127,6 +127,7 @@ class FileColumnProperties {
} else {
this.setMinStrValue(truncateBytesAsHex(statistics.getMinBytes(), false));
this.setMaxStrValue(truncateBytesAsHex(statistics.getMaxBytes(), true));
this.setMaxLength(maxLength);
}
}
}
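A rough sketch of the constructor's branching above, using a hypothetical class rather than the SDK's FileColumnProperties: the distinct-value count now comes from the writer-collected NDV instead of the former EP_NDV_UNKNOWN sentinel, and max length is recorded only on the binary/string branch that also truncates min/max bytes to hex.

// Hypothetical stand-in; hex truncation is simplified compared to truncateBytesAsHex.
class ColumnPropertiesSketch {
  int distinctValues;
  int maxLength;
  String minStrValue;
  String maxStrValue;

  ColumnPropertiesSketch(
      boolean isBinaryOrString, byte[] minBytes, byte[] maxBytes, int ndv, int maxLength) {
    this.distinctValues = ndv; // previously the EP_NDV_UNKNOWN sentinel
    if (isBinaryOrString) {
      this.minStrValue = toHex(minBytes);
      this.maxStrValue = toHex(maxBytes);
      this.maxLength = maxLength; // only populated on the binary/string path
    }
  }

  private static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }
}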
@@ -38,6 +38,8 @@ class SerializationResult {
final ByteArrayOutputStream chunkData;
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
final List<BlockMetaData> blocksMetadata;
final Map<String, Integer> ndvStats;
final Map<String, Integer> maxLengthStats;

public SerializationResult(
List<ChannelMetadata> channelsMetadataList,
@@ -46,14 +48,18 @@ public SerializationResult(
float chunkEstimatedUncompressedSize,
ByteArrayOutputStream chunkData,
Pair<Long, Long> chunkMinMaxInsertTimeInMs,
List<BlockMetaData> blocksMetadata) {
List<BlockMetaData> blocksMetadata,
Map<String, Integer> ndvStats,
Map<String, Integer> maxLengthStats) {
this.channelsMetadataList = channelsMetadataList;
this.columnEpStatsMapCombined = columnEpStatsMapCombined;
this.rowCount = rowCount;
this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
this.chunkData = chunkData;
this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
this.blocksMetadata = blocksMetadata;
this.ndvStats = ndvStats;
this.maxLengthStats = maxLengthStats;
}
}
}
@@ -26,17 +26,20 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private static final Logging logger = new Logging(ParquetFlusher.class);
private final MessageType schema;
private final long maxChunkSizeInBytes;
private final boolean enableNdvAndMaxLengthStats;

private final Constants.BdecParquetCompression bdecParquetCompression;

/** Construct parquet flusher from its schema. */
public ParquetFlusher(
MessageType schema,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNdvAndMaxLengthStats) {
this.schema = schema;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNdvAndMaxLengthStats = enableNdvAndMaxLengthStats;
}

@Override
@@ -124,7 +127,8 @@ private SerializationResult serializeFromJavaObjects(
metadata,
firstChannelFullyQualifiedTableName,
maxChunkSizeInBytes,
bdecParquetCompression);
bdecParquetCompression,
this.enableNdvAndMaxLengthStats);
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

@@ -137,7 +141,9 @@ private SerializationResult serializeFromJavaObjects(
chunkEstimatedUncompressedSize,
mergedData,
chunkMinMaxInsertTimeInMs,
parquetWriter.getBlocksMetadata());
parquetWriter.getBlocksMetadata(),
parquetWriter.getNdvStats(),
parquetWriter.getMaxLengthStats());
}

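A condensed, hypothetical view of the flush sequence above — StatsCollectingWriter stands in for BdecParquetWriter and is not a real SDK type: the enable flag is fixed at writer construction, rows are written, the writer is closed, and the two new maps are read back alongside the blocks metadata.

import java.util.List;
import java.util.Map;

// Hypothetical interface; shown only to illustrate how the flusher threads the
// new statistics into its SerializationResult.
interface StatsCollectingWriter extends AutoCloseable {
  void writeRow(List<Object> row);

  Map<String, Integer> getNdvStats(); // null when collection is disabled

  Map<String, Integer> getMaxLengthStats();
}

class FlushSketch {
  static void flush(StatsCollectingWriter writer, List<List<Object>> rows) throws Exception {
    for (List<Object> row : rows) {
      writer.writeRow(row);
    }
    writer.close(); // the flusher above also closes the writer before reading results
    Map<String, Integer> ndvStats = writer.getNdvStats();
    Map<String, Integer> maxLengthStats = writer.getMaxLengthStats();
    // Both maps then travel in SerializationResult and end up in the blob's EP info.
  }
}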
@@ -290,6 +290,7 @@ public Flusher<ParquetChunkData> createFlusher() {
return new ParquetFlusher(
schema,
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getBdecParquetCompression());
clientBufferParameters.getBdecParquetCompression(),
this.clientBufferParameters.getIsIcebergMode());
}
}
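A tiny sketch (illustrative names only) of the wiring above: the client's Iceberg-mode flag is passed straight through as the switch that turns on NDV/max-length collection in the writer.

// Illustrative only; mirrors how getIsIcebergMode() feeds enableNdvAndMaxLengthStats.
class FlusherConfigSketch {
  final long maxChunkSizeInBytes;
  final boolean enableNdvAndMaxLengthStats;

  FlusherConfigSketch(long maxChunkSizeInBytes, boolean isIcebergMode) {
    this.maxChunkSizeInBytes = maxChunkSizeInBytes;
    this.enableNdvAndMaxLengthStats = isIcebergMode;
  }
}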
3 changes: 3 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -70,6 +70,9 @@ public class Constants {
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public static final int PARQUET_MAJOR_VERSION = 1;
public static final int PARQUET_MINOR_VERSION = 0;

public enum WriteMode {
CLOUD_STORAGE,
REST_API,
53 changes: 45 additions & 8 deletions src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
@@ -7,6 +7,8 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.utils.Constants;
@@ -38,6 +40,7 @@
public class BdecParquetWriter implements AutoCloseable {
private final InternalParquetRecordWriter<List<Object>> writer;
private final CodecFactory codecFactory;
private final BdecWriteSupport writeSupport;
private long rowsWritten = 0;

/**
@@ -47,6 +50,7 @@ public class BdecParquetWriter implements AutoCloseable {
* @param schema row schema
* @param extraMetaData extra metadata
* @param channelName name of the channel that is using the writer
* @param enableNdvAndMaxLengthStats whether to record NDV and MaxLength
* @throws IOException
*/
public BdecParquetWriter(
@@ -55,13 +59,14 @@ public BdecParquetWriter(
Map<String, String> extraMetaData,
String channelName,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression)
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNdvAndMaxLengthStats)
throws IOException {
OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
ParquetProperties encodingProps = createParquetProperties();
Configuration conf = new Configuration();
WriteSupport<List<Object>> writeSupport =
new BdecWriteSupport(schema, extraMetaData, channelName);
writeSupport =
new BdecWriteSupport(schema, extraMetaData, channelName, enableNdvAndMaxLengthStats);
WriteSupport.WriteContext writeContext = writeSupport.init(conf);

ParquetFileWriter fileWriter =
@@ -117,6 +122,19 @@ public List<BlockMetaData> getBlocksMetadata() {
return writer.getFooter().getBlocks();
}

public Map<String, Integer> getMaxLengthStats() {
return writeSupport.maxLengthStats;
}

public Map<String, Integer> getNdvStats() {
if (!writeSupport.enableNdvAndMaxLengthStats) {
return null;
}
Map<String, Integer> ndvStats = new HashMap<>();
writeSupport.ndvStatsSet.forEach((k, v) -> ndvStats.put(k, v.size()));
return ndvStats;
}

public void writeRow(List<Object> row) {
try {
writer.write(row);
@@ -249,16 +267,28 @@ private static class BdecWriteSupport extends WriteSupport<List<Object>> {
RecordConsumer recordConsumer;
Map<String, String> extraMetadata;
private final String channelName;
boolean enableNdvAndMaxLengthStats;
Map<String, HashSet<Object>> ndvStatsSet;
Map<String, Integer> maxLengthStats;

// TODO SNOW-672156: support specifying encodings and compression
BdecWriteSupport(MessageType schema, Map<String, String> extraMetadata, String channelName) {
BdecWriteSupport(
MessageType schema,
Map<String, String> extraMetadata,
String channelName,
boolean enableNdvAndMaxLengthStats) {
this.schema = schema;
this.extraMetadata = extraMetadata;
this.channelName = channelName;
this.enableNdvAndMaxLengthStats = enableNdvAndMaxLengthStats;
}

@Override
public WriteContext init(Configuration config) {
if (enableNdvAndMaxLengthStats) {
ndvStatsSet = new HashMap<>();
maxLengthStats = new HashMap<>();
}
return new WriteContext(schema, extraMetadata);
}

@@ -284,20 +314,24 @@ public void write(List<Object> values) {
+ values);
}
recordConsumer.startMessage();
writeValues(values, schema);
writeValues(values, schema, "");
recordConsumer.endMessage();
}

private void writeValues(List<Object> values, GroupType type) {
private void writeValues(List<Object> values, GroupType type, String path) {
List<Type> cols = type.getFields();
for (int i = 0; i < cols.size(); ++i) {
Object val = values.get(i);
if (val != null) {
String fieldName = cols.get(i).getName();
String currentPath = !path.isEmpty() ? path + "." + fieldName : fieldName;
recordConsumer.startField(fieldName, i);
if (cols.get(i).isPrimitive()) {
PrimitiveType.PrimitiveTypeName typeName =
cols.get(i).asPrimitiveType().getPrimitiveTypeName();
if (enableNdvAndMaxLengthStats) {
ndvStatsSet.computeIfAbsent(currentPath, k -> new HashSet<>()).add(val);
}
switch (typeName) {
case BOOLEAN:
recordConsumer.addBoolean((boolean) val);
Expand All @@ -320,6 +354,9 @@ private void writeValues(List<Object> values, GroupType type) {
? Binary.fromString((String) val)
: Binary.fromConstantByteArray((byte[]) val);
recordConsumer.addBinary(binVal);
if (enableNdvAndMaxLengthStats) {
maxLengthStats.merge(currentPath, binVal.length(), Math::max);
}
break;
case FIXED_LEN_BYTE_ARRAY:
Binary binary = Binary.fromConstantByteArray((byte[]) val);
@@ -334,13 +371,13 @@ private void writeValues(List<Object> values, GroupType type) {
for (Object o : values) {
recordConsumer.startGroup();
if (o != null) {
writeValues((List<Object>) o, cols.get(i).asGroupType());
writeValues((List<Object>) o, cols.get(i).asGroupType(), currentPath);
}
recordConsumer.endGroup();
}
} else {
recordConsumer.startGroup();
writeValues((List<Object>) val, cols.get(i).asGroupType());
writeValues((List<Object>) val, cols.get(i).asGroupType(), currentPath);
recordConsumer.endGroup();
}
}
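The heart of the change is the per-path collection inside BdecWriteSupport. Below is a standalone sketch of that technique under the same assumptions as the diff — exact NDV tracked with a HashSet per dotted column path, max length tracked only for binary/string values; note the real writer measures the encoded Parquet Binary's byte length.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

// Standalone sketch: exact NDV via a HashSet per dotted column path, and a
// running maximum length via Map.merge with Math::max.
class ColumnStatsCollectorSketch {
  final Map<String, HashSet<Object>> ndvSets = new HashMap<>();
  final Map<String, Integer> maxLengthStats = new HashMap<>();

  void observePrimitive(String parentPath, String fieldName, Object value) {
    String path = parentPath.isEmpty() ? fieldName : parentPath + "." + fieldName;
    // Every non-null primitive value feeds the distinct-value set for its path.
    ndvSets.computeIfAbsent(path, k -> new HashSet<>()).add(value);
    // Only binary/string values contribute to max length (byte length, as for Parquet Binary).
    if (value instanceof byte[]) {
      maxLengthStats.merge(path, ((byte[]) value).length, Math::max);
    } else if (value instanceof String) {
      int utf8Length = ((String) value).getBytes(StandardCharsets.UTF_8).length;
      maxLengthStats.merge(path, utf8Length, Math::max);
    }
  }

  Map<String, Integer> ndvStats() {
    Map<String, Integer> out = new HashMap<>();
    ndvSets.forEach((k, v) -> out.put(k, v.size()));
    return out;
  }
}

One design observation: the exact-set approach keeps every distinct value of a chunk in memory until flush, which is a reasonable trade-off for chunk-sized batches but worth keeping in mind for very wide or high-cardinality columns.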
@@ -69,7 +69,7 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
String columnName = "C1";
ChannelData<ParquetChunkData> channelData = Mockito.spy(new ChannelData<>());
MessageType schema = createSchema(columnName);
Mockito.doReturn(new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP))
Mockito.doReturn(new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP, false))
.when(channelData)
.createFlusher();

@@ -82,7 +82,8 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
new HashMap<>(),
"CHANNEL",
1000,
Constants.BdecParquetCompression.GZIP);
Constants.BdecParquetCompression.GZIP,
false);
bdecParquetWriter.writeRow(Collections.singletonList("1"));
channelData.setVectors(
new ParquetChunkData(
