Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1675591 Fill in ExternalVolume and ExternalVolumeManager to do presigned url retrieval + blobname population #837

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public enum ApiName {
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_CHANNEL_CONFIGURE("POST");
GENERATE_PRESIGNED_URLS("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ static <T> Blob constructBlobAndMetadata(

// Create chunk metadata
long startOffset = curDataSize;
ChunkMetadata chunkMetadata =
ChunkMetadata.Builder chunkMetadataBuilder =
ChunkMetadata.builder()
.setOwningTableFromChannelContext(firstChannelFlushContext)
// The start offset will be updated later in BlobBuilder#build to include the blob
Expand All @@ -136,9 +136,17 @@ static <T> Blob constructBlobAndMetadata(
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setDefaultValuesInEp()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond())
.setMajorMinorVersionInEp(internalParameterProvider.setMajorMinorVersionInEp())
.build();
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
.setCreatedOn(0L)
.setExtendedMetadataSize(-1L);
}

ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();

// Add chunk metadata and data to the list
chunksMetadataList.add(chunkMetadata);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Utils;

/** Metadata for a chunk that sends to Snowflake as part of the register blob request */
Expand All @@ -24,8 +23,10 @@ class ChunkMetadata {
private final Long encryptionKeyId;
private final Long firstInsertTimeInMs;
private final Long lastInsertTimeInMs;
private Integer parquetMajorVersion;
private Integer parquetMinorVersion;
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long extendedMetadataSize;

static Builder builder() {
return new Builder();
Expand All @@ -47,7 +48,10 @@ static class Builder {
private Long encryptionKeyId;
private Long firstInsertTimeInMs;
private Long lastInsertTimeInMs;
private boolean setMajorMinorVersionInEp;
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long extendedMetadataSize;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
this.dbName = channelFlushContext.getDbName();
Expand Down Expand Up @@ -105,8 +109,23 @@ Builder setLastInsertTimeInMs(Long lastInsertTimeInMs) {
return this;
}

Builder setMajorMinorVersionInEp(boolean setMajorMinorVersionInEp) {
this.setMajorMinorVersionInEp = setMajorMinorVersionInEp;
Builder setMajorVersion(Integer majorVersion) {
this.majorVersion = majorVersion;
return this;
}

Builder setMinorVersion(Integer minorVersion) {
this.minorVersion = minorVersion;
return this;
}

Builder setCreatedOn(Long createdOn) {
this.createdOn = createdOn;
return this;
}

Builder setExtendedMetadataSize(Long extendedMetadataSize) {
this.extendedMetadataSize = extendedMetadataSize;
return this;
}

Expand Down Expand Up @@ -141,10 +160,12 @@ private ChunkMetadata(Builder builder) {
this.firstInsertTimeInMs = builder.firstInsertTimeInMs;
this.lastInsertTimeInMs = builder.lastInsertTimeInMs;

if (builder.setMajorMinorVersionInEp) {
this.parquetMajorVersion = Constants.PARQUET_MAJOR_VERSION;
this.parquetMinorVersion = Constants.PARQUET_MINOR_VERSION;
}
// iceberg-specific fields, no need for conditional since both sides are nullable and the
// caller of ChunkMetadata.Builder only sets these fields when we're in iceberg mode
this.majorVersion = builder.majorVersion;
this.minorVersion = builder.minorVersion;
this.createdOn = builder.createdOn;
this.extendedMetadataSize = builder.extendedMetadataSize;
}

/**
Expand Down Expand Up @@ -217,16 +238,29 @@ Long getLastInsertTimeInMs() {
}

// Snowflake service had a bug that did not allow the client to add new json fields in some
// contracts; thus these new fields have a NON_DEFAULT attribute.
// contracts; thus these new fields have a NON_NULL attribute. NON_DEFAULT will ignore an explicit
// zero value, thus NON_NULL is a better fit.
@JsonProperty("major_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
sfc-gh-hmadan marked this conversation as resolved.
Show resolved Hide resolved
@JsonInclude(JsonInclude.Include.NON_NULL)
Integer getMajorVersion() {
return this.parquetMajorVersion;
return this.majorVersion;
}

@JsonProperty("minor_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@JsonInclude(JsonInclude.Include.NON_NULL)
Integer getMinorVersion() {
return this.parquetMinorVersion;
return this.minorVersion;
}

@JsonProperty("created")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getCreatedOn() {
return this.createdOn;
}

@JsonProperty("ext_metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getExtendedMetadataSize() {
return this.extendedMetadataSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public String toString() {
map.put("byte_length", this.byteLength);
map.put("length", this.length);
map.put("nullable", this.nullable);
map.put("source_iceberg_datatype", this.sourceIcebergDataType);
map.put("source_iceberg_data_type", this.sourceIcebergDataType);
return map.toString();
}
}
Loading
Loading