diff --git a/pom.xml b/pom.xml
index 9485a61da..2867ff47c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@
1.11.0
2.17.2
32.0.1-jre
- 3.3.6
+ 3.4.0
1.5.2
true
0.8.5
@@ -61,7 +61,7 @@
1.8
1.8
2.4.9
- 4.1.94.Final
+ 4.1.113.Final
9.37.3
3.1
1.14.1
@@ -240,6 +240,10 @@
org.apache.zookeeper
zookeeper
+
+ org.bouncycastle
+ bcprov-jdk15on
+
org.eclipse.jetty
jetty-server
@@ -385,6 +389,14 @@
javax.xml.bind
jaxb-api
+
+ org.apache.hadoop
+ hadoop-yarn-common
+
+
+ org.bouncycastle
+ bcprov-jdk15on
+
org.slf4j
slf4j-reload4j
diff --git a/scripts/check_content.sh b/scripts/check_content.sh
index e4d3e2076..7608c23ec 100755
--- a/scripts/check_content.sh
+++ b/scripts/check_content.sh
@@ -28,6 +28,7 @@ if jar tvf $DIR/../target/snowflake-ingest-sdk.jar | awk '{print $8}' | \
grep -v PropertyList-1.0.dtd | \
grep -v properties.dtd | \
grep -v parquet.thrift | \
+ grep -v assets/org/apache/commons/math3/random/new-joe-kuo-6.1000 | \
# Native zstd libraries are allowed
grep -v -E '^darwin' | \
diff --git a/scripts/process_licenses.py b/scripts/process_licenses.py
index 9f715abd6..b5181bce1 100644
--- a/scripts/process_licenses.py
+++ b/scripts/process_licenses.py
@@ -50,6 +50,14 @@
"com.nimbusds:nimbus-jose-jwt": APACHE_LICENSE,
"com.github.stephenc.jcip:jcip-annotations": APACHE_LICENSE,
"io.netty:netty-common": APACHE_LICENSE,
+ "io.netty:netty-handler": APACHE_LICENSE,
+ "io.netty:netty-resolver": APACHE_LICENSE,
+ "io.netty:netty-buffer": APACHE_LICENSE,
+ "io.netty:netty-transport": APACHE_LICENSE,
+ "io.netty:netty-transport-native-unix-common": APACHE_LICENSE,
+ "io.netty:netty-codec": APACHE_LICENSE,
+ "io.netty:netty-transport-native-epoll": APACHE_LICENSE,
+ "io.netty:netty-transport-classes-epoll": APACHE_LICENSE,
"com.google.re2j:re2j": GO_LICENSE,
"com.google.protobuf:protobuf-java": BSD_3_CLAUSE_LICENSE,
"com.google.code.gson:gson": APACHE_LICENSE,
diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java
index 034b4a6f0..822c969a1 100644
--- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java
+++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java
@@ -43,7 +43,7 @@ public enum ApiName {
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST"),
- STREAMING_CHANNEL_CONFIGURE("POST");
+ GENERATE_PRESIGNED_URLS("POST");
private final String httpMethod;
private ApiName(String httpMethod) {
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
index 7ad11dc3a..edc8fd4c9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
@@ -83,7 +83,7 @@ static Blob constructBlobAndMetadata(
Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
Flusher.SerializationResult serializedChunk =
- flusher.serialize(channelsDataPerTable, filePath, curDataSize);
+ flusher.serialize(channelsDataPerTable, filePath);
if (!serializedChunk.channelsMetadataList.isEmpty()) {
final byte[] compressedChunkData;
@@ -117,7 +117,7 @@ static Blob constructBlobAndMetadata(
// Create chunk metadata
long startOffset = curDataSize;
- ChunkMetadata chunkMetadata =
+ ChunkMetadata.Builder chunkMetadataBuilder =
ChunkMetadata.builder()
.setOwningTableFromChannelContext(firstChannelFlushContext)
// The start offset will be updated later in BlobBuilder#build to include the blob
@@ -136,9 +136,18 @@ static Blob constructBlobAndMetadata(
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setDefaultValuesInEp()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
- .setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond())
- .setMajorMinorVersionInEp(internalParameterProvider.setMajorMinorVersionInEp())
- .build();
+ .setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());
+
+ if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
+ chunkMetadataBuilder
+ .setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
+ .setMinorVersion(Constants.PARQUET_MINOR_VERSION)
+ // set createdOn in seconds
+ .setCreatedOn(System.currentTimeMillis() / 1000)
+ .setExtendedMetadataSize(-1L);
+ }
+
+ ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();
// Add chunk metadata and data to the list
chunksMetadataList.add(chunkMetadata);
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java
deleted file mode 100644
index f6ea570ec..000000000
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
- */
-
-package net.snowflake.ingest.streaming.internal;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/** Class used to serialize the channel configure request. */
-class ChannelConfigureRequest extends ClientConfigureRequest {
- @JsonProperty("database")
- private String database;
-
- @JsonProperty("schema")
- private String schema;
-
- @JsonProperty("table")
- private String table;
-
- /**
- * Constructor for channel configure request
- *
- * @param role Role to be used for the request.
- * @param database Database name.
- * @param schema Schema name.
- * @param table Table name.
- */
- ChannelConfigureRequest(String role, String database, String schema, String table) {
- super(role);
- this.database = database;
- this.schema = schema;
- this.table = table;
- }
-
- String getDatabase() {
- return database;
- }
-
- String getSchema() {
- return schema;
- }
-
- String getTable() {
- return table;
- }
-
- @Override
- public String getStringForLogging() {
- return String.format(
- "ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)",
- getRole(), database, schema, table, getFileName());
- }
-}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java
deleted file mode 100644
index da65960b4..000000000
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
- */
-
-package net.snowflake.ingest.streaming.internal;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/** Class used to deserialize responses from channel configure endpoint */
-@JsonIgnoreProperties(ignoreUnknown = true)
-class ChannelConfigureResponse extends StreamingIngestResponse {
- @JsonProperty("status_code")
- private Long statusCode;
-
- @JsonProperty("message")
- private String message;
-
- @JsonProperty("stage_location")
- private FileLocationInfo stageLocation;
-
- @Override
- Long getStatusCode() {
- return statusCode;
- }
-
- void setStatusCode(Long statusCode) {
- this.statusCode = statusCode;
- }
-
- String getMessage() {
- return message;
- }
-
- void setMessage(String message) {
- this.message = message;
- }
-
- FileLocationInfo getStageLocation() {
- return stageLocation;
- }
-
- void setStageLocation(FileLocationInfo stageLocation) {
- this.stageLocation = stageLocation;
- }
-}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
index c0cb218ac..006782d25 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
@@ -7,7 +7,6 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
-import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Utils;
/** Metadata for a chunk that sends to Snowflake as part of the register blob request */
@@ -24,8 +23,10 @@ class ChunkMetadata {
private final Long encryptionKeyId;
private final Long firstInsertTimeInMs;
private final Long lastInsertTimeInMs;
- private Integer parquetMajorVersion;
- private Integer parquetMinorVersion;
+ private Integer majorVersion;
+ private Integer minorVersion;
+ private Long createdOn;
+ private Long extendedMetadataSize;
static Builder builder() {
return new Builder();
@@ -47,7 +48,10 @@ static class Builder {
private Long encryptionKeyId;
private Long firstInsertTimeInMs;
private Long lastInsertTimeInMs;
- private boolean setMajorMinorVersionInEp;
+ private Integer majorVersion;
+ private Integer minorVersion;
+ private Long createdOn;
+ private Long extendedMetadataSize;
Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
this.dbName = channelFlushContext.getDbName();
@@ -105,8 +109,23 @@ Builder setLastInsertTimeInMs(Long lastInsertTimeInMs) {
return this;
}
- Builder setMajorMinorVersionInEp(boolean setMajorMinorVersionInEp) {
- this.setMajorMinorVersionInEp = setMajorMinorVersionInEp;
+ Builder setMajorVersion(Integer majorVersion) {
+ this.majorVersion = majorVersion;
+ return this;
+ }
+
+ Builder setMinorVersion(Integer minorVersion) {
+ this.minorVersion = minorVersion;
+ return this;
+ }
+
+ Builder setCreatedOn(Long createdOn) {
+ this.createdOn = createdOn;
+ return this;
+ }
+
+ Builder setExtendedMetadataSize(Long extendedMetadataSize) {
+ this.extendedMetadataSize = extendedMetadataSize;
return this;
}
@@ -141,10 +160,12 @@ private ChunkMetadata(Builder builder) {
this.firstInsertTimeInMs = builder.firstInsertTimeInMs;
this.lastInsertTimeInMs = builder.lastInsertTimeInMs;
- if (builder.setMajorMinorVersionInEp) {
- this.parquetMajorVersion = Constants.PARQUET_MAJOR_VERSION;
- this.parquetMinorVersion = Constants.PARQUET_MINOR_VERSION;
- }
+ // iceberg-specific fields, no need for conditional since both sides are nullable and the
+ // caller of ChunkMetadata.Builder only sets these fields when we're in iceberg mode
+ this.majorVersion = builder.majorVersion;
+ this.minorVersion = builder.minorVersion;
+ this.createdOn = builder.createdOn;
+ this.extendedMetadataSize = builder.extendedMetadataSize;
}
/**
@@ -217,16 +238,29 @@ Long getLastInsertTimeInMs() {
}
// Snowflake service had a bug that did not allow the client to add new json fields in some
- // contracts; thus these new fields have a NON_DEFAULT attribute.
+ // contracts; thus these new fields have a NON_NULL attribute. NON_DEFAULT would also drop an
+ // explicitly-set zero value, so NON_NULL is the better fit.
@JsonProperty("major_vers")
- @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
Integer getMajorVersion() {
- return this.parquetMajorVersion;
+ return this.majorVersion;
}
@JsonProperty("minor_vers")
- @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
Integer getMinorVersion() {
- return this.parquetMinorVersion;
+ return this.minorVersion;
+ }
+
+ @JsonProperty("created")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ Long getCreatedOn() {
+ return this.createdOn;
+ }
+
+ @JsonProperty("ext_metadata_size")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ Long getExtendedMetadataSize() {
+ return this.extendedMetadataSize;
}
}
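
Note on the switch from NON_DEFAULT to NON_NULL above: at the property level, Jackson's NON_DEFAULT also suppresses values equal to the primitive default, so an explicitly set 0 would silently disappear from the register-blob payload. A minimal stand-alone sketch (not part of this PR; assumes Jackson 2.x on the classpath) showing the difference:

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class InclusionDemo {
      static class Versions {
        @JsonProperty("non_default_vers")
        @JsonInclude(JsonInclude.Include.NON_DEFAULT)
        public Integer nonDefaultVers = 0; // omitted: 0 equals the primitive default

        @JsonProperty("non_null_vers")
        @JsonInclude(JsonInclude.Include.NON_NULL)
        public Integer nonNullVers = 0; // kept: only null is excluded
      }

      public static void main(String[] args) throws Exception {
        // Prints {"non_null_vers":0}; the NON_DEFAULT-annotated field is dropped.
        System.out.println(new ObjectMapper().writeValueAsString(new Versions()));
      }
    }
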
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
index dfadd029a..0a9711ee8 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -10,6 +10,8 @@
/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
+ private static final String BDEC_PARQUET_MESSAGE_TYPE_NAME = "bdec";
+ private static final String PARQUET_MESSAGE_TYPE_NAME = "schema";
private long maxChunkSizeInBytes;
@@ -118,4 +120,8 @@ public boolean getIsIcebergMode() {
public Optional<Integer> getMaxRowGroups() {
return maxRowGroups;
}
+
+ public String getParquetMessageTypeName() {
+ return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
+ }
}
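
The two message type names above presumably become the root name of the generated Parquet MessageType ("bdec" on the classic path, "schema" in Iceberg mode). A hedged illustration using parquet-mr's schema builder, not this SDK's actual schema-generation code:

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    // Builds: message schema { optional int32 c1; }. Swap in "bdec" for the non-Iceberg path,
    // i.e. whatever getParquetMessageTypeName() returns for the current mode.
    MessageType root =
        Types.buildMessage()
            .optional(PrimitiveTypeName.INT32)
            .named("c1")
            .named("schema");
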
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java
index 1231247b5..0f19922fe 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java
@@ -160,7 +160,7 @@ public String toString() {
map.put("byte_length", this.byteLength);
map.put("length", this.length);
map.put("nullable", this.nullable);
- map.put("source_iceberg_datatype", this.sourceIcebergDataType);
+ map.put("source_iceberg_data_type", this.sourceIcebergDataType);
return map.toString();
}
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
index 6e3281997..8d8bff3f5 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
@@ -1055,6 +1055,83 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
insertRowIndex);
}
+ /**
+ * Validate and cast Iceberg struct column to Map. Allowed Java type:
+ *
+ *
+ *
+ * @param columnName Column name, used in validation error messages
+ * @param input Object to validate and parse
+ * @param insertRowIndex Row index for error reporting
+ * @return Object cast to Map
+ */
+ static Map<String, Object> validateAndParseIcebergStruct(
+ String columnName, Object input, long insertRowIndex) {
+ if (!(input instanceof Map)) {
+ throw typeNotAllowedException(
+ columnName,
+ input.getClass(),
+ "STRUCT",
+ new String[] {"Map"},
+ insertRowIndex);
+ }
+ for (Object key : ((Map<?, ?>) input).keySet()) {
+ if (!(key instanceof String)) {
+ throw new SFException(
+ ErrorCode.INVALID_FORMAT_ROW,
+ String.format(
+ "Field name of struct %s must be of type String, rowIndex:%d",
+ columnName, insertRowIndex));
+ }
+ }
+
+ return (Map<String, Object>) input;
+ }
+
+ /**
+ * Validate and parse Iceberg list column to an Iterable. Allowed Java type:
+ *
+ *
+ *
+ * @param columnName Column name, used in validation error messages
+ * @param input Object to validate and parse
+ * @param insertRowIndex Row index for error reporting
+ * @return Object cast to Iterable
+ */
+ static Iterable<?> validateAndParseIcebergList(
+ String columnName, Object input, long insertRowIndex) {
+ if (!(input instanceof Iterable)) {
+ throw typeNotAllowedException(
+ columnName, input.getClass(), "LIST", new String[] {"Iterable"}, insertRowIndex);
+ }
+ return (Iterable<?>) input;
+ }
+
+ /**
+ * Validate and parse Iceberg map column to a map. Allowed Java type:
+ *
+ *
+ *
+ * @param columnName Column name, used in validation error messages
+ * @param input Object to validate and parse
+ * @param insertRowIndex Row index for error reporting
+ * @return Object cast to Map
+ */
+ static Map<?, ?> validateAndParseIcebergMap(
+ String columnName, Object input, long insertRowIndex) {
+ if (!(input instanceof Map)) {
+ throw typeNotAllowedException(
+ columnName, input.getClass(), "MAP", new String[] {"Map"}, insertRowIndex);
+ }
+ return (Map<?, ?>) input;
+ }
+
static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
BigDecimal comparand =
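
Illustrative call sites for the three validators added above (a sketch only; it assumes same-package access since the helpers are package-private, and the column names and inputs are made up):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Map;

    // Struct columns accept a Map with String keys; a non-String key raises INVALID_FORMAT_ROW.
    Map<String, Object> struct =
        DataValidationUtil.validateAndParseIcebergStruct(
            "person", Collections.singletonMap("name", "Alice"), /* insertRowIndex */ 0L);

    // List columns accept any Iterable.
    Iterable<?> tags =
        DataValidationUtil.validateAndParseIcebergList("tags", Arrays.asList("a", "b"), 0L);

    // Map columns accept any java.util.Map.
    Map<?, ?> attrs =
        DataValidationUtil.validateAndParseIcebergMap(
            "attrs", Collections.singletonMap("k", "v"), 0L);
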
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java
index 322b53acf..d199531b7 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java
@@ -33,6 +33,9 @@ class DropChannelRequestInternal implements IStreamingIngestRequest {
@JsonProperty("client_sequencer")
Long clientSequencer;
+ @JsonProperty("is_iceberg")
+ boolean isIceberg;
+
DropChannelRequestInternal(
String requestId,
String role,
@@ -40,6 +43,7 @@ class DropChannelRequestInternal implements IStreamingIngestRequest {
String schema,
String table,
String channel,
+ boolean isIceberg,
Long clientSequencer) {
this.requestId = requestId;
this.role = role;
@@ -47,6 +51,7 @@ class DropChannelRequestInternal implements IStreamingIngestRequest {
this.schema = schema;
this.table = table;
this.channel = channel;
+ this.isIceberg = isIceberg;
this.clientSequencer = clientSequencer;
}
@@ -74,6 +79,10 @@ String getSchema() {
return schema;
}
+ boolean isIceberg() {
+ return isIceberg;
+ }
+
Long getClientSequencer() {
return clientSequencer;
}
@@ -86,7 +95,7 @@ String getFullyQualifiedTableName() {
public String getStringForLogging() {
return String.format(
"DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
- + " clientSequencer=%s)",
- requestId, role, database, schema, table, channel, clientSequencer);
+ + " isIceberg=%s, clientSequencer=%s)",
+ requestId, role, database, schema, table, channel, isIceberg, clientSequencer);
}
}
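
A quick sketch of the wire-level effect of the new flag (hypothetical values; it assumes the full constructor shape implied by the hunks above, i.e. requestId, role, database, schema, table, channel, isIceberg, clientSequencer):

    DropChannelRequestInternal req =
        new DropChannelRequestInternal(
            "request-id-1", "MY_ROLE", "MY_DB", "MY_SCHEMA", "MY_TABLE", "MY_CHANNEL",
            /* isIceberg */ true, /* clientSequencer */ null);
    // Jackson serialization of this request now includes "is_iceberg":true alongside the existing
    // fields, and getStringForLogging() reports isIceberg=true as shown in the updated format.
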
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java
index 0f1c1a934..6c36c4651 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java
@@ -1,9 +1,405 @@
package net.snowflake.ingest.streaming.internal;
+import static net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse.PresignedUrlInfo;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import net.snowflake.client.core.ExecTimeTelemetryData;
+import net.snowflake.client.core.HttpClientSettingsKey;
+import net.snowflake.client.core.OCSPMode;
+import net.snowflake.client.jdbc.RestRequest;
+import net.snowflake.client.jdbc.SnowflakeSQLException;
+import net.snowflake.client.jdbc.SnowflakeUtil;
+import net.snowflake.client.jdbc.cloud.storage.SnowflakeStorageClient;
+import net.snowflake.client.jdbc.cloud.storage.StageInfo;
+import net.snowflake.client.jdbc.cloud.storage.StorageClientFactory;
+import net.snowflake.client.jdbc.internal.apache.http.client.HttpResponseException;
+import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse;
+import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPut;
+import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
+import net.snowflake.client.jdbc.internal.apache.http.entity.ByteArrayEntity;
+import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
+import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils;
+import net.snowflake.client.jdbc.internal.google.api.client.http.HttpStatusCodes;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.utils.ErrorCode;
+import net.snowflake.ingest.utils.HttpUtil;
+import net.snowflake.ingest.utils.Logging;
+import net.snowflake.ingest.utils.SFException;
+
/** Handles uploading files to the Iceberg Table's external volume's table data path */
class ExternalVolume implements IStorage {
+ // TODO everywhere: static final fields should be named in ALL_CAPS
+ private static final Logging logger = new Logging(ExternalVolume.class);
+ private static final int DEFAULT_PRESIGNED_URL_COUNT = 10;
+ private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900;
+
+ // Allowing concurrent generate-URL requests is a weak form of adapting to high-throughput
+ // clients. The low watermark ideally should be adaptive too for such clients; we'll wait for
+ // perf runs to show whether that's necessary.
+ private static final int MAX_CONCURRENT_GENERATE_URLS_REQUESTS = 10;
+ private static final int LOW_WATERMARK_FOR_EARLY_REFRESH = 5;
+
+ private final String clientName;
+ private final String clientPrefix;
+ private final Long deploymentId;
+ private final String role;
+
+ // The db name, schema name and table name for this storage location
+ private final TableRef tableRef;
+
+ // The RPC client for snowflake cloud service
+ private final SnowflakeServiceClient serviceClient;
+
+ // semaphore to limit how many RPCs go out for one location concurrently
+ private final Semaphore generateUrlsSemaphore;
+
+ // thread-safe queue of unused URLs, handed out whenever the flush codepath is cutting the next
+ // file
+ private final ConcurrentLinkedQueue<PresignedUrlInfo> presignedUrlInfos;
+
+ // sometimes-stale counter of how many URLs are remaining, to avoid calling presignedUrlInfos.size()
+ // and increasing lock contention / volatile reads on the internal data structures inside
+ // ConcurrentLinkedQueue
+ private final AtomicInteger numUrlsInQueue;
+
+ private final FileLocationInfo locationInfo;
+ private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata;
+
+ ExternalVolume(
+ String clientName,
+ String clientPrefix,
+ Long deploymentId,
+ String role,
+ TableRef tableRef,
+ FileLocationInfo locationInfo,
+ SnowflakeServiceClient serviceClient) {
+ this.clientName = clientName;
+ this.clientPrefix = clientPrefix;
+ this.deploymentId = deploymentId;
+ this.role = role;
+ this.tableRef = tableRef;
+ this.serviceClient = serviceClient;
+ this.locationInfo = locationInfo;
+ this.presignedUrlInfos = new ConcurrentLinkedQueue<>();
+ this.numUrlsInQueue = new AtomicInteger(0);
+ this.generateUrlsSemaphore = new Semaphore(MAX_CONCURRENT_GENERATE_URLS_REQUESTS);
+
+ if (this.locationInfo.getIsClientSideEncrypted()) {
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ "Cannot ingest into an external volume that requests client side encryption");
+ }
+ if ("S3".equalsIgnoreCase(this.locationInfo.getLocationType())) {
+ // add dummy values so that JDBC's S3 client creation doesn't barf on initialization.
+ this.locationInfo.getCredentials().put("AWS_KEY_ID", "key");
+ this.locationInfo.getCredentials().put("AWS_SECRET_KEY", "secret");
+ }
+
+ try {
+ this.fileTransferMetadata =
+ InternalStage.createFileTransferMetadataWithAge(this.locationInfo);
+ } catch (JsonProcessingException
+ | SnowflakeSQLException
+ | net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException e) {
+ throw new SFException(e, ErrorCode.INTERNAL_ERROR);
+ }
+
+ generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH);
+ }
+
+ // TODO: Add timing; add logging; add retries; add HTTP exception handling better than
+ // client.handleEx?
@Override
public void put(BlobPath blobPath, byte[] blob) {
- throw new RuntimeException("not implemented");
+ if (this.fileTransferMetadata.isLocalFS) {
+ InternalStage.putLocal(this.fileTransferMetadata.localLocation, blobPath.fileName, blob);
+ return;
+ }
+
+ try {
+ putRemote(blobPath.blobPath, blob);
+ } catch (Throwable e) {
+ throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
+ }
+ }
+
+ private void putRemote(String blobPath, byte[] blob)
+ throws SnowflakeSQLException, URISyntaxException, IOException {
+ // TODO: Add a backlog item for somehow doing multipart upload with presigned URLs (each part
+ // has its own URL) for large files
+
+ // client-side encryption was already verified to be disabled in the constructor
+ final Properties proxyProperties = HttpUtil.generateProxyPropertiesForJDBC();
+ final HttpClientSettingsKey key =
+ SnowflakeUtil.convertProxyPropertiesToHttpClientKey(OCSPMode.FAIL_OPEN, proxyProperties);
+
+ StageInfo stageInfo = fileTransferMetadata.fileTransferMetadata.getStageInfo();
+ SnowflakeStorageClient client =
+ StorageClientFactory.getFactory().createClient(stageInfo, 1, null, null);
+
+ URIBuilder uriBuilder = new URIBuilder(blobPath);
+ HttpPut httpRequest = new HttpPut(uriBuilder.build());
+ httpRequest.setEntity(new ByteArrayEntity(blob));
+
+ addHeadersToHttpRequest(httpRequest, blob, stageInfo, client);
+
+ if (stageInfo.getStageType().equals(StageInfo.StageType.AZURE)) {
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR, "Azure based external volumes are not yet supported.");
+
+ /* commenting out unverified code; will fix this when server changes are in preprod / some test deployment
+ URI storageEndpoint =
+ new URI(
+ "https",
+ stageInfo.getStorageAccount() + "." + stageInfo.getEndPoint() + "/",
+ null,
+ null);
+ String sasToken = blobPath.substring(blobPath.indexOf("?"));
+ StorageCredentials azCreds = new StorageCredentialsSharedAccessSignature(sasToken);
+ CloudBlobClient azClient = new CloudBlobClient(storageEndpoint, azCreds);
+
+ CloudBlobContainer container = azClient.getContainerReference(stageInfo.getLocation().substring(0, stageInfo.getLocation().indexOf("/")));
+ CloudBlockBlob azBlob = container.getBlockBlobReference();
+ azBlob.setMetadata((HashMap) meta.getUserMetadata());
+
+ OperationContext opContext = new OperationContext();
+ net.snowflake.client.core.HttpUtil.setSessionlessProxyForAzure(proxyProperties, opContext);
+
+ BlobRequestOptions options = new BlobRequestOptions();
+
+ try {
+ azBlob.uploadFromByteArray(blob, 0, blob.length, null, options, opContext);
+ } catch (Exception ex) {
+ ((SnowflakeAzureClient) client).handleStorageException(ex, 0, "upload", null, null, null);
+ }
+ */
+ }
+
+ CloseableHttpClient httpClient = net.snowflake.client.core.HttpUtil.getHttpClient(key);
+ CloseableHttpResponse response =
+ RestRequest.execute(
+ httpClient,
+ httpRequest,
+ 0, // retry timeout
+ 0, // auth timeout
+ (int)
+ net.snowflake.client.core.HttpUtil.getSocketTimeout()
+ .toMillis(), // socket timeout in ms
+ 1, // max retries
+ 0, // no socket timeout injection
+ null, // no canceling signaler, TODO: wire up thread interrupt with setting this
+ // AtomicBoolean to avoid retries/sleeps
+ false, // no cookie
+ false, // no url retry query parameters
+ false, // no request_guid
+ true, // retry on HTTP 403
+ true, // no retry
+ new ExecTimeTelemetryData());
+
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (!HttpStatusCodes.isSuccess(statusCode)) {
+ Exception ex =
+ new HttpResponseException(
+ response.getStatusLine().getStatusCode(),
+ String.format(
+ "%s, body: %s",
+ response.getStatusLine().getReasonPhrase(),
+ EntityUtils.toString(response.getEntity())));
+
+ client.handleStorageException(ex, 0, "upload", null, null, null);
+ }
+ }
+
+ private void addHeadersToHttpRequest(
+ HttpPut httpRequest, byte[] blob, StageInfo stageInfo, SnowflakeStorageClient client) {
+ // No need to set this; it causes a "Content-Length header already present" error in Apache's
+ // HTTP client.
+ // httpRequest.setHeader("Content-Length", "" + blob.length);
+
+ // TODO: These custom headers need to be a part of the presigned URL HMAC computation in S3,
+ // we'll disable them for now until we can do presigned URL generation AFTER we have the digest.
+
+ /*
+ final String clientKey = this.clientPrefix;
+ final String clientName = this.clientName;
+
+ final byte[] digestBytes;
+ try {
+ digestBytes = MessageDigest.getInstance("SHA-256").digest(blob);
+ } catch (NoSuchAlgorithmException e) {
+ throw new SFException(e, ErrorCode.INTERNAL_ERROR);
+ }
+
+ final String digest = Base64.getEncoder().encodeToString(digestBytes);
+
+ StorageObjectMetadata meta = StorageClientFactory.getFactory().createStorageMetadataObj(stageInfo.getStageType());
+
+ client.addDigestMetadata(meta, digest);
+ client.addStreamingIngestMetadata(meta, clientName, clientKey);
+
+ switch (stageInfo.getStageType()) {
+ case S3:
+ httpRequest.setHeader("x-amz-server-side-encryption", ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+ httpRequest.setHeader("x-amz-checksum-sha256", digest); // TODO why does JDBC use a custom x–amz-meta-sfc-digest header for this
+ for (Map.Entry entry : meta.getUserMetadata().entrySet()) {
+ httpRequest.setHeader("x-amz-meta-" + entry.getKey(), entry.getValue());
+ }
+
+ break;
+
+ case AZURE:
+ for (Map.Entry entry : meta.getUserMetadata().entrySet()) {
+ httpRequest.setHeader("x-ms-meta-" + entry.getKey(), entry.getValue());
+ }
+ break;
+
+ case GCS:
+ for (Map.Entry entry : meta.getUserMetadata().entrySet()) {
+ httpRequest.setHeader("x-goog-meta-" + entry.getKey(), entry.getValue());
+ }
+ break;
+ }
+ */
+ }
+
+ PresignedUrlInfo dequeueUrlInfo() {
+ PresignedUrlInfo info = this.presignedUrlInfos.poll();
+ boolean generate = false;
+ if (info == null) {
+ generate = true;
+ } else {
+ // Since the queue had a non-null entry, there is no way numUrlsInQueue is <=0
+ int remainingUrlsInQueue = this.numUrlsInQueue.decrementAndGet();
+ if (remainingUrlsInQueue <= LOW_WATERMARK_FOR_EARLY_REFRESH) {
+ generate = true;
+ // assert remainingUrlsInQueue >= 0
+ }
+ }
+ if (generate) {
+ // TODO: do this generation on a background thread to allow the current thread to make
+ // progress? Will wait for perf runs to know whether this is an issue that needs addressing.
+ generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH);
+ }
+ return info;
+ }
+
+ // NOTE: We are intentionally NOT re-enqueuing unused URLs here as that can cause correctness
+ // issues by accidentally enqueuing a URL that was actually used to write data out. It's okay to
+ // let an unused URL go to waste as we'll just go out and generate new URLs.
+ // Do NOT add an enqueueUrl() method for this reason.
+
+ private void generateUrls(int minCountToSkipGeneration) {
+ int numAcquireAttempts = 0;
+ boolean acquired = false;
+
+ while (!acquired && numAcquireAttempts++ < 300) {
+ // Use an aggressive timeout value as it's possible that the other requests finished and
+ // added enough URLs to the queue. If we use a higher timeout value, this calling thread's
+ // flush would be blocked unnecessarily when URLs have already been added to the queue.
+ try {
+ acquired = this.generateUrlsSemaphore.tryAcquire(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // if the thread was interrupted there's nothing we can do about it, definitely shouldn't
+ // continue processing.
+
+ // reset the interrupted flag on the thread in case someone in the callstack wants to
+ // gracefully continue processing.
+ boolean interrupted = Thread.interrupted();
+ String message =
+ String.format(
+ "Semaphore acquisition in ExternalVolume.generateUrls was interrupted, likely"
+ + " because the process is shutting down. TableRef=%s Thread.interrupted=%s",
+ tableRef, interrupted);
+ logger.logError(message);
+ throw new SFException(ErrorCode.INTERNAL_ERROR, message);
+ }
+
+ // If acquiring took time because no permits were available, it implies we already had N other
+ // threads fetching more URLs. In that case we should be content with what's in the buffer
+ // instead of doing another, potentially unnecessary, RPC.
+ if (this.numUrlsInQueue.get() >= minCountToSkipGeneration) {
+ // release the semaphore before early-exiting to avoid a leak in semaphore permits.
+ if (acquired) {
+ this.generateUrlsSemaphore.release();
+ }
+
+ return;
+ }
+ }
+
+ // If we're here without acquiring, numAcquireAttempts went over 300. We're at an impasse,
+ // so there's nothing more to be done except error out, which gives the client a chance to
+ // restart.
+ if (!acquired) {
+ String message =
+ String.format("Could not acquire semaphore to generate URLs. TableRef=%s", tableRef);
+ logger.logError(message);
+ throw new SFException(ErrorCode.INTERNAL_ERROR, message);
+ }
+
+ // we have acquired a semaphore permit at this point, must release before returning
+
+ try {
+ GeneratePresignedUrlsResponse response = doGenerateUrls();
+ List<PresignedUrlInfo> urlInfos = response.getPresignedUrlInfos();
+ urlInfos =
+ urlInfos.stream()
+ .filter(
+ info -> {
+ if (info == null
+ || info.url == null
+ || info.fileName == null
+ || info.url.isEmpty()) {
+ logger.logError(
+ "Received unexpected null or empty URL in externalVolume.generateUrls"
+ + " tableRef=%s",
+ this.tableRef);
+ return false;
+ }
+
+ return true;
+ })
+ .collect(Collectors.toList());
+
+ // These are both thread-safe operations individually, and there is no need to do them inside
+ // a lock. For an infinitesimal time, numUrlsInQueue will under-represent the number of entries
+ // in the queue.
+ this.presignedUrlInfos.addAll(urlInfos);
+ this.numUrlsInQueue.addAndGet(urlInfos.size());
+ } finally {
+ this.generateUrlsSemaphore.release();
+ }
+ }
+
+ private GeneratePresignedUrlsResponse doGenerateUrls() {
+ try {
+ return this.serviceClient.generatePresignedUrls(
+ new GeneratePresignedUrlsRequest(
+ tableRef,
+ role,
+ DEFAULT_PRESIGNED_URL_COUNT,
+ DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS,
+ deploymentId,
+ true));
+
+ } catch (IngestResponseException | IOException e) {
+ throw new SFException(e, ErrorCode.GENERATE_PRESIGNED_URLS_FAILURE, e.getMessage());
+ }
}
}
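
The presigned-URL pooling in dequeueUrlInfo()/generateUrls() above boils down to a lock-free queue with a low watermark plus a semaphore bounding concurrent refill RPCs. A simplified stand-alone sketch of that pattern, not the SDK's code; fetchBatch() is a hypothetical stand-in for the GENERATE_PRESIGNED_URLS call:

    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    class TokenPool {
      private static final int LOW_WATERMARK = 5;
      private final ConcurrentLinkedQueue<String> tokens = new ConcurrentLinkedQueue<>();
      private final AtomicInteger approxSize = new AtomicInteger(0);
      private final Semaphore refillPermits = new Semaphore(10);

      String dequeue() throws InterruptedException {
        String token = tokens.poll();
        // Refill when empty or when the (possibly stale) counter dips to the low watermark.
        if (token == null || approxSize.decrementAndGet() <= LOW_WATERMARK) {
          refill();
        }
        return token; // may be null; the caller retries after the refill
      }

      private void refill() throws InterruptedException {
        // Bounded concurrency: give up quickly if other threads are already refilling.
        if (!refillPermits.tryAcquire(1, TimeUnit.SECONDS)) {
          return;
        }
        try {
          if (approxSize.get() >= LOW_WATERMARK) {
            return; // another refill finished while we waited for the permit
          }
          List<String> batch = fetchBatch(10); // stand-in for the presigned-URL RPC
          tokens.addAll(batch);
          approxSize.addAndGet(batch.size());
        } finally {
          refillPermits.release();
        }
      }

      private List<String> fetchBatch(int count) {
        throw new UnsupportedOperationException("replace with the real RPC");
      }
    }
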
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
index 3c6bf3f9d..556d02b9b 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
@@ -4,6 +4,8 @@
package net.snowflake.ingest.streaming.internal;
+import static net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse.PresignedUrlInfo;
+
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -16,7 +18,6 @@
class ExternalVolumeManager implements IStorageManager {
// TODO: Rename all logger members to LOGGER and checkin code formatting rules
private static final Logging logger = new Logging(ExternalVolumeManager.class);
-
// Reference to the external volume per table
private final Map<String, ExternalVolume> externalVolumeMap;
@@ -31,6 +32,9 @@ class ExternalVolumeManager implements IStorageManager {
// Client prefix generated by the Snowflake server
private final String clientPrefix;
+ // Deployment ID returned by the Snowflake server
+ private final Long deploymentId;
+
// concurrency control to avoid creating multiple ExternalVolume objects for the same table (if
// openChannel is called
// multiple times concurrently)
@@ -54,16 +58,13 @@ class ExternalVolumeManager implements IStorageManager {
this.serviceClient = snowflakeServiceClient;
this.externalVolumeMap = new ConcurrentHashMap<>();
try {
- this.clientPrefix =
- isTestMode
- ? "testPrefix"
- : this.serviceClient
- .clientConfigure(new ClientConfigureRequest(role))
- .getClientPrefix();
+ ClientConfigureResponse response =
+ this.serviceClient.clientConfigure(new ClientConfigureRequest(role));
+ this.clientPrefix = isTestMode ? "testPrefix" : response.getClientPrefix();
+ this.deploymentId = response.getDeploymentId();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
-
logger.logDebug(
"Created ExternalVolumeManager with clientName=%s and clientPrefix=%s",
clientName, clientPrefix);
@@ -76,7 +77,7 @@ class ExternalVolumeManager implements IStorageManager {
* @return target storage
*/
@Override
- public IStorage getStorage(String fullyQualifiedTableName) {
+ public ExternalVolume getStorage(String fullyQualifiedTableName) {
// Only one chunk per blob in Iceberg mode.
return getVolumeSafe(fullyQualifiedTableName);
}
@@ -103,7 +104,15 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
}
try {
- ExternalVolume externalVolume = new ExternalVolume();
+ ExternalVolume externalVolume =
+ new ExternalVolume(
+ clientName,
+ getClientPrefix(),
+ deploymentId,
+ role,
+ tableRef,
+ locationInfo,
+ serviceClient);
this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume);
} catch (SFException ex) {
logger.logError(
@@ -113,7 +122,6 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
} catch (Exception err) {
logger.logError(
"ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err);
-
throw new SFException(
err,
ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
@@ -124,7 +132,9 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
@Override
public BlobPath generateBlobPath(String fullyQualifiedTableName) {
- throw new RuntimeException("not implemented");
+ ExternalVolume volume = getVolumeSafe(fullyQualifiedTableName);
+ PresignedUrlInfo urlInfo = volume.dequeueUrlInfo();
+ return BlobPath.presignedUrlWithToken(urlInfo.fileName, urlInfo.url);
}
/**
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
index 0ec671bf7..3a8dbc2b6 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
@@ -1,7 +1,12 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
package net.snowflake.ingest.streaming.internal;
import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;
@@ -9,6 +14,7 @@
/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
private int columnOrdinal;
+ private Integer fieldId;
private String minStrValue;
private String maxStrValue;
@@ -46,6 +52,7 @@ class FileColumnProperties {
FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
this.setColumnOrdinal(stats.getOrdinal());
+ this.setFieldId(stats.getFieldId());
this.setCollation(stats.getCollationDefinitionString());
this.setMaxIntValue(
stats.getCurrentMaxIntValue() == null
@@ -93,6 +100,16 @@ public void setColumnOrdinal(int columnOrdinal) {
this.columnOrdinal = columnOrdinal;
}
+ @JsonProperty("fieldId")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Integer getFieldId() {
+ return fieldId;
+ }
+
+ public void setFieldId(Integer fieldId) {
+ this.fieldId = fieldId;
+ }
+
// Annotation required in order to have package private fields serialized
@JsonProperty("minStrValue")
String getMinStrValue() {
@@ -206,6 +223,7 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"columnOrdinal\": ").append(columnOrdinal);
+ sb.append(", \"fieldId\": ").append(fieldId);
if (minIntValue != null) {
sb.append(", \"minIntValue\": ").append(minIntValue);
sb.append(", \"maxIntValue\": ").append(maxIntValue);
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
index 0cf8220bb..241defdfc 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/
package net.snowflake.ingest.streaming.internal;
@@ -20,15 +20,12 @@ public interface Flusher {
/**
* Serialize buffered rows into the underlying format.
*
- * @param fullyQualifiedTableName
* @param channelsDataPerTable buffered rows
* @param filePath file path
- * @param chunkStartOffset
* @return {@link SerializationResult}
* @throws IOException
*/
- SerializationResult serialize(
- List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
+ SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
throws IOException;
/** Holds result of the buffered rows conversion: channel metadata and stats. */
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsRequest.java
new file mode 100644
index 000000000..05a085ed6
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsRequest.java
@@ -0,0 +1,93 @@
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+class GeneratePresignedUrlsRequest implements IStreamingIngestRequest {
+ @JsonProperty("database")
+ private String dbName;
+
+ @JsonProperty("schema")
+ private String schemaName;
+
+ @JsonProperty("table")
+ private String tableName;
+
+ @JsonProperty("role")
+ private String role;
+
+ @JsonProperty("count")
+ private Integer count;
+
+ @JsonProperty("timeout_in_seconds")
+ private Integer timeoutInSeconds;
+
+ @JsonProperty("deployment_global_id")
+ private Long deploymentGlobalId;
+
+ @JsonProperty("is_iceberg")
+ private boolean isIceberg;
+
+ public GeneratePresignedUrlsRequest(
+ TableRef tableRef,
+ String role,
+ int count,
+ int timeoutInSeconds,
+ Long deploymentGlobalId,
+ boolean isIceberg) {
+ this.dbName = tableRef.dbName;
+ this.schemaName = tableRef.schemaName;
+ this.tableName = tableRef.tableName;
+ this.count = count;
+ this.role = role;
+ this.timeoutInSeconds = timeoutInSeconds;
+ this.deploymentGlobalId = deploymentGlobalId;
+ this.isIceberg = isIceberg;
+ }
+
+ String getDBName() {
+ return this.dbName;
+ }
+
+ String getSchemaName() {
+ return this.schemaName;
+ }
+
+ String getTableName() {
+ return this.tableName;
+ }
+
+ String getRole() {
+ return this.role;
+ }
+
+ Integer getCount() {
+ return this.count;
+ }
+
+ Long getDeploymentGlobalId() {
+ return this.deploymentGlobalId;
+ }
+
+ Integer getTimeoutInSeconds() {
+ return this.timeoutInSeconds;
+ }
+
+ boolean getIsIceberg() {
+ return this.isIceberg;
+ }
+
+ @Override
+ public String getStringForLogging() {
+ return String.format(
+ "GetPresignedUrlsRequest(db=%s, schema=%s, table=%s, count=%s, timeoutInSeconds=%s"
+ + " deploymentGlobalId=%s role=%s, isIceberg=%s)",
+ dbName,
+ schemaName,
+ tableName,
+ count,
+ timeoutInSeconds,
+ deploymentGlobalId,
+ role,
+ isIceberg);
+ }
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsResponse.java
new file mode 100644
index 000000000..32bf24104
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsResponse.java
@@ -0,0 +1,47 @@
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+class GeneratePresignedUrlsResponse extends StreamingIngestResponse {
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class PresignedUrlInfo {
+ @JsonProperty("file_name")
+ public String fileName;
+
+ @JsonProperty("url")
+ public String url;
+
+ // default constructor for jackson deserialization
+ public PresignedUrlInfo() {}
+
+ public PresignedUrlInfo(String fileName, String url) {
+ this.fileName = fileName;
+ this.url = url;
+ }
+ }
+
+ @JsonProperty("status_code")
+ private Long statusCode;
+
+ @JsonProperty("message")
+ private String message;
+
+ @JsonProperty("presigned_url_infos")
+ private List<PresignedUrlInfo> presignedUrlInfos;
+
+ @Override
+ Long getStatusCode() {
+ return this.statusCode;
+ }
+
+ String getMessage() {
+ return this.message;
+ }
+
+ List<PresignedUrlInfo> getPresignedUrlInfos() {
+ return this.presignedUrlInfos;
+ }
+}
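
For reference, the JSON shape this response binds to, per the @JsonProperty annotations above (the payload is illustrative, not a captured server reply; the snippet assumes same-package access because the class is package-private):

    import com.fasterxml.jackson.databind.ObjectMapper;

    String json =
        "{\"status_code\":0,\"message\":\"OK\",\"presigned_url_infos\":"
            + "[{\"file_name\":\"f1.parquet\",\"url\":\"https://bucket.example/put?sig=abc\"}]}";
    GeneratePresignedUrlsResponse resp =
        new ObjectMapper().readValue(json, GeneratePresignedUrlsResponse.class);
    // resp.getPresignedUrlInfos().get(0).fileName evaluates to "f1.parquet"
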
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
index 18a66f4d5..963dbf188 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
@@ -5,18 +5,28 @@
package net.snowflake.ingest.streaming.internal;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray;
+import static net.snowflake.ingest.utils.Utils.concatDotPath;
+import static net.snowflake.ingest.utils.Utils.isNullOrEmpty;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;
+import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
@@ -30,12 +40,15 @@
/** Parses a user Iceberg column value into Parquet internal representation for buffering. */
class IcebergParquetValueParser {
+ static final String THREE_LEVEL_MAP_GROUP_NAME = "key_value";
+ static final String THREE_LEVEL_LIST_GROUP_NAME = "list";
+
/**
* Parses a user column value into Parquet internal representation for buffering.
*
* @param value column value provided by user in a row
* @param type Parquet column type
- * @param stats column stats to update
+ * @param statsMap column stats map to update
* @param defaultTimezone default timezone to use for timestamp parsing
* @param insertRowsCurrIndex Row index corresponding the row to parse (w.r.t input rows in
* insertRows API, and not buffered row)
@@ -44,78 +57,116 @@ class IcebergParquetValueParser {
static ParquetBufferValue parseColumnValueToParquet(
Object value,
Type type,
- RowBufferStats stats,
+ Map<String, RowBufferStats> statsMap,
ZoneId defaultTimezone,
long insertRowsCurrIndex) {
- Utils.assertNotNull("Parquet column stats", stats);
+ Utils.assertNotNull("Parquet column stats map", statsMap);
+ return parseColumnValueToParquet(
+ value, type, statsMap, defaultTimezone, insertRowsCurrIndex, null, false);
+ }
+
+ private static ParquetBufferValue parseColumnValueToParquet(
+ Object value,
+ Type type,
+ Map<String, RowBufferStats> statsMap,
+ ZoneId defaultTimezone,
+ long insertRowsCurrIndex,
+ String path,
+ boolean isDescendantsOfRepeatingGroup) {
+ path = isNullOrEmpty(path) ? type.getName() : concatDotPath(path, type.getName());
float estimatedParquetSize = 0F;
+
+ if (type.isPrimitive()) {
+ if (!statsMap.containsKey(path)) {
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR, String.format("Stats not found for column: %s", path));
+ }
+ }
+
if (value != null) {
- estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN;
- PrimitiveType primitiveType = type.asPrimitiveType();
- switch (primitiveType.getPrimitiveTypeName()) {
- case BOOLEAN:
- int intValue =
- DataValidationUtil.validateAndParseBoolean(
- type.getName(), value, insertRowsCurrIndex);
- value = intValue > 0;
- stats.addIntValue(BigInteger.valueOf(intValue));
- estimatedParquetSize += ParquetBufferValue.BIT_ENCODING_BYTE_LEN;
- break;
- case INT32:
- int intVal = getInt32Value(value, primitiveType, insertRowsCurrIndex);
- value = intVal;
- stats.addIntValue(BigInteger.valueOf(intVal));
- estimatedParquetSize += 4;
- break;
- case INT64:
- long longVal = getInt64Value(value, primitiveType, defaultTimezone, insertRowsCurrIndex);
- value = longVal;
- stats.addIntValue(BigInteger.valueOf(longVal));
- estimatedParquetSize += 8;
- break;
- case FLOAT:
- float floatVal =
- (float)
- DataValidationUtil.validateAndParseReal(
- type.getName(), value, insertRowsCurrIndex);
- value = floatVal;
- stats.addRealValue((double) floatVal);
- estimatedParquetSize += 4;
- break;
- case DOUBLE:
- double doubleVal =
- DataValidationUtil.validateAndParseReal(type.getName(), value, insertRowsCurrIndex);
- value = doubleVal;
- stats.addRealValue(doubleVal);
- estimatedParquetSize += 8;
- break;
- case BINARY:
- byte[] byteVal = getBinaryValue(value, primitiveType, stats, insertRowsCurrIndex);
- value = byteVal;
- estimatedParquetSize +=
- ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + byteVal.length;
- break;
- case FIXED_LEN_BYTE_ARRAY:
- byte[] fixedLenByteArrayVal =
- getFixedLenByteArrayValue(value, primitiveType, stats, insertRowsCurrIndex);
- value = fixedLenByteArrayVal;
- estimatedParquetSize +=
- ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + fixedLenByteArrayVal.length;
- break;
- default:
- throw new SFException(
- ErrorCode.UNKNOWN_DATA_TYPE,
- type.getLogicalTypeAnnotation(),
- primitiveType.getPrimitiveTypeName());
+ if (type.isPrimitive()) {
+ RowBufferStats stats = statsMap.get(path);
+ estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN;
+ estimatedParquetSize +=
+ isDescendantsOfRepeatingGroup
+ ? ParquetBufferValue.REPETITION_LEVEL_ENCODING_BYTE_LEN
+ : 0;
+ PrimitiveType primitiveType = type.asPrimitiveType();
+ switch (primitiveType.getPrimitiveTypeName()) {
+ case BOOLEAN:
+ int intValue =
+ DataValidationUtil.validateAndParseBoolean(path, value, insertRowsCurrIndex);
+ value = intValue > 0;
+ stats.addIntValue(BigInteger.valueOf(intValue));
+ estimatedParquetSize += ParquetBufferValue.BIT_ENCODING_BYTE_LEN;
+ break;
+ case INT32:
+ int intVal = getInt32Value(value, primitiveType, path, insertRowsCurrIndex);
+ value = intVal;
+ stats.addIntValue(BigInteger.valueOf(intVal));
+ estimatedParquetSize += 4;
+ break;
+ case INT64:
+ long longVal =
+ getInt64Value(value, primitiveType, defaultTimezone, path, insertRowsCurrIndex);
+ value = longVal;
+ stats.addIntValue(BigInteger.valueOf(longVal));
+ estimatedParquetSize += 8;
+ break;
+ case FLOAT:
+ float floatVal =
+ (float) DataValidationUtil.validateAndParseReal(path, value, insertRowsCurrIndex);
+ value = floatVal;
+ stats.addRealValue((double) floatVal);
+ estimatedParquetSize += 4;
+ break;
+ case DOUBLE:
+ double doubleVal =
+ DataValidationUtil.validateAndParseReal(path, value, insertRowsCurrIndex);
+ value = doubleVal;
+ stats.addRealValue(doubleVal);
+ estimatedParquetSize += 8;
+ break;
+ case BINARY:
+ byte[] byteVal = getBinaryValue(value, primitiveType, stats, path, insertRowsCurrIndex);
+ value = byteVal;
+ estimatedParquetSize +=
+ ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + byteVal.length;
+ break;
+ case FIXED_LEN_BYTE_ARRAY:
+ byte[] fixedLenByteArrayVal =
+ getFixedLenByteArrayValue(value, primitiveType, stats, path, insertRowsCurrIndex);
+ value = fixedLenByteArrayVal;
+ estimatedParquetSize +=
+ ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN
+ + fixedLenByteArrayVal.length;
+ break;
+ default:
+ throw new SFException(
+ ErrorCode.UNKNOWN_DATA_TYPE,
+ type.getLogicalTypeAnnotation(),
+ primitiveType.getPrimitiveTypeName());
+ }
+ } else {
+ return getGroupValue(
+ value,
+ type.asGroupType(),
+ statsMap,
+ defaultTimezone,
+ insertRowsCurrIndex,
+ path,
+ isDescendantsOfRepeatingGroup);
}
}
if (value == null) {
if (type.isRepetition(Repetition.REQUIRED)) {
throw new SFException(
- ErrorCode.INVALID_FORMAT_ROW, type.getName(), "Passed null to non nullable field");
+ ErrorCode.INVALID_FORMAT_ROW, path, "Passed null to non nullable field");
+ }
+ if (type.isPrimitive()) {
+ statsMap.get(path).incCurrentNullCount();
}
- stats.incCurrentNullCount();
}
return new ParquetBufferValue(value, estimatedParquetSize);
@@ -126,21 +177,21 @@ static ParquetBufferValue parseColumnValueToParquet(
*
* @param value column value provided by user in a row
* @param type Parquet column type
+ * @param path column path, used for logging
* @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
* @return parsed int32 value
*/
private static int getInt32Value(
- Object value, PrimitiveType type, final long insertRowsCurrIndex) {
+ Object value, PrimitiveType type, String path, final long insertRowsCurrIndex) {
LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
if (logicalTypeAnnotation == null) {
- return DataValidationUtil.validateAndParseIcebergInt(
- type.getName(), value, insertRowsCurrIndex);
+ return DataValidationUtil.validateAndParseIcebergInt(path, value, insertRowsCurrIndex);
}
if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) {
- return getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue().intValue();
+ return getDecimalValue(value, type, path, insertRowsCurrIndex).unscaledValue().intValue();
}
if (logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) {
- return DataValidationUtil.validateAndParseDate(type.getName(), value, insertRowsCurrIndex);
+ return DataValidationUtil.validateAndParseDate(path, value, insertRowsCurrIndex);
}
throw new SFException(
ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getPrimitiveTypeName());
@@ -151,22 +202,26 @@ private static int getInt32Value(
*
* @param value column value provided by user in a row
* @param type Parquet column type
+ * @param path column path, used for logging
* @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
* @return parsed int64 value
*/
private static long getInt64Value(
- Object value, PrimitiveType type, ZoneId defaultTimezone, final long insertRowsCurrIndex) {
+ Object value,
+ PrimitiveType type,
+ ZoneId defaultTimezone,
+ String path,
+ final long insertRowsCurrIndex) {
LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
if (logicalTypeAnnotation == null) {
- return DataValidationUtil.validateAndParseIcebergLong(
- type.getName(), value, insertRowsCurrIndex);
+ return DataValidationUtil.validateAndParseIcebergLong(path, value, insertRowsCurrIndex);
}
if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) {
- return getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue().longValue();
+ return getDecimalValue(value, type, path, insertRowsCurrIndex).unscaledValue().longValue();
}
if (logicalTypeAnnotation instanceof TimeLogicalTypeAnnotation) {
return DataValidationUtil.validateAndParseTime(
- type.getName(),
+ path,
value,
timeUnitToScale(((TimeLogicalTypeAnnotation) logicalTypeAnnotation).getUnit()),
insertRowsCurrIndex)
@@ -197,29 +252,28 @@ private static long getInt64Value(
* @param value value to parse
* @param type Parquet column type
* @param stats column stats to update
+ * @param path column path, used for logging
* @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
* @return string representation
*/
private static byte[] getBinaryValue(
- Object value, PrimitiveType type, RowBufferStats stats, final long insertRowsCurrIndex) {
+ Object value,
+ PrimitiveType type,
+ RowBufferStats stats,
+ String path,
+ final long insertRowsCurrIndex) {
LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
if (logicalTypeAnnotation == null) {
byte[] bytes =
DataValidationUtil.validateAndParseBinary(
- type.getName(),
- value,
- Optional.of(Constants.BINARY_COLUMN_MAX_SIZE),
- insertRowsCurrIndex);
+ path, value, Optional.of(Constants.BINARY_COLUMN_MAX_SIZE), insertRowsCurrIndex);
stats.addBinaryValue(bytes);
return bytes;
}
if (logicalTypeAnnotation instanceof StringLogicalTypeAnnotation) {
String string =
DataValidationUtil.validateAndParseString(
- type.getName(),
- value,
- Optional.of(Constants.VARCHAR_COLUMN_MAX_SIZE),
- insertRowsCurrIndex);
+ path, value, Optional.of(Constants.VARCHAR_COLUMN_MAX_SIZE), insertRowsCurrIndex);
stats.addStrValue(string);
return string.getBytes(StandardCharsets.UTF_8);
}
@@ -233,22 +287,28 @@ private static byte[] getBinaryValue(
* @param value value to parse
* @param type Parquet column type
* @param stats column stats to update
+ * @param path column path, used for logging
* @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
* @return string representation
*/
private static byte[] getFixedLenByteArrayValue(
- Object value, PrimitiveType type, RowBufferStats stats, final long insertRowsCurrIndex) {
+ Object value,
+ PrimitiveType type,
+ RowBufferStats stats,
+ String path,
+ final long insertRowsCurrIndex) {
LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
int length = type.getTypeLength();
byte[] bytes = null;
if (logicalTypeAnnotation == null) {
bytes =
DataValidationUtil.validateAndParseBinary(
- type.getName(), value, Optional.of(length), insertRowsCurrIndex);
+ path, value, Optional.of(length), insertRowsCurrIndex);
stats.addBinaryValue(bytes);
}
if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) {
- BigInteger bigIntegerVal = getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue();
+ BigInteger bigIntegerVal =
+ getDecimalValue(value, type, path, insertRowsCurrIndex).unscaledValue();
stats.addIntValue(bigIntegerVal);
bytes = bigIntegerVal.toByteArray();
if (bytes.length < length) {
@@ -271,15 +331,16 @@ private static byte[] getFixedLenByteArrayValue(
*
* @param value value to parse
* @param type Parquet column type
+ * @param path column path, used for logging
* @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
* @return BigDecimal representation
*/
private static BigDecimal getDecimalValue(
- Object value, PrimitiveType type, final long insertRowsCurrIndex) {
+ Object value, PrimitiveType type, String path, final long insertRowsCurrIndex) {
int scale = ((DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation()).getScale();
int precision = ((DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation()).getPrecision();
BigDecimal bigDecimalValue =
- DataValidationUtil.validateAndParseBigDecimal(type.getName(), value, insertRowsCurrIndex);
+ DataValidationUtil.validateAndParseBigDecimal(path, value, insertRowsCurrIndex);
bigDecimalValue = bigDecimalValue.setScale(scale, RoundingMode.HALF_UP);
DataValidationUtil.checkValueInRange(bigDecimalValue, scale, precision, insertRowsCurrIndex);
return bigDecimalValue;
@@ -298,4 +359,169 @@ private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) {
ErrorCode.INTERNAL_ERROR, String.format("Unknown time unit: %s", timeUnit));
}
}
+
+ /**
+ * Parses a group value based on Parquet group logical type.
+ *
+ * @param value value to parse
+ * @param type Parquet column type
+ * @param statsMap column stats map to update
+ * @param defaultTimezone default timezone to use for timestamp parsing
+ * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
+ * @param path dot path of the column
+ * @param isDescendantsOfRepeatingGroup true if the column is a descendant of a repeating group,
+ * @return list of parsed values
+ */
+ private static ParquetBufferValue getGroupValue(
+ Object value,
+ GroupType type,
+ Map<String, RowBufferStats> statsMap,
+ ZoneId defaultTimezone,
+ final long insertRowsCurrIndex,
+ String path,
+ boolean isDescendantsOfRepeatingGroup) {
+ LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+ if (logicalTypeAnnotation == null) {
+ return getStructValue(
+ value,
+ type,
+ statsMap,
+ defaultTimezone,
+ insertRowsCurrIndex,
+ path,
+ isDescendantsOfRepeatingGroup);
+ }
+ if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+ return get3LevelListValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path);
+ }
+ if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+ return get3LevelMapValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path);
+ }
+ throw new SFException(
+ ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getClass().getSimpleName());
+ }
+
+ /**
+ * Parses a struct value based on Parquet group logical type. The parsed value is a list of
+ * values, where each element represents a field in the group. For example, an input {@code
+ * {"field1": 1, "field2": 2}} will be parsed as {@code [1, 2]}.
+ */
+ private static ParquetBufferValue getStructValue(
+ Object value,
+ GroupType type,
+ Map<String, RowBufferStats> statsMap,
+ ZoneId defaultTimezone,
+ final long insertRowsCurrIndex,
+ String path,
+ boolean isDescendantsOfRepeatingGroup) {
+ Map<String, Object> structVal =
+ DataValidationUtil.validateAndParseIcebergStruct(path, value, insertRowsCurrIndex);
+ Set<String> extraFields = new HashSet<>(structVal.keySet());
+ List