Skip to content

Commit

Permalink
SNOW-1675591 Fill in ExternalVolume and ExternalVolumeManager to do p…
Browse files Browse the repository at this point in the history
…resigned url retrieval + blobname population (#837)

- Add a GeneratePresignedUrls API call (new contracts + snowflakeClient API)
- Enhance ExternalVolume.java to do presigned url file uploads to S3 (GCP and Azure testing pending thus commented out)
- fix json field names where applicable
- Add @JsonIgnoreProperties(ignoreUnknown = true) for forward compatiblity with service side evolution to some contracts that were missing this tag
  • Loading branch information
sfc-gh-hmadan authored Sep 25, 2024
1 parent 0919151 commit 7b3881b
Show file tree
Hide file tree
Showing 21 changed files with 796 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public enum ApiName {
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_CHANNEL_CONFIGURE("POST");
GENERATE_PRESIGNED_URLS("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ static <T> Blob constructBlobAndMetadata(

// Create chunk metadata
long startOffset = curDataSize;
ChunkMetadata chunkMetadata =
ChunkMetadata.Builder chunkMetadataBuilder =
ChunkMetadata.builder()
.setOwningTableFromChannelContext(firstChannelFlushContext)
// The start offset will be updated later in BlobBuilder#build to include the blob
Expand All @@ -136,9 +136,17 @@ static <T> Blob constructBlobAndMetadata(
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setDefaultValuesInEp()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond())
.setMajorMinorVersionInEp(internalParameterProvider.setMajorMinorVersionInEp())
.build();
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
.setCreatedOn(0L)
.setExtendedMetadataSize(-1L);
}

ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();

// Add chunk metadata and data to the list
chunksMetadataList.add(chunkMetadata);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Utils;

/** Metadata for a chunk that sends to Snowflake as part of the register blob request */
Expand All @@ -24,8 +23,10 @@ class ChunkMetadata {
private final Long encryptionKeyId;
private final Long firstInsertTimeInMs;
private final Long lastInsertTimeInMs;
private Integer parquetMajorVersion;
private Integer parquetMinorVersion;
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long extendedMetadataSize;

static Builder builder() {
return new Builder();
Expand All @@ -47,7 +48,10 @@ static class Builder {
private Long encryptionKeyId;
private Long firstInsertTimeInMs;
private Long lastInsertTimeInMs;
private boolean setMajorMinorVersionInEp;
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long extendedMetadataSize;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
this.dbName = channelFlushContext.getDbName();
Expand Down Expand Up @@ -105,8 +109,23 @@ Builder setLastInsertTimeInMs(Long lastInsertTimeInMs) {
return this;
}

Builder setMajorMinorVersionInEp(boolean setMajorMinorVersionInEp) {
this.setMajorMinorVersionInEp = setMajorMinorVersionInEp;
Builder setMajorVersion(Integer majorVersion) {
this.majorVersion = majorVersion;
return this;
}

Builder setMinorVersion(Integer minorVersion) {
this.minorVersion = minorVersion;
return this;
}

Builder setCreatedOn(Long createdOn) {
this.createdOn = createdOn;
return this;
}

Builder setExtendedMetadataSize(Long extendedMetadataSize) {
this.extendedMetadataSize = extendedMetadataSize;
return this;
}

Expand Down Expand Up @@ -141,10 +160,12 @@ private ChunkMetadata(Builder builder) {
this.firstInsertTimeInMs = builder.firstInsertTimeInMs;
this.lastInsertTimeInMs = builder.lastInsertTimeInMs;

if (builder.setMajorMinorVersionInEp) {
this.parquetMajorVersion = Constants.PARQUET_MAJOR_VERSION;
this.parquetMinorVersion = Constants.PARQUET_MINOR_VERSION;
}
// iceberg-specific fields, no need for conditional since both sides are nullable and the
// caller of ChunkMetadata.Builder only sets these fields when we're in iceberg mode
this.majorVersion = builder.majorVersion;
this.minorVersion = builder.minorVersion;
this.createdOn = builder.createdOn;
this.extendedMetadataSize = builder.extendedMetadataSize;
}

/**
Expand Down Expand Up @@ -217,16 +238,29 @@ Long getLastInsertTimeInMs() {
}

// Snowflake service had a bug that did not allow the client to add new json fields in some
// contracts; thus these new fields have a NON_DEFAULT attribute.
// contracts; thus these new fields have a NON_NULL attribute. NON_DEFAULT will ignore an explicit
// zero value, thus NON_NULL is a better fit.
@JsonProperty("major_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@JsonInclude(JsonInclude.Include.NON_NULL)
Integer getMajorVersion() {
return this.parquetMajorVersion;
return this.majorVersion;
}

@JsonProperty("minor_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@JsonInclude(JsonInclude.Include.NON_NULL)
Integer getMinorVersion() {
return this.parquetMinorVersion;
return this.minorVersion;
}

@JsonProperty("created")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getCreatedOn() {
return this.createdOn;
}

@JsonProperty("ext_metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getExtendedMetadataSize() {
return this.extendedMetadataSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public String toString() {
map.put("byte_length", this.byteLength);
map.put("length", this.length);
map.put("nullable", this.nullable);
map.put("source_iceberg_datatype", this.sourceIcebergDataType);
map.put("source_iceberg_data_type", this.sourceIcebergDataType);
return map.toString();
}
}
Loading

0 comments on commit 7b3881b

Please sign in to comment.