Skip to content

Commit

Permalink
Merge branch 'master' into sfc-gh-xhuang-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-xhuang authored Oct 3, 2024
2 parents 864e84d + ef92ea9 commit 53b323b
Show file tree
Hide file tree
Showing 45 changed files with 2,365 additions and 411 deletions.
16 changes: 14 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<commonstext.version>1.11.0</commonstext.version>
<fasterxml.version>2.17.2</fasterxml.version>
<guava.version>32.0.1-jre</guava.version>
<hadoop.version>3.3.6</hadoop.version>
<hadoop.version>3.4.0</hadoop.version>
<iceberg.version>1.5.2</iceberg.version>
<jacoco.skip.instrument>true</jacoco.skip.instrument>
<jacoco.version>0.8.5</jacoco.version>
Expand All @@ -61,7 +61,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<net.minidev.version>2.4.9</net.minidev.version>
<netty.version>4.1.94.Final</netty.version>
<netty.version>4.1.113.Final</netty.version>
<nimbusds.version>9.37.3</nimbusds.version>
<objenesis.version>3.1</objenesis.version>
<parquet.version>1.14.1</parquet.version>
Expand Down Expand Up @@ -240,6 +240,10 @@
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
Expand Down Expand Up @@ -385,6 +389,14 @@
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
Expand Down
1 change: 1 addition & 0 deletions scripts/check_content.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ if jar tvf $DIR/../target/snowflake-ingest-sdk.jar | awk '{print $8}' | \
grep -v PropertyList-1.0.dtd | \
grep -v properties.dtd | \
grep -v parquet.thrift | \
grep -v assets/org/apache/commons/math3/random/new-joe-kuo-6.1000 | \

# Native zstd libraries are allowed
grep -v -E '^darwin' | \
Expand Down
8 changes: 8 additions & 0 deletions scripts/process_licenses.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@
"com.nimbusds:nimbus-jose-jwt": APACHE_LICENSE,
"com.github.stephenc.jcip:jcip-annotations": APACHE_LICENSE,
"io.netty:netty-common": APACHE_LICENSE,
"io.netty:netty-handler": APACHE_LICENSE,
"io.netty:netty-resolver": APACHE_LICENSE,
"io.netty:netty-buffer": APACHE_LICENSE,
"io.netty:netty-transport": APACHE_LICENSE,
"io.netty:netty-transport-native-unix-common": APACHE_LICENSE,
"io.netty:netty-codec": APACHE_LICENSE,
"io.netty:netty-transport-native-epoll": APACHE_LICENSE,
"io.netty:netty-transport-classes-epoll": APACHE_LICENSE,
"com.google.re2j:re2j": GO_LICENSE,
"com.google.protobuf:protobuf-java": BSD_3_CLAUSE_LICENSE,
"com.google.code.gson:gson": APACHE_LICENSE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public enum ApiName {
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_CHANNEL_CONFIGURE("POST");
GENERATE_PRESIGNED_URLS("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static <T> Blob constructBlobAndMetadata(

Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
Flusher.SerializationResult serializedChunk =
flusher.serialize(channelsDataPerTable, filePath, curDataSize);
flusher.serialize(channelsDataPerTable, filePath);

if (!serializedChunk.channelsMetadataList.isEmpty()) {
final byte[] compressedChunkData;
Expand Down Expand Up @@ -117,7 +117,7 @@ static <T> Blob constructBlobAndMetadata(

// Create chunk metadata
long startOffset = curDataSize;
ChunkMetadata chunkMetadata =
ChunkMetadata.Builder chunkMetadataBuilder =
ChunkMetadata.builder()
.setOwningTableFromChannelContext(firstChannelFlushContext)
// The start offset will be updated later in BlobBuilder#build to include the blob
Expand All @@ -136,9 +136,18 @@ static <T> Blob constructBlobAndMetadata(
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setDefaultValuesInEp()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond())
.setMajorMinorVersionInEp(internalParameterProvider.setMajorMinorVersionInEp())
.build();
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
.setExtendedMetadataSize(-1L);
}

ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();

// Add chunk metadata and data to the list
chunksMetadataList.add(chunkMetadata);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Utils;

/** Metadata for a chunk that is sent to Snowflake as part of the register blob request */
Expand All @@ -24,8 +23,10 @@ class ChunkMetadata {
private final Long encryptionKeyId;
private final Long firstInsertTimeInMs;
private final Long lastInsertTimeInMs;
private Integer parquetMajorVersion;
private Integer parquetMinorVersion;
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long extendedMetadataSize;

static Builder builder() {
return new Builder();
Expand All @@ -47,7 +48,10 @@ static class Builder {
private Long encryptionKeyId;
private Long firstInsertTimeInMs;
private Long lastInsertTimeInMs;
private boolean setMajorMinorVersionInEp;
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long extendedMetadataSize;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
this.dbName = channelFlushContext.getDbName();
Expand Down Expand Up @@ -105,8 +109,23 @@ Builder setLastInsertTimeInMs(Long lastInsertTimeInMs) {
return this;
}

Builder setMajorMinorVersionInEp(boolean setMajorMinorVersionInEp) {
this.setMajorMinorVersionInEp = setMajorMinorVersionInEp;
Builder setMajorVersion(Integer majorVersion) {
this.majorVersion = majorVersion;
return this;
}

/**
 * Stores the minor version on this builder; the ChunkMetadata constructor copies it as-is.
 *
 * @param minorVersion value to store; may be null if the field should stay unset
 * @return this builder, so calls can be chained
 */
Builder setMinorVersion(Integer minorVersion) {
this.minorVersion = minorVersion;
return this;
}

/**
 * Stores the creation timestamp on this builder.
 *
 * <p>The caller visible in this commit passes {@code System.currentTimeMillis() / 1000}, i.e. a
 * value in seconds rather than milliseconds.
 *
 * @param createdOn creation time to store; may be null if the field should stay unset
 * @return this builder, so calls can be chained
 */
Builder setCreatedOn(Long createdOn) {
this.createdOn = createdOn;
return this;
}

/**
 * Stores the extended metadata size on this builder.
 *
 * <p>NOTE(review): the caller visible in this commit passes {@code -1L}; what that value means
 * to the service is not shown here -- confirm before relying on it.
 *
 * @param extendedMetadataSize size value to store; may be null if the field should stay unset
 * @return this builder, so calls can be chained
 */
Builder setExtendedMetadataSize(Long extendedMetadataSize) {
this.extendedMetadataSize = extendedMetadataSize;
return this;
}

Expand Down Expand Up @@ -141,10 +160,12 @@ private ChunkMetadata(Builder builder) {
this.firstInsertTimeInMs = builder.firstInsertTimeInMs;
this.lastInsertTimeInMs = builder.lastInsertTimeInMs;

if (builder.setMajorMinorVersionInEp) {
this.parquetMajorVersion = Constants.PARQUET_MAJOR_VERSION;
this.parquetMinorVersion = Constants.PARQUET_MINOR_VERSION;
}
// iceberg-specific fields, no need for conditional since both sides are nullable and the
// caller of ChunkMetadata.Builder only sets these fields when we're in iceberg mode
this.majorVersion = builder.majorVersion;
this.minorVersion = builder.minorVersion;
this.createdOn = builder.createdOn;
this.extendedMetadataSize = builder.extendedMetadataSize;
}

/**
Expand Down Expand Up @@ -217,16 +238,29 @@ Long getLastInsertTimeInMs() {
}

// Snowflake service had a bug that did not allow the client to add new json fields in some
// contracts; thus these new fields have a NON_DEFAULT attribute.
// contracts; thus these new fields have a NON_NULL attribute. NON_DEFAULT will ignore an explicit
// zero value, thus NON_NULL is a better fit.
@JsonProperty("major_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@JsonInclude(JsonInclude.Include.NON_NULL)
Integer getMajorVersion() {
return this.parquetMajorVersion;
return this.majorVersion;
}

@JsonProperty("minor_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@JsonInclude(JsonInclude.Include.NON_NULL)
Integer getMinorVersion() {
return this.parquetMinorVersion;
return this.minorVersion;
}

/**
 * Returns the creation timestamp, serialized under the JSON property {@code created}.
 *
 * <p>The property is left out of the serialized output when the value is null (NON_NULL).
 *
 * @return the stored creation timestamp, or null if it was never set on the builder
 */
@JsonProperty("created")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getCreatedOn() {
return this.createdOn;
}

/**
 * Returns the extended metadata size, serialized under the JSON property
 * {@code ext_metadata_size}.
 *
 * <p>The property is left out of the serialized output when the value is null (NON_NULL).
 *
 * @return the stored extended metadata size, or null if it was never set on the builder
 */
@JsonProperty("ext_metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getExtendedMetadataSize() {
return this.extendedMetadataSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
private static final String BDEC_PARQUET_MESSAGE_TYPE_NAME = "bdec";
private static final String PARQUET_MESSAGE_TYPE_NAME = "schema";

private long maxChunkSizeInBytes;

Expand Down Expand Up @@ -118,4 +120,8 @@ public boolean getIsIcebergMode() {
public Optional<Integer> getMaxRowGroups() {
return maxRowGroups;
}

/**
 * Returns the Parquet message type name that matches the client's mode.
 *
 * @return {@code "schema"} when Iceberg mode is enabled, otherwise {@code "bdec"}
 */
public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public String toString() {
map.put("byte_length", this.byteLength);
map.put("length", this.length);
map.put("nullable", this.nullable);
map.put("source_iceberg_datatype", this.sourceIcebergDataType);
map.put("source_iceberg_data_type", this.sourceIcebergDataType);
return map.toString();
}
}
Loading

0 comments on commit 53b323b

Please sign in to comment.