
Commit

fix npe
sfc-gh-alhuang committed Oct 15, 2024
1 parent bef322d commit 19d66e4
Showing 18 changed files with 185 additions and 213 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -51,7 +51,7 @@
<fasterxml.version>2.17.2</fasterxml.version>
<guava.version>32.0.1-jre</guava.version>
<hadoop.version>3.4.0</hadoop.version>
<iceberg.version>1.5.2</iceberg.version>
<iceberg.version>1.6.1</iceberg.version>
<jacoco.skip.instrument>true</jacoco.skip.instrument>
<jacoco.version>0.8.5</jacoco.version>
<license.processing.dependencyJarsDir>${project.build.directory}/dependency-jars</license.processing.dependencyJarsDir>
@@ -292,6 +292,9 @@ public InsertValidationResponse insertRows(
// Temp stats map to use until all the rows are validated
@VisibleForTesting Map<String, RowBufferStats> tempStatsMap;

// Map of the column name to the column object, used for null/missing column check
protected final Map<String, ParquetColumn> fieldIndex;

// Lock used to protect the buffers from concurrent read/write
private final Lock flushLock;

@@ -352,6 +355,8 @@ public InsertValidationResponse insertRows(
// Initialize empty stats
this.statsMap = new HashMap<>();
this.tempStatsMap = new HashMap<>();

this.fieldIndex = new HashMap<>();
}

/**
Expand Down Expand Up @@ -427,7 +432,7 @@ Set<String> verifyInputColumns(
List<String> missingCols = new ArrayList<>();
for (String columnName : this.nonNullableFieldNames) {
if (!inputColNamesMap.containsKey(columnName)) {
missingCols.add(statsMap.get(columnName).getColumnDisplayName());
missingCols.add(fieldIndex.get(columnName).columnMetadata.getName());
}
}

@@ -447,7 +452,7 @@ Set<String> verifyInputColumns(
for (String columnName : this.nonNullableFieldNames) {
if (inputColNamesMap.containsKey(columnName)
&& row.get(inputColNamesMap.get(columnName)) == null) {
nullValueNotNullCols.add(statsMap.get(columnName).getColumnDisplayName());
nullValueNotNullCols.add(fieldIndex.get(columnName).columnMetadata.getName());
}
}

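For context on the check above: the display names of missing or null non-nullable columns are now resolved through fieldIndex, which the constructor populates for every registered column, replacing a statsMap lookup that could come back null (the NullPointerException in the commit title). A minimal sketch of that lookup pattern, with hypothetical names rather than the repository's classes:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class NonNullableColumnCheckSketch {
  // Report non-nullable columns that are missing from the input row, resolving display
  // names from a map that contains every registered column, so the lookup cannot NPE.
  static List<String> findMissingColumns(
      Set<String> nonNullableFieldNames,
      Map<String, Integer> inputColNamesMap,
      Map<String, String> displayNameByFieldName) {
    List<String> missingCols = new ArrayList<>();
    for (String columnName : nonNullableFieldNames) {
      if (!inputColNamesMap.containsKey(columnName)) {
        missingCols.add(displayNameByFieldName.get(columnName));
      }
    }
    return missingCols;
  }
}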
@@ -31,8 +31,8 @@
import net.snowflake.ingest.utils.Cryptor;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.Utils;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.hadoop.ParquetFileWriter;

/**
@@ -91,7 +91,8 @@ static <T> Blob constructBlobAndMetadata(
final byte[] compressedChunkData;
final int chunkLength;
final int compressedChunkDataSize;
int extendedMetadataSize = -1;
long extendedMetadataSize = -1;
long metadataSize = -1;

if (internalParameterProvider.getEnableChunkEncryption()) {
Pair<byte[], Integer> paddedChunk =
@@ -115,7 +116,8 @@
compressedChunkDataSize = chunkLength;

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength);
metadataSize = getExtendedMetadataSize(compressedChunkData);
extendedMetadataSize = serializedChunk.extendedMetadataSize;
}
}

@@ -148,11 +150,12 @@

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
.setExtendedMetadataSize((long) extendedMetadataSize);
.setMetadataSize(metadataSize)
.setExtendedMetadataSize(extendedMetadataSize);
}

ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();
@@ -298,4 +301,22 @@ static class Blob {
this.blobStats = blobStats;
}
}

/**
* Get the metadata size (footer size) from a parquet file
*
* @param bytes the serialized parquet file
* @return the metadata (footer) size in bytes
*/
static long getExtendedMetadataSize(byte[] bytes) throws IOException {
final int magicOffset = bytes.length - ParquetFileWriter.MAGIC.length;
final int footerSizeOffset = magicOffset - Integer.BYTES;
if (footerSizeOffset < 0
|| !ParquetFileWriter.MAGIC_STR.equals(
new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) {
throw new IllegalArgumentException("Invalid parquet file");
}

return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset);
}
}
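The helper above relies on the Parquet trailer layout: a file ends with the serialized footer, a 4-byte little-endian footer length, and the 4-byte PAR1 magic. A dependency-free sketch of the same read, assuming a well-formed, unpadded byte array and using illustrative names:

final class ParquetTrailerSketch {
  // Returns the footer (metadata) length encoded in the last 8 bytes of a Parquet file:
  // [... footer ...][4-byte little-endian length]["PAR1"]
  static long footerLength(byte[] parquet) {
    int magicOffset = parquet.length - 4;            // start of the trailing "PAR1" magic
    int lengthOffset = magicOffset - Integer.BYTES;  // 4-byte little-endian footer length
    return (parquet[lengthOffset] & 0xFFL)
        | ((parquet[lengthOffset + 1] & 0xFFL) << 8)
        | ((parquet[lengthOffset + 2] & 0xFFL) << 16)
        | ((parquet[lengthOffset + 3] & 0xFFL) << 24);
  }
}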
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -26,6 +26,7 @@ class ChunkMetadata {
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long metadataSize;
private Long extendedMetadataSize;

static Builder builder() {
@@ -51,6 +52,7 @@ static class Builder {
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long metadataSize;
private Long extendedMetadataSize;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
@@ -124,6 +126,11 @@ Builder setCreatedOn(Long createdOn) {
return this;
}

Builder setMetadataSize(Long metadataSize) {
this.metadataSize = metadataSize;
return this;
}

Builder setExtendedMetadataSize(Long extendedMetadataSize) {
this.extendedMetadataSize = extendedMetadataSize;
return this;
@@ -258,6 +265,12 @@ Long getCreatedOn() {
return this.createdOn;
}

@JsonProperty("metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getMetadataSize() {
return this.metadataSize;
}

@JsonProperty("ext_metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getExtendedMetadataSize() {
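The two new fields surface in the chunk metadata JSON as metadata_size and ext_metadata_size, and @JsonInclude(NON_NULL) keeps them out of the payload while they are unset. A hypothetical stand-alone reproduction of that serialization behavior, not the repository's class:

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// With both sizes left null the serialized form is "{}"; once set, both fields appear.
final class ChunkMetadataSizesSketch {
  private Long metadataSize;
  private Long extendedMetadataSize;

  @JsonProperty("metadata_size")
  @JsonInclude(JsonInclude.Include.NON_NULL)
  public Long getMetadataSize() { return metadataSize; }

  @JsonProperty("ext_metadata_size")
  @JsonInclude(JsonInclude.Include.NON_NULL)
  public Long getExtendedMetadataSize() { return extendedMetadataSize; }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    ChunkMetadataSizesSketch sizes = new ChunkMetadataSizesSketch();
    System.out.println(mapper.writeValueAsString(sizes)); // {}
    sizes.metadataSize = 1024L;
    sizes.extendedMetadataSize = 64L;
    System.out.println(mapper.writeValueAsString(sizes)); // both fields now present
  }
}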
@@ -45,7 +45,7 @@ class FileColumnProperties {
private long nullCount;

// for elements in repeated columns
private Long numberOfValues;
private long numberOfValues;

// for binary or string columns
private long maxLength;
@@ -289,12 +289,12 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
}

@JsonProperty("numberOfValues")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Long getNumberOfValues() {
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = IgnoreMinusOneFilter.class)
long getNumberOfValues() {
return numberOfValues;
}

void setNumberOfValues(Long numberOfValues) {
void setNumberOfValues(long numberOfValues) {
this.numberOfValues = numberOfValues;
}

@@ -360,4 +360,14 @@ public int hashCode() {
nullCount,
maxLength);
}

static class IgnoreMinusOneFilter {
@Override
public boolean equals(Object obj) {
if (obj instanceof Long) {
return (Long) obj == -1;
}
return false;
}
}
}
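For reference, Jackson's Include.CUSTOM consults the valueFilter object and omits the property whenever the filter's equals() returns true for the property's value, which is how a numberOfValues of -1 is kept out of the serialized column properties above. A self-contained sketch of the mechanism, with illustrative class names rather than the repository's:

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomIncludeSketch {
  // Filter object: returning true from equals() excludes the value from serialization.
  public static class IgnoreMinusOne {
    @Override
    public boolean equals(Object obj) {
      return obj instanceof Long && (Long) obj == -1L;
    }
  }

  public static class Stats {
    @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = IgnoreMinusOne.class)
    public long numberOfValues = -1L; // sentinel for "unknown"; dropped from the JSON
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    System.out.println(mapper.writeValueAsString(new Stats())); // {}
    Stats stats = new Stats();
    stats.numberOfValues = 7L;
    System.out.println(mapper.writeValueAsString(stats)); // {"numberOfValues":7}
  }
}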
@@ -36,20 +36,23 @@ class SerializationResult {
final float chunkEstimatedUncompressedSize;
final ByteArrayOutputStream chunkData;
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
final long extendedMetadataSize;

public SerializationResult(
List<ChannelMetadata> channelsMetadataList,
Map<String, RowBufferStats> columnEpStatsMapCombined,
long rowCount,
float chunkEstimatedUncompressedSize,
ByteArrayOutputStream chunkData,
Pair<Long, Long> chunkMinMaxInsertTimeInMs) {
Pair<Long, Long> chunkMinMaxInsertTimeInMs,
long extendedMetadataSize) {
this.channelsMetadataList = channelsMetadataList;
this.columnEpStatsMapCombined = columnEpStatsMapCombined;
this.rowCount = rowCount;
this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
this.chunkData = chunkData;
this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
this.extendedMetadataSize = extendedMetadataSize;
}
}
}
@@ -17,7 +17,7 @@
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.hadoop.SnowflakeParquetWriter;
import org.apache.parquet.schema.MessageType;

/**
@@ -66,7 +66,7 @@ private SerializationResult serializeFromJavaObjects(
String firstChannelFullyQualifiedTableName = null;
Map<String, RowBufferStats> columnEpStatsMapCombined = null;
List<List<Object>> rows = null;
BdecParquetWriter parquetWriter;
SnowflakeParquetWriter parquetWriter;
ByteArrayOutputStream mergedData = new ByteArrayOutputStream();
Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;

@@ -129,7 +129,7 @@ private SerializationResult serializeFromJavaObjects(
// http://go/streams-on-replicated-mixed-tables
metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
parquetWriter =
new BdecParquetWriter(
new SnowflakeParquetWriter(
mergedData,
schema,
metadata,
@@ -150,7 +150,8 @@ private SerializationResult serializeFromJavaObjects(
rowCount,
chunkEstimatedUncompressedSize,
mergedData,
chunkMinMaxInsertTimeInMs);
chunkMinMaxInsertTimeInMs,
parquetWriter.getExtendedMetadataSize());
}

/**
Expand All @@ -164,7 +165,7 @@ private SerializationResult serializeFromJavaObjects(
* Used only for logging purposes if there is a mismatch.
*/
private void verifyRowCounts(
BdecParquetWriter writer,
SnowflakeParquetWriter writer,
long totalMetadataRowCount,
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
long javaSerializationTotalRowCount) {
@@ -38,8 +38,6 @@
*/
public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {

private final Map<String, ParquetColumn> fieldIndex;

/* map that contains metadata like typeinfo for columns and other information needed by the server scanner */
private final Map<String, String> metadata;

@@ -72,7 +70,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
clientBufferParameters,
offsetTokenVerificationFunction,
telemetryService);
this.fieldIndex = new HashMap<>();
this.metadata = new HashMap<>();
this.data = new ArrayList<>();
this.tempData = new ArrayList<>();
@@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.EP_NDV_UNKNOWN;
import static net.snowflake.ingest.utils.Constants.EP_NV_UNKNOWN;

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
@@ -48,7 +49,7 @@ class RowBufferStats {
private final boolean enableDistinctValuesCount;
private Set<Object> distinctValues;
private final boolean enableValuesCount;
private Long numberOfValues;
private long numberOfValues;

RowBufferStats(
String columnDisplayName,
@@ -310,9 +311,8 @@ long getDistinctValues() {
return enableDistinctValuesCount ? distinctValues.size() : EP_NDV_UNKNOWN;
}

// TODO: change default to -1 after Oct 17
Long getNumberOfValues() {
return enableValuesCount ? numberOfValues : null;
long getNumberOfValues() {
return enableValuesCount ? numberOfValues : EP_NV_UNKNOWN;
}

String getCollationDefinitionString() {
2 changes: 2 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -62,6 +62,7 @@ public class Constants {
public static final int MAX_STREAMING_INGEST_API_CHANNEL_RETRY = 3;
public static final int STREAMING_INGEST_TELEMETRY_UPLOAD_INTERVAL_IN_SEC = 10;
public static final long EP_NDV_UNKNOWN = -1L;
public static final long EP_NV_UNKNOWN = -1L;
public static final int MAX_OAUTH_REFRESH_TOKEN_RETRY = 3;
public static final int BINARY_COLUMN_MAX_SIZE = 8 * 1024 * 1024;
public static final int VARCHAR_COLUMN_MAX_SIZE = 16 * 1024 * 1024;
@@ -72,6 +73,7 @@ public class Constants {
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public static final int PARQUET_MAJOR_VERSION = 1;
public static final int PARQUET_MINOR_VERSION = 0;

/**
Expand Down
23 changes: 0 additions & 23 deletions src/main/java/net/snowflake/ingest/utils/Utils.java
@@ -8,7 +8,6 @@

import com.codahale.metrics.Timer;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.io.StringReader;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
@@ -30,8 +29,6 @@
import java.util.Properties;
import net.snowflake.client.core.SFSessionProperty;
import org.apache.commons.codec.binary.Base64;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
@@ -433,24 +430,4 @@ public static String concatDotPath(String... path) {
}
return sb.toString();
}

/**
* Get the extended metadata size (footer size) from a parquet file
*
* @param bytes the serialized parquet file
* @param length the length of the byte array without padding
* @return the extended metadata size
*/
public static int getExtendedMetadataSize(byte[] bytes, int length) throws IOException {
final int magicOffset = length - ParquetFileWriter.MAGIC.length;
final int footerSizeOffset = magicOffset - Integer.BYTES;
if (bytes.length < length
|| footerSizeOffset < 0
|| !ParquetFileWriter.MAGIC_STR.equals(
new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) {
throw new IllegalArgumentException("Invalid parquet file");
}

return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package org.apache.parquet.hadoop;
@@ -82,7 +82,7 @@ public void close() throws IOException {
* @param data input data to be read first and then written with outputWriter
* @param outputWriter output parquet writer
*/
public static void readFileIntoWriter(byte[] data, BdecParquetWriter outputWriter) {
public static void readFileIntoWriter(byte[] data, SnowflakeParquetWriter outputWriter) {
try (BdecParquetReader reader = new BdecParquetReader(data)) {
for (List<Object> record = reader.read(); record != null; record = reader.read()) {
outputWriter.writeRow(record);