diff --git a/e2e-jar-test/pom.xml b/e2e-jar-test/pom.xml
index 0d36340f7..325517bba 100644
--- a/e2e-jar-test/pom.xml
+++ b/e2e-jar-test/pom.xml
@@ -29,7 +29,7 @@
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
- <version>2.2.1-SNAPSHOT</version>
+ <version>2.2.2</version>
diff --git a/pom.xml b/pom.xml
index b7763631a..243bbce34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
- <version>2.2.1-SNAPSHOT</version>
+ <version>2.2.2</version>
<packaging>jar</packaging>
<name>Snowflake Ingest SDK</name>
<description>Snowflake Ingest SDK</description>
@@ -784,8 +784,8 @@
true
+ to work around https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
+ the dependency is unused, so we ignore it here-->
org.apache.commons:commons-compress
org.apache.commons:commons-configuration2
@@ -880,9 +880,9 @@
failFast
+ The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
+ check your dependencies", verify the conditions of the license and add the reference to it here.
+ -->
Apache License 2.0
BSD 2-Clause License
@@ -1195,9 +1195,9 @@
+ The plugin executes a license-processing Python script, which copies third-party license files into the directory
+ target/generated-licenses-info/META-INF/third-party-licenses; that directory is then included in the shaded JAR.
+ -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
index 25359ea62..0c03e7e40 100644
--- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
+++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
@@ -110,7 +110,7 @@ public class RequestBuilder {
// Don't change!
public static final String CLIENT_NAME = "SnowpipeJavaSDK";
- public static final String DEFAULT_VERSION = "2.2.1-SNAPSHOT";
+ public static final String DEFAULT_VERSION = "2.2.2";
public static final String JAVA_USER_AGENT = "JAVA";
diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
index 6f0b7b764..e77279ab9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
+++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
@@ -31,6 +31,9 @@ public static class Builder {
// Indicates whether it's under test mode
private boolean isTestMode;
+ // Whether we are going to ingest into iceberg tables
+ private boolean isIceberg;
+
private Builder(String name) {
this.name = name;
}
@@ -50,6 +53,12 @@ public Builder setIsTestMode(boolean isTestMode) {
return this;
}
+ // do not make public until the feature is ready
+ Builder setIsIceberg(boolean isIceberg) {
+ this.isIceberg = isIceberg;
+ return this;
+ }
+
public SnowflakeStreamingIngestClient build() {
Utils.assertStringNotNullOrEmpty("client name", this.name);
Utils.assertNotNull("connection properties", this.prop);
@@ -58,7 +67,7 @@ public SnowflakeStreamingIngestClient build() {
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
return new SnowflakeStreamingIngestClientInternal<>(
- this.name, accountURL, prop, this.parameterOverrides, false, this.isTestMode);
+ this.name, accountURL, prop, this.parameterOverrides, this.isIceberg, this.isTestMode);
}
}
}
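
For reference, client construction through the factory is unchanged for external callers, since the new setIsIceberg setter stays package-private. A minimal usage sketch (connection property values are placeholders, not real credentials):

```java
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class ClientBuilderSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("url", "https://myaccount.snowflakecomputing.com:443"); // placeholder
    props.setProperty("user", "MY_USER"); // placeholder
    props.setProperty("private_key", "<pkcs8-private-key>"); // placeholder

    // setIsIceberg(...) is intentionally not public yet ("do not make public until the feature
    // is ready"), so the builder is used exactly as before; isIceberg defaults to false.
    try (SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {
      // open channels and insert rows here
    }
  }
}
```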
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java
new file mode 100644
index 000000000..8de78460a
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+/**
+ * Class to manage blob path strings that might have an embedded security token if it's a
+ * presigned URL.
+ */
+public class BlobPath {
+ public final String blobPath;
+ public final Boolean hasToken;
+ public final String fileName;
+
+ private BlobPath(String fileName, String blobPath, Boolean hasToken) {
+ this.blobPath = blobPath;
+ this.hasToken = hasToken;
+ this.fileName = fileName;
+ }
+
+ public static BlobPath fileNameWithoutToken(String fileName) {
+ return new BlobPath(fileName, fileName, false);
+ }
+
+ public static BlobPath presignedUrlWithToken(String fileName, String url) {
+ return new BlobPath(fileName, url, true);
+ }
+}
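
To make the intent of the two factories concrete, here is a small illustrative sketch (the file names and URL are hypothetical):

```java
import net.snowflake.ingest.streaming.internal.BlobPath;

class BlobPathSketch {
  public static void main(String[] args) {
    // Internal stage: the upload path is just the file name, no embedded token.
    BlobPath stagePath = BlobPath.fileNameWithoutToken("2024/7/15/12/30/snow_abc_1_1.parquet");
    System.out.println(stagePath.fileName + " hasToken=" + stagePath.hasToken); // hasToken=false

    // Iceberg external volume: the upload target is a presigned URL that embeds a token, while
    // fileName stays the token-free name used for blob metadata and logging.
    BlobPath evPath =
        BlobPath.presignedUrlWithToken(
            "data/snow_abc_1_1.parquet",
            "https://example-bucket.s3.amazonaws.com/data/snow_abc_1_1.parquet?X-Amz-Signature=abc");
    System.out.println(evPath.blobPath + " hasToken=" + evPath.hasToken); // hasToken=true
  }
}
```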
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
index ac05c814e..c73337d1e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -10,8 +10,6 @@
/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
- private boolean enableParquetInternalBuffering;
-
private long maxChunkSizeInBytes;
private long maxAllowedRowSizeInBytes;
@@ -23,18 +21,14 @@ public class ClientBufferParameters {
/**
* Private constructor used for test methods
*
- * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
- * enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
*/
private ClientBufferParameters(
- boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
- this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
@@ -43,10 +37,6 @@ private ClientBufferParameters(
/** @param clientInternal reference to the client object where the relevant parameters are set */
public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
- this.enableParquetInternalBuffering =
- clientInternal != null
- ? clientInternal.getParameterProvider().getEnableParquetInternalBuffering()
- : ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT;
this.maxChunkSizeInBytes =
clientInternal != null
? clientInternal.getParameterProvider().getMaxChunkSizeInBytes()
@@ -67,30 +57,22 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
}
/**
- * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
- * enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @return ClientBufferParameters object
*/
public static ClientBufferParameters test_createClientBufferParameters(
- boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
return new ClientBufferParameters(
- enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic);
}
- public boolean getEnableParquetInternalBuffering() {
- return enableParquetInternalBuffering;
- }
-
public long getMaxChunkSizeInBytes() {
return maxChunkSizeInBytes;
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java
new file mode 100644
index 000000000..0f1c1a934
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java
@@ -0,0 +1,9 @@
+package net.snowflake.ingest.streaming.internal;
+
+/** Handles uploading files to the table data path of an Iceberg table's external volume. */
+class ExternalVolume implements IStorage {
+ @Override
+ public void put(BlobPath blobPath, byte[] blob) {
+ throw new RuntimeException("not implemented");
+ }
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
index 7965c2f8b..3c6bf3f9d 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
@@ -5,45 +5,37 @@
package net.snowflake.ingest.streaming.internal;
import java.io.IOException;
-import java.util.Calendar;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
-import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.ErrorCode;
+import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
-import net.snowflake.ingest.utils.Utils;
-
-class ExternalVolumeLocation {
- public final String dbName;
- public final String schemaName;
- public final String tableName;
-
- public ExternalVolumeLocation(String dbName, String schemaName, String tableName) {
- this.dbName = dbName;
- this.schemaName = schemaName;
- this.tableName = tableName;
- }
-}
/** Class to manage multiple external volumes */
-class ExternalVolumeManager<T> implements IStorageManager<T, ExternalVolumeLocation> {
+class ExternalVolumeManager implements IStorageManager {
+ // TODO: Rename all logger members to LOGGER and checkin code formatting rules
+ private static final Logging logger = new Logging(ExternalVolumeManager.class);
+
// Reference to the external volume per table
- private final Map<String, StreamingIngestStorage<T, ExternalVolumeLocation>> externalVolumeMap;
+ private final Map<String, ExternalVolume> externalVolumeMap;
// name of the owning client
private final String clientName;
- // role of the owning client
private final String role;
// Reference to the Snowflake service client used for configure calls
- private final SnowflakeServiceClient snowflakeServiceClient;
+ private final SnowflakeServiceClient serviceClient;
// Client prefix generated by the Snowflake server
private final String clientPrefix;
+ // Concurrency control to avoid creating multiple ExternalVolume objects for the same table
+ // (if openChannel is called multiple times concurrently)
+ private final Object registerTableLock = new Object();
+
/**
* Constructor for ExternalVolumeManager
*
@@ -57,20 +49,24 @@ class ExternalVolumeManager implements IStorageManager
this.externalVolumeMap = new ConcurrentHashMap<>();
try {
this.clientPrefix =
isTestMode
? "testPrefix"
- : this.snowflakeServiceClient
+ : this.serviceClient
.clientConfigure(new ClientConfigureRequest(role))
.getClientPrefix();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
+
+ logger.logDebug(
+ "Created ExternalVolumeManager with clientName=%s and clientPrefix=%s",
+ clientName, clientPrefix);
}
/**
@@ -80,88 +76,55 @@ class ExternalVolumeManager implements IStorageManager
- public StreamingIngestStorage<T, ExternalVolumeLocation> getStorage(
- String fullyQualifiedTableName) {
+ public IStorage getStorage(String fullyQualifiedTableName) {
// Only one chunk per blob in Iceberg mode.
- StreamingIngestStorage stage =
- this.externalVolumeMap.get(fullyQualifiedTableName);
-
- if (stage == null) {
- throw new SFException(
- ErrorCode.INTERNAL_ERROR,
- String.format("No external volume found for table %s", fullyQualifiedTableName));
- }
-
- return stage;
+ return getVolumeSafe(fullyQualifiedTableName);
}
- /**
- * Add a storage to the manager by looking up the table name from the open channel response
- *
- * @param dbName the database name
- * @param schemaName the schema name
- * @param tableName the table name
- * @param fileLocationInfo response from open channel
- */
+ /** Informs the storage manager about a new table that's being ingested into by the client. */
@Override
- public void addStorage(
- String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {
- String fullyQualifiedTableName =
- Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);
-
- try {
- this.externalVolumeMap.put(
- fullyQualifiedTableName,
- new StreamingIngestStorage(
- this,
- this.clientName,
- fileLocationInfo,
- new ExternalVolumeLocation(dbName, schemaName, tableName),
- DEFAULT_MAX_UPLOAD_RETRIES));
- } catch (SnowflakeSQLException | IOException err) {
- throw new SFException(
- err,
- ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
- String.format("fullyQualifiedTableName=%s", fullyQualifiedTableName));
+ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
+ if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
+ logger.logInfo(
+ "Skip registering table since its already been registered with the VolumeManager."
+ + " tableRef=%s",
+ tableRef);
+ return;
}
- }
- /**
- * Gets the latest file location info (with a renewed short-lived access token) for the specified
- * location
- *
- * @param location A reference to the target location
- * @param fileName optional filename for single-file signed URL fetch from server
- * @return the new location information
- */
- @Override
- public FileLocationInfo getRefreshedLocation(
- ExternalVolumeLocation location, Optional fileName) {
- try {
- ChannelConfigureRequest request =
- new ChannelConfigureRequest(
- this.role, location.dbName, location.schemaName, location.tableName);
- fileName.ifPresent(request::setFileName);
- ChannelConfigureResponse response = this.snowflakeServiceClient.channelConfigure(request);
- return response.getStageLocation();
- } catch (IngestResponseException | IOException e) {
- throw new SFException(e, ErrorCode.CHANNEL_CONFIGURE_FAILURE, e.getMessage());
+ // future enhancement - per table locks instead of per-client lock
+ synchronized (registerTableLock) {
+ if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
+ logger.logInfo(
+ "Skip registering table since its already been registered with the VolumeManager."
+ + " tableRef=%s",
+ tableRef);
+ return;
+ }
+
+ try {
+ ExternalVolume externalVolume = new ExternalVolume();
+ this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume);
+ } catch (SFException ex) {
+ logger.logError(
+ "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex);
+ // allow external volume ctor's SFExceptions to bubble up directly
+ throw ex;
+ } catch (Exception err) {
+ logger.logError(
+ "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err);
+
+ throw new SFException(
+ err,
+ ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
+ String.format("fullyQualifiedTableName=%s", tableRef));
+ }
}
}
- // TODO: SNOW-1502887 Blob path generation for external volume
@Override
- public String generateBlobPath() {
- return "snow_dummy_file_name.parquet";
- }
-
- // TODO: SNOW-1502887 Blob path generation for iceberg table
- @Override
- public void decrementBlobSequencer() {}
-
- // TODO: SNOW-1502887 Blob path generation for iceberg table
- public String getBlobPath(Calendar calendar, String clientPrefix) {
- return "";
+ public BlobPath generateBlobPath(String fullyQualifiedTableName) {
+ throw new RuntimeException("not implemented");
}
/**
@@ -173,4 +136,16 @@ public String getBlobPath(Calendar calendar, String clientPrefix) {
public String getClientPrefix() {
return this.clientPrefix;
}
+
+ private ExternalVolume getVolumeSafe(String fullyQualifiedTableName) {
+ ExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName);
+
+ if (volume == null) {
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ String.format("No external volume found for tableRef=%s", fullyQualifiedTableName));
+ }
+
+ return volume;
+ }
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index cb594076f..0cd9d797d 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -32,6 +32,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
@@ -55,10 +56,6 @@
* @param <T> type of column data ({@link ParquetChunkData})
*/
class FlushService {
-
- // The max number of upload retry attempts to the stage
- private static final int DEFAULT_MAX_UPLOAD_RETRIES = 5;
-
// Static class to save the list of channels that are used to build a blob, which is mainly used
// to invalidate all the channels when there is a failure
static class BlobData {
@@ -97,7 +94,7 @@ List<List<ChannelData<T>>> getData() {
private final ChannelCache channelCache;
// Reference to the Streaming Ingest storage manager
- private final IStorageManager<T, ?> storageManager;
+ private final IStorageManager storageManager;
// Reference to register service
private final RegisterService registerService;
@@ -132,7 +129,7 @@ List<List<ChannelData<T>>> getData() {
FlushService(
SnowflakeStreamingIngestClientInternal<T> client,
ChannelCache<T> cache,
- IStorageManager<T, ?> storageManager,
+ IStorageManager storageManager,
boolean isTestMode) {
this.owningClient = client;
this.channelCache = cache;
@@ -402,7 +399,6 @@ void distributeFlushTasks(Set<String> tablesToFlush) {
while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
List<List<ChannelData<T>>> blobData = new ArrayList<>();
float totalBufferSizeInBytes = 0F;
- final String blobPath = this.storageManager.generateBlobPath();
// Distribute work at table level, split the blob if reaching the blob size limit or the
// channel has different encryption key ids
@@ -416,10 +412,9 @@ void distributeFlushTasks(Set<String> tablesToFlush) {
// Create a new blob if the current one already contains max allowed number of chunks
logger.logInfo(
"Max allowed number of chunks in the current blob reached. chunkCount={}"
- + " maxChunkCount={} currentBlobPath={}",
+ + " maxChunkCount={}",
blobData.size(),
- this.owningClient.getParameterProvider().getMaxChunksInBlob(),
- blobPath);
+ this.owningClient.getParameterProvider().getMaxChunksInBlob());
break;
} else {
ConcurrentHashMap> table =
@@ -480,85 +475,87 @@ && shouldStopProcessing(
}
}
- // Kick off a build job
if (blobData.isEmpty()) {
- // we decrement the blob sequencer so that we do not have gaps in the blob names created by
- // this client.
- this.storageManager.decrementBlobSequencer();
- } else {
- long flushStartMs = System.currentTimeMillis();
- if (this.owningClient.flushLatency != null) {
- latencyTimerContextMap.putIfAbsent(blobPath, this.owningClient.flushLatency.time());
- }
+ continue;
+ }
- // Copy encryptionKeysPerTable from owning client
- Map encryptionKeysPerTable =
- new ConcurrentHashMap<>();
- this.owningClient
- .getEncryptionKeysPerTable()
- .forEach((k, v) -> encryptionKeysPerTable.put(k, new EncryptionKey(v)));
-
- blobs.add(
- new Pair<>(
- new BlobData<>(blobPath, blobData),
- CompletableFuture.supplyAsync(
- () -> {
- try {
- // Get the fully qualified table name from the first channel in the blob.
- // This only matters when the client is in Iceberg mode. In Iceberg mode,
- // all channels in the blob belong to the same table.
- String fullyQualifiedTableName =
- blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName();
- BlobMetadata blobMetadata =
- buildAndUpload(
- blobPath,
- blobData,
- fullyQualifiedTableName,
- encryptionKeysPerTable);
- blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
- return blobMetadata;
- } catch (Throwable e) {
- Throwable ex = e.getCause() == null ? e : e.getCause();
- String errorMessage =
- String.format(
- "Building blob failed, client=%s, blob=%s, exception=%s,"
- + " detail=%s, trace=%s, all channels in the blob will be"
- + " invalidated",
- this.owningClient.getName(),
- blobPath,
- ex,
- ex.getMessage(),
- getStackTrace(ex));
- logger.logError(errorMessage);
- if (this.owningClient.getTelemetryService() != null) {
- this.owningClient
- .getTelemetryService()
- .reportClientFailure(this.getClass().getSimpleName(), errorMessage);
- }
-
- if (e instanceof IOException) {
- invalidateAllChannelsInBlob(blobData, errorMessage);
- return null;
- } else if (e instanceof NoSuchAlgorithmException) {
- throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
- } else if (e instanceof InvalidAlgorithmParameterException
- | e instanceof NoSuchPaddingException
- | e instanceof IllegalBlockSizeException
- | e instanceof BadPaddingException
- | e instanceof InvalidKeyException) {
- throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE);
- } else {
- throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
- }
- }
- },
- this.buildUploadWorkers)));
- logger.logInfo(
- "buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}",
- this.owningClient.getName(),
- blobPath,
- this.buildUploadWorkers.toString());
+ // Kick off a build job
+
+ // Get the fully qualified table name from the first channel in the blob.
+ // This only matters when the client is in Iceberg mode. In Iceberg mode,
+ // all channels in the blob belong to the same table.
+ String fullyQualifiedTableName =
+ blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName();
+
+ final BlobPath blobPath = this.storageManager.generateBlobPath(fullyQualifiedTableName);
+
+ long flushStartMs = System.currentTimeMillis();
+ if (this.owningClient.flushLatency != null) {
+ latencyTimerContextMap.putIfAbsent(
+ blobPath.fileName, this.owningClient.flushLatency.time());
}
+
+ // Copy encryptionKeysPerTable from owning client
+ Map encryptionKeysPerTable =
+ new ConcurrentHashMap<>();
+ this.owningClient
+ .getEncryptionKeysPerTable()
+ .forEach((k, v) -> encryptionKeysPerTable.put(k, new EncryptionKey(v)));
+
+ Supplier<BlobMetadata> supplier =
+ () -> {
+ try {
+ BlobMetadata blobMetadata =
+ buildAndUpload(
+ blobPath, blobData, fullyQualifiedTableName, encryptionKeysPerTable);
+ blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
+ return blobMetadata;
+ } catch (Throwable e) {
+ Throwable ex = e.getCause() == null ? e : e.getCause();
+ String errorMessage =
+ String.format(
+ "Building blob failed, client=%s, blob=%s, exception=%s,"
+ + " detail=%s, trace=%s, all channels in the blob will be"
+ + " invalidated",
+ this.owningClient.getName(),
+ blobPath.fileName,
+ ex,
+ ex.getMessage(),
+ getStackTrace(ex));
+ logger.logError(errorMessage);
+ if (this.owningClient.getTelemetryService() != null) {
+ this.owningClient
+ .getTelemetryService()
+ .reportClientFailure(this.getClass().getSimpleName(), errorMessage);
+ }
+
+ if (e instanceof IOException) {
+ invalidateAllChannelsInBlob(blobData, errorMessage);
+ return null;
+ } else if (e instanceof NoSuchAlgorithmException) {
+ throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
+ } else if (e instanceof InvalidAlgorithmParameterException
+ | e instanceof NoSuchPaddingException
+ | e instanceof IllegalBlockSizeException
+ | e instanceof BadPaddingException
+ | e instanceof InvalidKeyException) {
+ throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE);
+ } else {
+ throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+ };
+
+ blobs.add(
+ new Pair<>(
+ new BlobData<>(blobPath.fileName, blobData),
+ CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers)));
+
+ logger.logInfo(
+ "buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}",
+ this.owningClient.getName(),
+ blobPath,
+ this.buildUploadWorkers.toString());
}
// Add the flush task futures to the register service
@@ -600,7 +597,7 @@ private boolean shouldStopProcessing(
* @return BlobMetadata for FlushService.upload
*/
BlobMetadata buildAndUpload(
- String blobPath,
+ BlobPath blobPath,
List<List<ChannelData<T>>> blobData,
String fullyQualifiedTableName,
Map encryptionKeysPerTable)
@@ -612,7 +609,7 @@ BlobMetadata buildAndUpload(
// Construct the blob along with the metadata of the blob
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
- blobPath,
+ blobPath.fileName,
blobData,
bdecVersion,
encryptionKeysPerTable,
@@ -639,13 +636,13 @@ BlobMetadata buildAndUpload(
* @return BlobMetadata object used to create the register blob request
*/
BlobMetadata upload(
- StreamingIngestStorage storage,
- String blobPath,
+ IStorage storage,
+ BlobPath blobPath,
byte[] blob,
List metadata,
BlobStats blobStats)
throws NoSuchAlgorithmException {
- logger.logInfo("Start uploading blob={}, size={}", blobPath, blob.length);
+ logger.logInfo("Start uploading blob={}, size={}", blobPath.fileName, blob.length);
long startTime = System.currentTimeMillis();
Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);
@@ -661,14 +658,14 @@ BlobMetadata upload(
logger.logInfo(
"Finish uploading blob={}, size={}, timeInMillis={}",
- blobPath,
+ blobPath.fileName,
blob.length,
System.currentTimeMillis() - startTime);
// at this point we know for sure if the BDEC file has data for more than one chunk, i.e.
// spans mixed tables or not
return BlobMetadata.createBlobMetadata(
- blobPath,
+ blobPath.fileName,
BlobBuilder.computeMD5(blob),
bdecVersion,
metadata,
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java
new file mode 100644
index 000000000..3a41f38f1
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+/**
+ * Interface that represents a storage location to which we should upload data files. It is the
+ * account's internal stage for Snowflake tables, and the table's external volume for Iceberg
+ * tables.
+ */
+interface IStorage {
+ /**
+ * Writes out the byte[] to the path passed in.
+ *
+ * @param blobPath the path (possibly a presigned URL) to upload to
+ * @param blob the serialized blob bytes to upload
+ */
+ void put(BlobPath blobPath, byte[] blob);
+}
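
Because both InternalStage and ExternalVolume implement this interface, the contract is easy to picture with a trivial in-memory fake; the class below is purely illustrative (it is not part of this change) and lives in the same package only because IStorage is package-private:

```java
package net.snowflake.ingest.streaming.internal;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative in-memory IStorage: records uploads by token-free file name for assertions. */
class InMemoryStorage implements IStorage {
  final Map<String, byte[]> uploads = new ConcurrentHashMap<>();

  @Override
  public void put(BlobPath blobPath, byte[] blob) {
    // Real implementations upload to the account's internal stage (InternalStage) or to the
    // Iceberg table's external volume (ExternalVolume); here we just capture the bytes.
    uploads.put(blobPath.fileName, blob);
  }
}
```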
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
index 51f4a82de..edd92f939 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
@@ -4,15 +4,9 @@
package net.snowflake.ingest.streaming.internal;
-import java.util.Optional;
+/** Interface to manage {@link InternalStage} and {@link ExternalVolume} for {@link FlushService} */
+interface IStorageManager {
-/**
- * Interface to manage {@link StreamingIngestStorage} for {@link FlushService}
- *
- * @param <T> The type of chunk data
- * @param <TLocation> the type of location that's being managed (internal stage / external volume)
- */
-interface IStorageManager<T, TLocation> {
/** Default max upload retries for streaming ingest storage */
int DEFAULT_MAX_UPLOAD_RETRIES = 5;
@@ -22,41 +16,22 @@ interface IStorageManager {
* @param fullyQualifiedTableName the target fully qualified table name
* @return target stage
*/
- StreamingIngestStorage<T, TLocation> getStorage(String fullyQualifiedTableName);
+ // TODO: Use TableRef everywhere instead of constructing strings and passing them around
+ // everywhere
+ IStorage getStorage(String fullyQualifiedTableName);
- /**
- * Add a storage to the manager
- *
- * @param dbName the database name
- * @param schemaName the schema name
- * @param tableName the table name
- * @param fileLocationInfo file location info from configure response
- */
- void addStorage(
- String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo);
-
- /**
- * Gets the latest file location info (with a renewed short-lived access token) for the specified
- * location
- *
- * @param location A reference to the target location
- * @param fileName optional filename for single-file signed URL fetch from server
- * @return the new location information
- */
- FileLocationInfo getRefreshedLocation(TLocation location, Optional<String> fileName);
+ /** Informs the storage manager about a new table that's being ingested into by the client. */
+ void registerTable(TableRef tableRef, FileLocationInfo locationInfo);
/**
* Generate a unique blob path and increment the blob sequencer
*
+ * @param fullyQualifiedTableName The table for which the path must be generated
* @return the blob path
*/
- String generateBlobPath();
-
- /**
- * Decrement the blob sequencer, this method is needed to prevent gap between file name sequencer.
- * See {@link IStorageManager#generateBlobPath()} for more details.
- */
- void decrementBlobSequencer();
+ // TODO: Use TableRef everywhere instead of constructing strings and passing them around
+ // everywhere
+ BlobPath generateBlobPath(String fullyQualifiedTableName);
/**
* Get the unique client prefix generated by the Snowflake server
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
similarity index 78%
rename from src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java
rename to src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
index 242b5cc43..984201555 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
@@ -35,7 +35,7 @@
import net.snowflake.ingest.utils.Utils;
/** Handles uploading files to the Snowflake Streaming Ingest Storage */
-class StreamingIngestStorage<T, TLocation> {
+class InternalStage implements IStorage {
private static final ObjectMapper mapper = new ObjectMapper();
/**
@@ -57,40 +57,10 @@ class StreamingIngestStorage {
private static final Duration refreshDuration = Duration.ofMinutes(58);
private static Instant prevRefresh = Instant.EPOCH;
- private static final Logging logger = new Logging(StreamingIngestStorage.class);
-
- /**
- * Wrapper class containing SnowflakeFileTransferMetadata and the timestamp at which the metadata
- * was refreshed
- */
- static class SnowflakeFileTransferMetadataWithAge {
- SnowflakeFileTransferMetadataV1 fileTransferMetadata;
- private final boolean isLocalFS;
- private final String localLocation;
-
- /* Do not always know the age of the metadata, so we use the empty
- state to record unknown age.
- */
- Optional timestamp;
-
- SnowflakeFileTransferMetadataWithAge(
- SnowflakeFileTransferMetadataV1 fileTransferMetadata, Optional timestamp) {
- this.isLocalFS = false;
- this.fileTransferMetadata = fileTransferMetadata;
- this.timestamp = timestamp;
- this.localLocation = null;
- }
-
- SnowflakeFileTransferMetadataWithAge(String localLocation, Optional timestamp) {
- this.isLocalFS = true;
- this.localLocation = localLocation;
- this.timestamp = timestamp;
- }
- }
+ private static final Logging logger = new Logging(InternalStage.class);
private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
- private final IStorageManager owningManager;
- private final TLocation location;
+ private final InternalStageManager owningManager;
private final String clientName;
private final int maxUploadRetries;
@@ -104,23 +74,17 @@ state to record unknown age.
* @param owningManager the storage manager owning this storage
* @param clientName The client name
* @param fileLocationInfo The file location information from open channel response
- * @param location A reference to the target location
* @param maxUploadRetries The maximum number of retries to attempt
*/
- StreamingIngestStorage(
- IStorageManager owningManager,
+ InternalStage(
+ InternalStageManager owningManager,
String clientName,
FileLocationInfo fileLocationInfo,
- TLocation location,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
- this(
- owningManager,
- clientName,
- (SnowflakeFileTransferMetadataWithAge) null,
- location,
- maxUploadRetries);
- createFileTransferMetadataWithAge(fileLocationInfo);
+ this(owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, maxUploadRetries);
+ Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix());
+ this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
}
/**
@@ -129,35 +93,21 @@ state to record unknown age.
* @param owningManager the storage manager owning this storage
* @param clientName the client name
* @param testMetadata SnowflakeFileTransferMetadataWithAge to test with
- * @param location A reference to the target location
* @param maxUploadRetries the maximum number of retries to attempt
*/
- StreamingIngestStorage(
- IStorageManager owningManager,
+ InternalStage(
+ InternalStageManager owningManager,
String clientName,
SnowflakeFileTransferMetadataWithAge testMetadata,
- TLocation location,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
this.owningManager = owningManager;
this.clientName = clientName;
this.maxUploadRetries = maxUploadRetries;
this.proxyProperties = generateProxyPropertiesForJDBC();
- this.location = location;
this.fileTransferMetadataWithAge = testMetadata;
}
- /**
- * Upload file to internal stage with previously cached credentials. Will refetch and cache
- * credentials if they've expired.
- *
- * @param fullFilePath Full file name to be uploaded
- * @param data Data string to be uploaded
- */
- void putRemote(String fullFilePath, byte[] data) throws SnowflakeSQLException, IOException {
- this.putRemote(fullFilePath, data, 0);
- }
-
private void putRemote(String fullFilePath, byte[] data, int retryCount)
throws SnowflakeSQLException, IOException {
SnowflakeFileTransferMetadataV1 fileTransferMetadataCopy;
@@ -194,7 +144,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
// Proactively refresh the credential if it's going to expire, to avoid the token expiration
// error from JDBC which confuses customer
if (Instant.now().isAfter(prevRefresh.plus(refreshDuration))) {
- refreshSnowflakeMetadata();
+ refreshSnowflakeMetadata(false /* force */);
}
SnowflakeFileTransferAgent.uploadWithoutConnection(
@@ -211,7 +161,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
} catch (Exception e) {
if (retryCount == 0) {
// for the first exception, we always perform a metadata refresh.
- this.refreshSnowflakeMetadata();
+ this.refreshSnowflakeMetadata(false /* force */);
}
if (retryCount >= maxUploadRetries) {
logger.logError(
@@ -233,12 +183,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
}
}
- SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata()
- throws SnowflakeSQLException, IOException {
- logger.logInfo("Refresh Snowflake metadata, client={}", clientName);
- return refreshSnowflakeMetadata(false);
- }
-
/**
* Gets new stage credentials and other metadata from Snowflake. Synchronized to prevent multiple
* calls to putRemote from trying to refresh at the same time
@@ -250,6 +194,8 @@ SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata()
*/
synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boolean force)
throws SnowflakeSQLException, IOException {
+ logger.logInfo("Refresh Snowflake metadata, client={} force={}", clientName, force);
+
if (!force
&& fileTransferMetadataWithAge != null
&& fileTransferMetadataWithAge.timestamp.isPresent()
@@ -258,24 +204,25 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole
return fileTransferMetadataWithAge;
}
- FileLocationInfo location =
- this.owningManager.getRefreshedLocation(this.location, Optional.empty());
- return createFileTransferMetadataWithAge(location);
+ FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.empty());
+ SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location);
+ this.fileTransferMetadataWithAge = metadata;
+ return metadata;
}
- private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
+ static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
FileLocationInfo fileLocationInfo)
throws JsonProcessingException,
net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException,
SnowflakeSQLException {
- Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix());
+ final SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
if (fileLocationInfo
.getLocationType()
.replaceAll(
"^[\"]|[\"]$", "") // Replace the first and last character if they're double quotes
.equals(StageInfo.StageType.LOCAL_FS.name())) {
- this.fileTransferMetadataWithAge =
+ fileTransferMetadataWithAge =
new SnowflakeFileTransferMetadataWithAge(
fileLocationInfo
.getLocation()
@@ -284,7 +231,7 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
""), // Replace the first and last character if they're double quotes
Optional.of(System.currentTimeMillis()));
} else {
- this.fileTransferMetadataWithAge =
+ fileTransferMetadataWithAge =
new SnowflakeFileTransferMetadataWithAge(
(SnowflakeFileTransferMetadataV1)
SnowflakeFileTransferAgent.getFileTransferMetadatas(
@@ -293,8 +240,20 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
Optional.of(System.currentTimeMillis()));
}
+ /*
+ This is not used and thus commented out; note, however, that we are not copying some info from
+ fileLocationInfo over to fileTransferMetadata.
+
+ String presignedUrl = fileLocationInfo.getPresignedUrl();
+ if (presignedUrl != null) {
+ fileTransferMetadataWithAge.fileTransferMetadata.setPresignedUrl(presignedUrl);
+ String[] parts = presignedUrl.split("/");
+ fileTransferMetadataWithAge.fileTransferMetadata.setPresignedUrlFileName(parts[parts.length - 1]);
+ }
+ */
+
prevRefresh = Instant.now();
- return this.fileTransferMetadataWithAge;
+ return fileTransferMetadataWithAge;
}
/**
@@ -306,8 +265,7 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
throws SnowflakeSQLException, IOException {
- FileLocationInfo location =
- this.owningManager.getRefreshedLocation(this.location, Optional.of(fileName));
+ FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.of(fileName));
SnowflakeFileTransferMetadataV1 metadata =
(SnowflakeFileTransferMetadataV1)
@@ -318,7 +276,7 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
return metadata;
}
- private net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode
+ static net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode
parseFileLocationInfo(FileLocationInfo fileLocationInfo)
throws JsonProcessingException,
net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException {
@@ -341,18 +299,14 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
return parseConfigureResponseMapper.readTree(responseString);
}
- /**
- * Upload file to internal stage
- *
- * @param filePath
- * @param blob
- */
- void put(String filePath, byte[] blob) {
+ /** Upload file to internal stage */
+ public void put(BlobPath blobPath, byte[] blob) {
+ String filePath = blobPath.fileName;
if (this.isLocalFS()) {
- putLocal(filePath, blob);
+ putLocal(this.fileTransferMetadataWithAge.localLocation, filePath, blob);
} else {
try {
- putRemote(filePath, blob);
+ putRemote(filePath, blob, 0);
} catch (SnowflakeSQLException | IOException e) {
throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
}
@@ -370,14 +324,13 @@ boolean isLocalFS() {
* @param data
*/
@VisibleForTesting
- void putLocal(String fullFilePath, byte[] data) {
+ static void putLocal(String stageLocation, String fullFilePath, byte[] data) {
if (fullFilePath == null || fullFilePath.isEmpty() || fullFilePath.endsWith("/")) {
throw new SFException(ErrorCode.BLOB_UPLOAD_FAILURE);
}
InputStream input = new ByteArrayInputStream(data);
try {
- String stageLocation = this.fileTransferMetadataWithAge.localLocation;
File destFile = Paths.get(stageLocation, fullFilePath).toFile();
FileUtils.copyInputStreamToFile(input, destFile);
} catch (Exception ex) {
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
index d33a80738..14ca18822 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
@@ -19,14 +19,10 @@
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;
-class InternalStageLocation {
- public InternalStageLocation() {}
-}
-
/** Class to manage single Snowflake internal stage */
-class InternalStageManager<T> implements IStorageManager<T, InternalStageLocation> {
+class InternalStageManager implements IStorageManager {
/** Target stage for the client */
- private final StreamingIngestStorage targetStage;
+ private final InternalStage targetStage;
/** Increasing counter to generate a unique blob name per client */
private final AtomicLong counter;
@@ -74,21 +70,16 @@ class InternalStageManager implements IStorageManager
- new StreamingIngestStorage<>(
- this,
- clientName,
- response.getStageLocation(),
- new InternalStageLocation(),
- DEFAULT_MAX_UPLOAD_RETRIES);
+ new InternalStage(
+ this, clientName, response.getStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES);
} else {
this.clientPrefix = null;
this.deploymentId = null;
this.targetStage =
- new StreamingIngestStorage(
+ new InternalStage(
this,
"testClient",
- (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null,
- new InternalStageLocation(),
+ (SnowflakeFileTransferMetadataWithAge) null,
DEFAULT_MAX_UPLOAD_RETRIES);
}
} catch (IngestResponseException | IOException e) {
@@ -107,28 +98,26 @@ class InternalStageManager implements IStorageManager
- public StreamingIngestStorage<T, InternalStageLocation> getStorage(
- String fullyQualifiedTableName) {
+ public InternalStage getStorage(String fullyQualifiedTableName) {
// There's always only one stage for the client in non-iceberg mode
return targetStage;
}
- /** Add storage to the manager. Do nothing as there's only one stage in non-Iceberg mode. */
+ /**
+ * Informs the storage manager about a new table that's being ingested into by the client. Do
+ * nothing as there's no per-table state yet for FDN tables (that use internal stages).
+ */
@Override
- public void addStorage(
- String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {}
+ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {}
/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
*
- * @param location A reference to the target location
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
- @Override
- public FileLocationInfo getRefreshedLocation(
- InternalStageLocation location, Optional<String> fileName) {
+ FileLocationInfo getRefreshedLocation(Optional<String> fileName) {
try {
ClientConfigureRequest request = new ClientConfigureRequest(this.role);
fileName.ifPresent(request::setFileName);
@@ -157,19 +146,19 @@ public FileLocationInfo getRefreshedLocation(
* @return the generated blob file path
*/
@Override
- public String generateBlobPath() {
+ public BlobPath generateBlobPath(String fullyQualifiedTableName) {
+ // The table name argument is not used for internal stages since we don't have per-table paths.
+ // For external volumes (Iceberg), the blob path has a per-table element in it, so the other
+ // implementation of IStorageManager does end up using this argument.
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- return getBlobPath(calendar, this.clientPrefix);
- }
-
- @Override
- public void decrementBlobSequencer() {
- this.counter.decrementAndGet();
+ return BlobPath.fileNameWithoutToken(getNextFileName(calendar, this.clientPrefix));
}
/** For TESTING */
@VisibleForTesting
- public String getBlobPath(Calendar calendar, String clientPrefix) {
+ public String getNextFileName(Calendar calendar, String clientPrefix) {
if (this.isTestMode && clientPrefix == null) {
clientPrefix = "testPrefix";
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
index 9950c44aa..e3fe97dfb 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
@@ -4,37 +4,24 @@
package net.snowflake.ingest.streaming.internal;
-import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.parquet.hadoop.BdecParquetWriter;
/** Parquet data holder to buffer rows. */
public class ParquetChunkData {
// buffered rows serialized into Java objects. Needed for the Parquet w/o memory optimization.
final List<List<Object>> rows;
-
- final BdecParquetWriter parquetWriter;
- final ByteArrayOutputStream output;
final Map<String, String> metadata;
/**
* Construct parquet data chunk.
*
* @param rows buffered row data as a list
- * @param parquetWriter buffered parquet row data
- * @param output byte array file output
* @param metadata chunk metadata
*/
- public ParquetChunkData(
- List> rows,
- BdecParquetWriter parquetWriter,
- ByteArrayOutputStream output,
- Map metadata) {
+ public ParquetChunkData(List<List<Object>> rows, Map<String, String> metadata) {
this.rows = rows;
- this.parquetWriter = parquetWriter;
- this.output = output;
// create a defensive copy of the parameter map because the argument map passed here
// may currently be shared across multiple threads.
this.metadata = createDefensiveCopy(metadata);
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index d338a6a7b..ddfca4a42 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -15,7 +15,6 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
-import org.apache.parquet.hadoop.BdecParquetReader;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;
@@ -26,22 +25,16 @@
public class ParquetFlusher implements Flusher {
private static final Logging logger = new Logging(ParquetFlusher.class);
private final MessageType schema;
- private final boolean enableParquetInternalBuffering;
private final long maxChunkSizeInBytes;
private final Constants.BdecParquetCompression bdecParquetCompression;
- /**
- * Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
- * optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
- */
+ /** Construct parquet flusher from its schema. */
public ParquetFlusher(
MessageType schema,
- boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
this.schema = schema;
- this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
}
@@ -50,97 +43,9 @@ public ParquetFlusher(
public SerializationResult serialize(
List> channelsDataPerTable, String filePath)
throws IOException {
- if (enableParquetInternalBuffering) {
- return serializeFromParquetWriteBuffers(channelsDataPerTable, filePath);
- }
return serializeFromJavaObjects(channelsDataPerTable, filePath);
}
- private SerializationResult serializeFromParquetWriteBuffers(
- List> channelsDataPerTable, String filePath)
- throws IOException {
- List channelsMetadataList = new ArrayList<>();
- long rowCount = 0L;
- float chunkEstimatedUncompressedSize = 0f;
- String firstChannelFullyQualifiedTableName = null;
- Map columnEpStatsMapCombined = null;
- BdecParquetWriter mergedChannelWriter = null;
- ByteArrayOutputStream mergedChunkData = new ByteArrayOutputStream();
- Pair chunkMinMaxInsertTimeInMs = null;
-
- for (ChannelData data : channelsDataPerTable) {
- // Create channel metadata
- ChannelMetadata channelMetadata =
- ChannelMetadata.builder()
- .setOwningChannelFromContext(data.getChannelContext())
- .setRowSequencer(data.getRowSequencer())
- .setOffsetToken(data.getEndOffsetToken())
- .setStartOffsetToken(data.getStartOffsetToken())
- .build();
- // Add channel metadata to the metadata list
- channelsMetadataList.add(channelMetadata);
-
- logger.logDebug(
- "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
- data.getChannelContext().getFullyQualifiedName(),
- data.getRowCount(),
- data.getBufferSize(),
- filePath);
-
- if (mergedChannelWriter == null) {
- columnEpStatsMapCombined = data.getColumnEps();
- mergedChannelWriter = data.getVectors().parquetWriter;
- mergedChunkData = data.getVectors().output;
- firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
- chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
- } else {
- // This method assumes that channelsDataPerTable is grouped by table. We double check
- // here and throw an error if the assumption is violated
- if (!data.getChannelContext()
- .getFullyQualifiedTableName()
- .equals(firstChannelFullyQualifiedTableName)) {
- throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK);
- }
-
- columnEpStatsMapCombined =
- ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
- data.getVectors().parquetWriter.close();
- BdecParquetReader.readFileIntoWriter(
- data.getVectors().output.toByteArray(), mergedChannelWriter);
- chunkMinMaxInsertTimeInMs =
- ChannelData.getCombinedMinMaxInsertTimeInMs(
- chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
- }
-
- rowCount += data.getRowCount();
- chunkEstimatedUncompressedSize += data.getBufferSize();
-
- logger.logDebug(
- "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
- data.getChannelContext().getFullyQualifiedName(),
- data.getRowCount(),
- data.getBufferSize(),
- filePath);
- }
-
- if (mergedChannelWriter != null) {
- mergedChannelWriter.close();
- this.verifyRowCounts(
- "serializeFromParquetWriteBuffers",
- mergedChannelWriter,
- rowCount,
- channelsDataPerTable,
- -1);
- }
- return new SerializationResult(
- channelsMetadataList,
- columnEpStatsMapCombined,
- rowCount,
- chunkEstimatedUncompressedSize,
- mergedChunkData,
- chunkMinMaxInsertTimeInMs);
- }
-
private SerializationResult serializeFromJavaObjects(
List> channelsDataPerTable, String filePath)
throws IOException {
@@ -167,13 +72,11 @@ private SerializationResult serializeFromJavaObjects(
channelsMetadataList.add(channelMetadata);
logger.logDebug(
- "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={},"
- + " enableParquetMemoryOptimization={}",
+ "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
- filePath,
- enableParquetInternalBuffering);
+ filePath);
if (rows == null) {
columnEpStatsMapCombined = data.getColumnEps();
@@ -181,7 +84,7 @@ private SerializationResult serializeFromJavaObjects(
firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
} else {
- // This method assumes that channelsDataPerTable is grouped by table. We double check
+ // This method assumes that channelsDataPerTable is grouped by table. We double-check
// here and throw an error if the assumption is violated
if (!data.getChannelContext()
.getFullyQualifiedTableName()
@@ -202,13 +105,11 @@ private SerializationResult serializeFromJavaObjects(
chunkEstimatedUncompressedSize += data.getBufferSize();
logger.logDebug(
- "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={},"
- + " enableParquetMemoryOptimization={}",
+ "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
- filePath,
- enableParquetInternalBuffering);
+ filePath);
}
Map metadata = channelsDataPerTable.get(0).getVectors().metadata;
@@ -227,8 +128,7 @@ private SerializationResult serializeFromJavaObjects(
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();
- this.verifyRowCounts(
- "serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size());
+ this.verifyRowCounts(parquetWriter, rowCount, channelsDataPerTable, rows.size());
return new SerializationResult(
channelsMetadataList,
@@ -243,7 +143,6 @@ private SerializationResult serializeFromJavaObjects(
* Validates that rows count in metadata matches the row count in Parquet footer and the row count
* written by the parquet writer
*
- * @param serializationType Serialization type, used for logging purposes only
* @param writer Parquet writer writing the data
* @param channelsDataPerTable Channel data
* @param totalMetadataRowCount Row count calculated during metadata collection
@@ -251,7 +150,6 @@ private SerializationResult serializeFromJavaObjects(
* Used only for logging purposes if there is a mismatch.
*/
private void verifyRowCounts(
- String serializationType,
BdecParquetWriter writer,
long totalMetadataRowCount,
List> channelsDataPerTable,
@@ -285,7 +183,7 @@ private void verifyRowCounts(
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
- "[%s]The number of rows in Parquet does not match the number of rows in metadata. "
+ "The number of rows in Parquet does not match the number of rows in metadata. "
+ "parquetTotalRowsInFooter=%d "
+ "totalMetadataRowCount=%d "
+ "parquetTotalRowsWritten=%d "
@@ -294,7 +192,6 @@ private void verifyRowCounts(
+ "channelsCountInMetadata=%d "
+ "countOfSerializedJavaObjects=%d "
+ "channelNames=%s",
- serializationType,
parquetTotalRowsInFooter,
totalMetadataRowCount,
parquetTotalRowsWritten,
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 30851c274..5ada286e5 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -4,8 +4,6 @@
package net.snowflake.ingest.streaming.internal;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
@@ -24,7 +22,6 @@
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
-import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -43,11 +40,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer {
/* Unflushed rows as Java objects. Needed for the Parquet w/o memory optimization. */
private final List> data;
-
- /* BDEC Parquet writer. It is used to buffer unflushed data in Parquet internal buffers instead of using Java objects */
- private BdecParquetWriter bdecParquetWriter;
-
- private ByteArrayOutputStream fileOutput;
private final List> tempData;
private MessageType schema;
@@ -111,33 +103,10 @@ public void setupSchema(List columns) {
id++;
}
schema = new MessageType(PARQUET_MESSAGE_TYPE_NAME, parquetTypes);
- createFileWriter();
tempData.clear();
data.clear();
}
- /** Create BDEC file writer if Parquet memory optimization is enabled. */
- private void createFileWriter() {
- fileOutput = new ByteArrayOutputStream();
- try {
- if (clientBufferParameters.getEnableParquetInternalBuffering()) {
- bdecParquetWriter =
- new BdecParquetWriter(
- fileOutput,
- schema,
- metadata,
- channelFullyQualifiedName,
- clientBufferParameters.getMaxChunkSizeInBytes(),
- clientBufferParameters.getBdecParquetCompression());
- } else {
- this.bdecParquetWriter = null;
- }
- data.clear();
- } catch (IOException e) {
- throw new SFException(ErrorCode.INTERNAL_ERROR, "cannot create parquet writer", e);
- }
- }
-
@Override
boolean hasColumn(String name) {
return fieldIndex.containsKey(name);
@@ -154,11 +123,7 @@ float addRow(
}
void writeRow(List