From d69adfdade3ebde7e73301744f0286fe09713fb6 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Thu, 12 Sep 2024 12:08:24 -0700 Subject: [PATCH 1/3] V2.2.2 Release (#826) V2.2.2 Release --- e2e-jar-test/pom.xml | 2 +- pom.xml | 18 +++++++++--------- .../ingest/connection/RequestBuilder.java | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/e2e-jar-test/pom.xml b/e2e-jar-test/pom.xml index 0d36340f7..325517bba 100644 --- a/e2e-jar-test/pom.xml +++ b/e2e-jar-test/pom.xml @@ -29,7 +29,7 @@ net.snowflake snowflake-ingest-sdk - 2.2.1-SNAPSHOT + 2.2.2 diff --git a/pom.xml b/pom.xml index b7763631a..243bbce34 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.snowflake snowflake-ingest-sdk - 2.2.1-SNAPSHOT + 2.2.2 jar Snowflake Ingest SDK Snowflake Ingest SDK @@ -784,8 +784,8 @@ true + to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that + the dependency is unused, so we ignore it here--> org.apache.commons:commons-compress org.apache.commons:commons-configuration2 @@ -880,9 +880,9 @@ failFast + The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please + check your dependencies", verify the conditions of the license and add the reference to it here. + --> Apache License 2.0 BSD 2-Clause License @@ -1195,9 +1195,9 @@ + Plugin executes license processing Python script, which copies third party license files into the directory + target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR. + --> org.codehaus.mojo exec-maven-plugin diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java index 25359ea62..0c03e7e40 100644 --- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java +++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java @@ -110,7 +110,7 @@ public class RequestBuilder { // Don't change! public static final String CLIENT_NAME = "SnowpipeJavaSDK"; - public static final String DEFAULT_VERSION = "2.2.1-SNAPSHOT"; + public static final String DEFAULT_VERSION = "2.2.2"; public static final String JAVA_USER_AGENT = "JAVA"; From c063036af1bbb74129066a8f835374be8564248f Mon Sep 17 00:00:00 2001 From: Hitesh Madan Date: Sat, 14 Sep 2024 23:09:23 -0700 Subject: [PATCH 2/3] Iceberg Ingestion in CloudStorage mode - Carve out IStorage and ExternalVolume from InternalStage (#828) * carve out IStorage and ExternalVolume * fix formatting * early exit in some testcases, to be reenabled in the next PR when ExternalVolume is filled in * minor PR feedback, retrigger snyk merge gate too --- ...SnowflakeStreamingIngestClientFactory.java | 11 +- .../ingest/streaming/internal/BlobPath.java | 29 +++ .../streaming/internal/ExternalVolume.java | 9 + .../internal/ExternalVolumeManager.java | 165 ++++++++---------- .../streaming/internal/FlushService.java | 165 +++++++++--------- .../ingest/streaming/internal/IStorage.java | 20 +++ .../streaming/internal/IStorageManager.java | 47 ++--- ...gIngestStorage.java => InternalStage.java} | 133 +++++--------- .../internal/InternalStageManager.java | 53 +++--- .../SnowflakeFileTransferMetadataWithAge.java | 38 ++++ ...nowflakeStreamingIngestClientInternal.java | 12 +- .../ingest/streaming/internal/TableRef.java | 56 ++++++ .../streaming/internal/FlushServiceTest.java | 60 ++++--- ...torageTest.java => InternalStageTest.java} | 106 +++++------ 14 files changed, 478 insertions(+), 426 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java rename src/main/java/net/snowflake/ingest/streaming/internal/{StreamingIngestStorage.java => InternalStage.java} (78%) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeFileTransferMetadataWithAge.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/TableRef.java rename src/test/java/net/snowflake/ingest/streaming/internal/{StreamingIngestStorageTest.java => InternalStageTest.java} (91%) diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index 6f0b7b764..e77279ab9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -31,6 +31,9 @@ public static class Builder { // Indicates whether it's under test mode private boolean isTestMode; + // Whether we are going to ingest into iceberg tables + private boolean isIceberg; + private Builder(String name) { this.name = name; } @@ -50,6 +53,12 @@ public Builder setIsTestMode(boolean isTestMode) { return this; } + // do not make public until the feature is ready + Builder setIsIceberg(boolean isIceberg) { + this.isIceberg = isIceberg; + return this; + } + public SnowflakeStreamingIngestClient build() { Utils.assertStringNotNullOrEmpty("client name", this.name); Utils.assertNotNull("connection properties", this.prop); @@ -58,7 +67,7 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, accountURL, prop, this.parameterOverrides, false, this.isTestMode); + this.name, accountURL, prop, this.parameterOverrides, this.isIceberg, this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java new file mode 100644 index 000000000..8de78460a --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +/** + * Class to manage blob path strings that might have an embedded security token if its a presigned + * url + */ +public class BlobPath { + public final String blobPath; + public final Boolean hasToken; + public final String fileName; + + private BlobPath(String fileName, String blobPath, Boolean hasToken) { + this.blobPath = blobPath; + this.hasToken = hasToken; + this.fileName = fileName; + } + + public static BlobPath fileNameWithoutToken(String fileName) { + return new BlobPath(fileName, fileName, false); + } + + public static BlobPath presignedUrlWithToken(String fileName, String url) { + return new BlobPath(fileName, url, true); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java new file mode 100644 index 000000000..0f1c1a934 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java @@ -0,0 +1,9 @@ +package net.snowflake.ingest.streaming.internal; + +/** Handles uploading files to the Iceberg Table's external volume's table data path */ +class ExternalVolume implements IStorage { + @Override + public void put(BlobPath blobPath, byte[] blob) { + throw new RuntimeException("not implemented"); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java index 7965c2f8b..3c6bf3f9d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java @@ -5,45 +5,37 @@ package net.snowflake.ingest.streaming.internal; import java.io.IOException; -import java.util.Calendar; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.ingest.connection.IngestResponseException; import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; -import net.snowflake.ingest.utils.Utils; - -class ExternalVolumeLocation { - public final String dbName; - public final String schemaName; - public final String tableName; - - public ExternalVolumeLocation(String dbName, String schemaName, String tableName) { - this.dbName = dbName; - this.schemaName = schemaName; - this.tableName = tableName; - } -} /** Class to manage multiple external volumes */ -class ExternalVolumeManager implements IStorageManager { +class ExternalVolumeManager implements IStorageManager { + // TODO: Rename all logger members to LOGGER and checkin code formatting rules + private static final Logging logger = new Logging(ExternalVolumeManager.class); + // Reference to the external volume per table - private final Map> externalVolumeMap; + private final Map externalVolumeMap; // name of the owning client private final String clientName; - // role of the owning client private final String role; // Reference to the Snowflake service client used for configure calls - private final SnowflakeServiceClient snowflakeServiceClient; + private final SnowflakeServiceClient serviceClient; // Client prefix generated by the Snowflake server private final String clientPrefix; + // concurrency control to avoid creating multiple ExternalVolume objects for the same table (if + // openChannel is called + // multiple times concurrently) + private final Object registerTableLock = new Object(); + /** * Constructor for ExternalVolumeManager * @@ -57,20 +49,24 @@ class ExternalVolumeManager implements IStorageManager(); try { this.clientPrefix = isTestMode ? "testPrefix" - : this.snowflakeServiceClient + : this.serviceClient .clientConfigure(new ClientConfigureRequest(role)) .getClientPrefix(); } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); } + + logger.logDebug( + "Created ExternalVolumeManager with clientName=%s and clientPrefix=%s", + clientName, clientPrefix); } /** @@ -80,88 +76,55 @@ class ExternalVolumeManager implements IStorageManager getStorage( - String fullyQualifiedTableName) { + public IStorage getStorage(String fullyQualifiedTableName) { // Only one chunk per blob in Iceberg mode. - StreamingIngestStorage stage = - this.externalVolumeMap.get(fullyQualifiedTableName); - - if (stage == null) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, - String.format("No external volume found for table %s", fullyQualifiedTableName)); - } - - return stage; + return getVolumeSafe(fullyQualifiedTableName); } - /** - * Add a storage to the manager by looking up the table name from the open channel response - * - * @param dbName the database name - * @param schemaName the schema name - * @param tableName the table name - * @param fileLocationInfo response from open channel - */ + /** Informs the storage manager about a new table that's being ingested into by the client. */ @Override - public void addStorage( - String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) { - String fullyQualifiedTableName = - Utils.getFullyQualifiedTableName(dbName, schemaName, tableName); - - try { - this.externalVolumeMap.put( - fullyQualifiedTableName, - new StreamingIngestStorage( - this, - this.clientName, - fileLocationInfo, - new ExternalVolumeLocation(dbName, schemaName, tableName), - DEFAULT_MAX_UPLOAD_RETRIES)); - } catch (SnowflakeSQLException | IOException err) { - throw new SFException( - err, - ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, - String.format("fullyQualifiedTableName=%s", fullyQualifiedTableName)); + public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) { + if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) { + logger.logInfo( + "Skip registering table since its already been registered with the VolumeManager." + + " tableRef=%s", + tableRef); + return; } - } - /** - * Gets the latest file location info (with a renewed short-lived access token) for the specified - * location - * - * @param location A reference to the target location - * @param fileName optional filename for single-file signed URL fetch from server - * @return the new location information - */ - @Override - public FileLocationInfo getRefreshedLocation( - ExternalVolumeLocation location, Optional fileName) { - try { - ChannelConfigureRequest request = - new ChannelConfigureRequest( - this.role, location.dbName, location.schemaName, location.tableName); - fileName.ifPresent(request::setFileName); - ChannelConfigureResponse response = this.snowflakeServiceClient.channelConfigure(request); - return response.getStageLocation(); - } catch (IngestResponseException | IOException e) { - throw new SFException(e, ErrorCode.CHANNEL_CONFIGURE_FAILURE, e.getMessage()); + // future enhancement - per table locks instead of per-client lock + synchronized (registerTableLock) { + if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) { + logger.logInfo( + "Skip registering table since its already been registered with the VolumeManager." + + " tableRef=%s", + tableRef); + return; + } + + try { + ExternalVolume externalVolume = new ExternalVolume(); + this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume); + } catch (SFException ex) { + logger.logError( + "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex); + // allow external volume ctor's SFExceptions to bubble up directly + throw ex; + } catch (Exception err) { + logger.logError( + "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err); + + throw new SFException( + err, + ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, + String.format("fullyQualifiedTableName=%s", tableRef)); + } } } - // TODO: SNOW-1502887 Blob path generation for external volume @Override - public String generateBlobPath() { - return "snow_dummy_file_name.parquet"; - } - - // TODO: SNOW-1502887 Blob path generation for iceberg table - @Override - public void decrementBlobSequencer() {} - - // TODO: SNOW-1502887 Blob path generation for iceberg table - public String getBlobPath(Calendar calendar, String clientPrefix) { - return ""; + public BlobPath generateBlobPath(String fullyQualifiedTableName) { + throw new RuntimeException("not implemented"); } /** @@ -173,4 +136,16 @@ public String getBlobPath(Calendar calendar, String clientPrefix) { public String getClientPrefix() { return this.clientPrefix; } + + private ExternalVolume getVolumeSafe(String fullyQualifiedTableName) { + ExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName); + + if (volume == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("No external volume found for tableRef=%s", fullyQualifiedTableName)); + } + + return volume; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 2e7c6507c..75bbb6bd8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; @@ -55,10 +56,6 @@ * @param type of column data ({@link ParquetChunkData}) */ class FlushService { - - // The max number of upload retry attempts to the stage - private static final int DEFAULT_MAX_UPLOAD_RETRIES = 5; - // Static class to save the list of channels that are used to build a blob, which is mainly used // to invalidate all the channels when there is a failure static class BlobData { @@ -97,7 +94,7 @@ List>> getData() { private final ChannelCache channelCache; // Reference to the Streaming Ingest storage manager - private final IStorageManager storageManager; + private final IStorageManager storageManager; // Reference to register service private final RegisterService registerService; @@ -132,7 +129,7 @@ List>> getData() { FlushService( SnowflakeStreamingIngestClientInternal client, ChannelCache cache, - IStorageManager storageManager, + IStorageManager storageManager, boolean isTestMode) { this.owningClient = client; this.channelCache = cache; @@ -402,7 +399,6 @@ void distributeFlushTasks(Set tablesToFlush) { while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) { List>> blobData = new ArrayList<>(); float totalBufferSizeInBytes = 0F; - final String blobPath = this.storageManager.generateBlobPath(); // Distribute work at table level, split the blob if reaching the blob size limit or the // channel has different encryption key ids @@ -416,10 +412,9 @@ void distributeFlushTasks(Set tablesToFlush) { // Create a new blob if the current one already contains max allowed number of chunks logger.logInfo( "Max allowed number of chunks in the current blob reached. chunkCount={}" - + " maxChunkCount={} currentBlobPath={}", + + " maxChunkCount={}", blobData.size(), - this.owningClient.getParameterProvider().getMaxChunksInBlob(), - blobPath); + this.owningClient.getParameterProvider().getMaxChunksInBlob()); break; } else { ConcurrentHashMap> table = @@ -480,73 +475,79 @@ && shouldStopProcessing( } } - // Kick off a build job if (blobData.isEmpty()) { - // we decrement the blob sequencer so that we do not have gaps in the blob names created by - // this client. - this.storageManager.decrementBlobSequencer(); - } else { - long flushStartMs = System.currentTimeMillis(); - if (this.owningClient.flushLatency != null) { - latencyTimerContextMap.putIfAbsent(blobPath, this.owningClient.flushLatency.time()); - } - blobs.add( - new Pair<>( - new BlobData<>(blobPath, blobData), - CompletableFuture.supplyAsync( - () -> { - try { - // Get the fully qualified table name from the first channel in the blob. - // This only matters when the client is in Iceberg mode. In Iceberg mode, - // all channels in the blob belong to the same table. - String fullyQualifiedTableName = - blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName(); - BlobMetadata blobMetadata = - buildAndUpload(blobPath, blobData, fullyQualifiedTableName); - blobMetadata.getBlobStats().setFlushStartMs(flushStartMs); - return blobMetadata; - } catch (Throwable e) { - Throwable ex = e.getCause() == null ? e : e.getCause(); - String errorMessage = - String.format( - "Building blob failed, client=%s, blob=%s, exception=%s," - + " detail=%s, trace=%s, all channels in the blob will be" - + " invalidated", - this.owningClient.getName(), - blobPath, - ex, - ex.getMessage(), - getStackTrace(ex)); - logger.logError(errorMessage); - if (this.owningClient.getTelemetryService() != null) { - this.owningClient - .getTelemetryService() - .reportClientFailure(this.getClass().getSimpleName(), errorMessage); - } - - if (e instanceof IOException) { - invalidateAllChannelsInBlob(blobData, errorMessage); - return null; - } else if (e instanceof NoSuchAlgorithmException) { - throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE); - } else if (e instanceof InvalidAlgorithmParameterException - | e instanceof NoSuchPaddingException - | e instanceof IllegalBlockSizeException - | e instanceof BadPaddingException - | e instanceof InvalidKeyException) { - throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE); - } else { - throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()); - } - } - }, - this.buildUploadWorkers))); - logger.logInfo( - "buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}", - this.owningClient.getName(), - blobPath, - this.buildUploadWorkers.toString()); + continue; + } + + // Kick off a build job + + // Get the fully qualified table name from the first channel in the blob. + // This only matters when the client is in Iceberg mode. In Iceberg mode, + // all channels in the blob belong to the same table. + String fullyQualifiedTableName = + blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName(); + + final BlobPath blobPath = this.storageManager.generateBlobPath(fullyQualifiedTableName); + + long flushStartMs = System.currentTimeMillis(); + if (this.owningClient.flushLatency != null) { + latencyTimerContextMap.putIfAbsent( + blobPath.fileName, this.owningClient.flushLatency.time()); } + + Supplier supplier = + () -> { + try { + BlobMetadata blobMetadata = + buildAndUpload(blobPath, blobData, fullyQualifiedTableName); + blobMetadata.getBlobStats().setFlushStartMs(flushStartMs); + return blobMetadata; + } catch (Throwable e) { + Throwable ex = e.getCause() == null ? e : e.getCause(); + String errorMessage = + String.format( + "Building blob failed, client=%s, blob=%s, exception=%s," + + " detail=%s, trace=%s, all channels in the blob will be" + + " invalidated", + this.owningClient.getName(), + blobPath.fileName, + ex, + ex.getMessage(), + getStackTrace(ex)); + logger.logError(errorMessage); + if (this.owningClient.getTelemetryService() != null) { + this.owningClient + .getTelemetryService() + .reportClientFailure(this.getClass().getSimpleName(), errorMessage); + } + + if (e instanceof IOException) { + invalidateAllChannelsInBlob(blobData, errorMessage); + return null; + } else if (e instanceof NoSuchAlgorithmException) { + throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE); + } else if (e instanceof InvalidAlgorithmParameterException + | e instanceof NoSuchPaddingException + | e instanceof IllegalBlockSizeException + | e instanceof BadPaddingException + | e instanceof InvalidKeyException) { + throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE); + } else { + throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()); + } + } + }; + + blobs.add( + new Pair<>( + new BlobData<>(blobPath.fileName, blobData), + CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers))); + + logger.logInfo( + "buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}", + this.owningClient.getName(), + blobPath, + this.buildUploadWorkers.toString()); } // Add the flush task futures to the register service @@ -590,7 +591,7 @@ private boolean shouldStopProcessing( * @return BlobMetadata for FlushService.upload */ BlobMetadata buildAndUpload( - String blobPath, List>> blobData, String fullyQualifiedTableName) + BlobPath blobPath, List>> blobData, String fullyQualifiedTableName) throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException { @@ -599,7 +600,7 @@ BlobMetadata buildAndUpload( // Construct the blob along with the metadata of the blob BlobBuilder.Blob blob = BlobBuilder.constructBlobAndMetadata( - blobPath, + blobPath.fileName, blobData, bdecVersion, this.owningClient.getInternalParameterProvider().getEnableChunkEncryption()); @@ -625,13 +626,13 @@ BlobMetadata buildAndUpload( * @return BlobMetadata object used to create the register blob request */ BlobMetadata upload( - StreamingIngestStorage storage, - String blobPath, + IStorage storage, + BlobPath blobPath, byte[] blob, List metadata, BlobStats blobStats) throws NoSuchAlgorithmException { - logger.logInfo("Start uploading blob={}, size={}", blobPath, blob.length); + logger.logInfo("Start uploading blob={}, size={}", blobPath.fileName, blob.length); long startTime = System.currentTimeMillis(); Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency); @@ -647,14 +648,14 @@ BlobMetadata upload( logger.logInfo( "Finish uploading blob={}, size={}, timeInMillis={}", - blobPath, + blobPath.fileName, blob.length, System.currentTimeMillis() - startTime); // at this point we know for sure if the BDEC file has data for more than one chunk, i.e. // spans mixed tables or not return BlobMetadata.createBlobMetadata( - blobPath, + blobPath.fileName, BlobBuilder.computeMD5(blob), bdecVersion, metadata, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java new file mode 100644 index 000000000..3a41f38f1 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +/** + * Interface that represents a storage location to which we should upload data files. It is the + * account's internal stage for snowflake tables, and the table's external volume for iceberg + * tables. + */ +interface IStorage { + /** + * Writes out the byte[] to the path passed in. + * + * @param blobPath + * @param blob + */ + void put(BlobPath blobPath, byte[] blob); +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java index 51f4a82de..edd92f939 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java @@ -4,15 +4,9 @@ package net.snowflake.ingest.streaming.internal; -import java.util.Optional; +/** Interface to manage {@link InternalStage} and {@link ExternalVolume} for {@link FlushService} */ +interface IStorageManager { -/** - * Interface to manage {@link StreamingIngestStorage} for {@link FlushService} - * - * @param The type of chunk data - * @param the type of location that's being managed (internal stage / external volume) - */ -interface IStorageManager { /** Default max upload retries for streaming ingest storage */ int DEFAULT_MAX_UPLOAD_RETRIES = 5; @@ -22,41 +16,22 @@ interface IStorageManager { * @param fullyQualifiedTableName the target fully qualified table name * @return target stage */ - StreamingIngestStorage getStorage(String fullyQualifiedTableName); + // TODO: Use TableRef everywhere instead of constructing strings and passing them around + // everywhere + IStorage getStorage(String fullyQualifiedTableName); - /** - * Add a storage to the manager - * - * @param dbName the database name - * @param schemaName the schema name - * @param tableName the table name - * @param fileLocationInfo file location info from configure response - */ - void addStorage( - String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo); - - /** - * Gets the latest file location info (with a renewed short-lived access token) for the specified - * location - * - * @param location A reference to the target location - * @param fileName optional filename for single-file signed URL fetch from server - * @return the new location information - */ - FileLocationInfo getRefreshedLocation(TLocation location, Optional fileName); + /** Informs the storage manager about a new table that's being ingested into by the client. */ + void registerTable(TableRef tableRef, FileLocationInfo locationInfo); /** * Generate a unique blob path and increment the blob sequencer * + * @param fullyQualifiedTableName The table for which the path must be generated * @return the blob path */ - String generateBlobPath(); - - /** - * Decrement the blob sequencer, this method is needed to prevent gap between file name sequencer. - * See {@link IStorageManager#generateBlobPath()} for more details. - */ - void decrementBlobSequencer(); + // TODO: Use TableRef everywhere instead of constructing strings and passing them around + // everywhere + BlobPath generateBlobPath(String fullyQualifiedTableName); /** * Get the unique client prefix generated by the Snowflake server diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java similarity index 78% rename from src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java rename to src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java index 242b5cc43..984201555 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java @@ -35,7 +35,7 @@ import net.snowflake.ingest.utils.Utils; /** Handles uploading files to the Snowflake Streaming Ingest Storage */ -class StreamingIngestStorage { +class InternalStage implements IStorage { private static final ObjectMapper mapper = new ObjectMapper(); /** @@ -57,40 +57,10 @@ class StreamingIngestStorage { private static final Duration refreshDuration = Duration.ofMinutes(58); private static Instant prevRefresh = Instant.EPOCH; - private static final Logging logger = new Logging(StreamingIngestStorage.class); - - /** - * Wrapper class containing SnowflakeFileTransferMetadata and the timestamp at which the metadata - * was refreshed - */ - static class SnowflakeFileTransferMetadataWithAge { - SnowflakeFileTransferMetadataV1 fileTransferMetadata; - private final boolean isLocalFS; - private final String localLocation; - - /* Do not always know the age of the metadata, so we use the empty - state to record unknown age. - */ - Optional timestamp; - - SnowflakeFileTransferMetadataWithAge( - SnowflakeFileTransferMetadataV1 fileTransferMetadata, Optional timestamp) { - this.isLocalFS = false; - this.fileTransferMetadata = fileTransferMetadata; - this.timestamp = timestamp; - this.localLocation = null; - } - - SnowflakeFileTransferMetadataWithAge(String localLocation, Optional timestamp) { - this.isLocalFS = true; - this.localLocation = localLocation; - this.timestamp = timestamp; - } - } + private static final Logging logger = new Logging(InternalStage.class); private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; - private final IStorageManager owningManager; - private final TLocation location; + private final InternalStageManager owningManager; private final String clientName; private final int maxUploadRetries; @@ -104,23 +74,17 @@ state to record unknown age. * @param owningManager the storage manager owning this storage * @param clientName The client name * @param fileLocationInfo The file location information from open channel response - * @param location A reference to the target location * @param maxUploadRetries The maximum number of retries to attempt */ - StreamingIngestStorage( - IStorageManager owningManager, + InternalStage( + InternalStageManager owningManager, String clientName, FileLocationInfo fileLocationInfo, - TLocation location, int maxUploadRetries) throws SnowflakeSQLException, IOException { - this( - owningManager, - clientName, - (SnowflakeFileTransferMetadataWithAge) null, - location, - maxUploadRetries); - createFileTransferMetadataWithAge(fileLocationInfo); + this(owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, maxUploadRetries); + Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix()); + this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo); } /** @@ -129,35 +93,21 @@ state to record unknown age. * @param owningManager the storage manager owning this storage * @param clientName the client name * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with - * @param location A reference to the target location * @param maxUploadRetries the maximum number of retries to attempt */ - StreamingIngestStorage( - IStorageManager owningManager, + InternalStage( + InternalStageManager owningManager, String clientName, SnowflakeFileTransferMetadataWithAge testMetadata, - TLocation location, int maxUploadRetries) throws SnowflakeSQLException, IOException { this.owningManager = owningManager; this.clientName = clientName; this.maxUploadRetries = maxUploadRetries; this.proxyProperties = generateProxyPropertiesForJDBC(); - this.location = location; this.fileTransferMetadataWithAge = testMetadata; } - /** - * Upload file to internal stage with previously cached credentials. Will refetch and cache - * credentials if they've expired. - * - * @param fullFilePath Full file name to be uploaded - * @param data Data string to be uploaded - */ - void putRemote(String fullFilePath, byte[] data) throws SnowflakeSQLException, IOException { - this.putRemote(fullFilePath, data, 0); - } - private void putRemote(String fullFilePath, byte[] data, int retryCount) throws SnowflakeSQLException, IOException { SnowflakeFileTransferMetadataV1 fileTransferMetadataCopy; @@ -194,7 +144,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) // Proactively refresh the credential if it's going to expire, to avoid the token expiration // error from JDBC which confuses customer if (Instant.now().isAfter(prevRefresh.plus(refreshDuration))) { - refreshSnowflakeMetadata(); + refreshSnowflakeMetadata(false /* force */); } SnowflakeFileTransferAgent.uploadWithoutConnection( @@ -211,7 +161,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) } catch (Exception e) { if (retryCount == 0) { // for the first exception, we always perform a metadata refresh. - this.refreshSnowflakeMetadata(); + this.refreshSnowflakeMetadata(false /* force */); } if (retryCount >= maxUploadRetries) { logger.logError( @@ -233,12 +183,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) } } - SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata() - throws SnowflakeSQLException, IOException { - logger.logInfo("Refresh Snowflake metadata, client={}", clientName); - return refreshSnowflakeMetadata(false); - } - /** * Gets new stage credentials and other metadata from Snowflake. Synchronized to prevent multiple * calls to putRemote from trying to refresh at the same time @@ -250,6 +194,8 @@ SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata() */ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boolean force) throws SnowflakeSQLException, IOException { + logger.logInfo("Refresh Snowflake metadata, client={} force={}", clientName, force); + if (!force && fileTransferMetadataWithAge != null && fileTransferMetadataWithAge.timestamp.isPresent() @@ -258,24 +204,25 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole return fileTransferMetadataWithAge; } - FileLocationInfo location = - this.owningManager.getRefreshedLocation(this.location, Optional.empty()); - return createFileTransferMetadataWithAge(location); + FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.empty()); + SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location); + this.fileTransferMetadataWithAge = metadata; + return metadata; } - private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( + static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( FileLocationInfo fileLocationInfo) throws JsonProcessingException, net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException, SnowflakeSQLException { - Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix()); + final SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; if (fileLocationInfo .getLocationType() .replaceAll( "^[\"]|[\"]$", "") // Replace the first and last character if they're double quotes .equals(StageInfo.StageType.LOCAL_FS.name())) { - this.fileTransferMetadataWithAge = + fileTransferMetadataWithAge = new SnowflakeFileTransferMetadataWithAge( fileLocationInfo .getLocation() @@ -284,7 +231,7 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( ""), // Replace the first and last character if they're double quotes Optional.of(System.currentTimeMillis())); } else { - this.fileTransferMetadataWithAge = + fileTransferMetadataWithAge = new SnowflakeFileTransferMetadataWithAge( (SnowflakeFileTransferMetadataV1) SnowflakeFileTransferAgent.getFileTransferMetadatas( @@ -293,8 +240,20 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( Optional.of(System.currentTimeMillis())); } + /* + this is not used thus commented out, but technically we are not copying over some info from fileLocationInfo + to fileTransferMetadata. + + String presignedUrl = fileLocationInfo.getPresignedUrl(); + if (presignedUrl != null) { + fileTransferMetadataWithAge.fileTransferMetadata.setPresignedUrl(presignedUrl); + String[] parts = presignedUrl.split("/"); + fileTransferMetadataWithAge.fileTransferMetadata.setPresignedUrlFileName(parts[parts.length - 1]); + } + */ + prevRefresh = Instant.now(); - return this.fileTransferMetadataWithAge; + return fileTransferMetadataWithAge; } /** @@ -306,8 +265,7 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) throws SnowflakeSQLException, IOException { - FileLocationInfo location = - this.owningManager.getRefreshedLocation(this.location, Optional.of(fileName)); + FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.of(fileName)); SnowflakeFileTransferMetadataV1 metadata = (SnowflakeFileTransferMetadataV1) @@ -318,7 +276,7 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) return metadata; } - private net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode + static net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode parseFileLocationInfo(FileLocationInfo fileLocationInfo) throws JsonProcessingException, net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException { @@ -341,18 +299,14 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) return parseConfigureResponseMapper.readTree(responseString); } - /** - * Upload file to internal stage - * - * @param filePath - * @param blob - */ - void put(String filePath, byte[] blob) { + /** Upload file to internal stage */ + public void put(BlobPath blobPath, byte[] blob) { + String filePath = blobPath.fileName; if (this.isLocalFS()) { - putLocal(filePath, blob); + putLocal(this.fileTransferMetadataWithAge.localLocation, filePath, blob); } else { try { - putRemote(filePath, blob); + putRemote(filePath, blob, 0); } catch (SnowflakeSQLException | IOException e) { throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE); } @@ -370,14 +324,13 @@ boolean isLocalFS() { * @param data */ @VisibleForTesting - void putLocal(String fullFilePath, byte[] data) { + static void putLocal(String stageLocation, String fullFilePath, byte[] data) { if (fullFilePath == null || fullFilePath.isEmpty() || fullFilePath.endsWith("/")) { throw new SFException(ErrorCode.BLOB_UPLOAD_FAILURE); } InputStream input = new ByteArrayInputStream(data); try { - String stageLocation = this.fileTransferMetadataWithAge.localLocation; File destFile = Paths.get(stageLocation, fullFilePath).toFile(); FileUtils.copyInputStreamToFile(input, destFile); } catch (Exception ex) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java index d33a80738..14ca18822 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -19,14 +19,10 @@ import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; -class InternalStageLocation { - public InternalStageLocation() {} -} - /** Class to manage single Snowflake internal stage */ -class InternalStageManager implements IStorageManager { +class InternalStageManager implements IStorageManager { /** Target stage for the client */ - private final StreamingIngestStorage targetStage; + private final InternalStage targetStage; /** Increasing counter to generate a unique blob name per client */ private final AtomicLong counter; @@ -74,21 +70,16 @@ class InternalStageManager implements IStorageManager( - this, - clientName, - response.getStageLocation(), - new InternalStageLocation(), - DEFAULT_MAX_UPLOAD_RETRIES); + new InternalStage( + this, clientName, response.getStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES); } else { this.clientPrefix = null; this.deploymentId = null; this.targetStage = - new StreamingIngestStorage( + new InternalStage( this, "testClient", - (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, - new InternalStageLocation(), + (SnowflakeFileTransferMetadataWithAge) null, DEFAULT_MAX_UPLOAD_RETRIES); } } catch (IngestResponseException | IOException e) { @@ -107,28 +98,26 @@ class InternalStageManager implements IStorageManager getStorage( - String fullyQualifiedTableName) { + public InternalStage getStorage(String fullyQualifiedTableName) { // There's always only one stage for the client in non-iceberg mode return targetStage; } - /** Add storage to the manager. Do nothing as there's only one stage in non-Iceberg mode. */ + /** + * Informs the storage manager about a new table that's being ingested into by the client. Do + * nothing as there's no per-table state yet for FDN tables (that use internal stages). + */ @Override - public void addStorage( - String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {} + public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {} /** * Gets the latest file location info (with a renewed short-lived access token) for the specified * location * - * @param location A reference to the target location * @param fileName optional filename for single-file signed URL fetch from server * @return the new location information */ - @Override - public FileLocationInfo getRefreshedLocation( - InternalStageLocation location, Optional fileName) { + FileLocationInfo getRefreshedLocation(Optional fileName) { try { ClientConfigureRequest request = new ClientConfigureRequest(this.role); fileName.ifPresent(request::setFileName); @@ -157,19 +146,19 @@ public FileLocationInfo getRefreshedLocation( * @return the generated blob file path */ @Override - public String generateBlobPath() { + public BlobPath generateBlobPath(String fullyQualifiedTableName) { + // the table name argument is not going to be used in internal stages since we don't have per + // table paths. + // For external volumes (in iceberg), the blob path has a per-table element in it, thus the + // other implementation + // of IStorageManager does end up using this argument. Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - return getBlobPath(calendar, this.clientPrefix); - } - - @Override - public void decrementBlobSequencer() { - this.counter.decrementAndGet(); + return BlobPath.fileNameWithoutToken(getNextFileName(calendar, this.clientPrefix)); } /** For TESTING */ @VisibleForTesting - public String getBlobPath(Calendar calendar, String clientPrefix) { + public String getNextFileName(Calendar calendar, String clientPrefix) { if (this.isTestMode && clientPrefix == null) { clientPrefix = "testPrefix"; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeFileTransferMetadataWithAge.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeFileTransferMetadataWithAge.java new file mode 100644 index 000000000..4d9ba1af9 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeFileTransferMetadataWithAge.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.util.Optional; +import net.snowflake.client.jdbc.SnowflakeFileTransferMetadataV1; + +/** + * Wrapper class containing SnowflakeFileTransferMetadata and the timestamp at which the metadata + * was refreshed + */ +class SnowflakeFileTransferMetadataWithAge { + final SnowflakeFileTransferMetadataV1 fileTransferMetadata; + final boolean isLocalFS; + final String localLocation; + + /* Do not always know the age of the metadata, so we use the empty + state to record unknown age. + */ + Optional timestamp; + + SnowflakeFileTransferMetadataWithAge( + SnowflakeFileTransferMetadataV1 fileTransferMetadata, Optional timestamp) { + this.isLocalFS = false; + this.fileTransferMetadata = fileTransferMetadata; + this.timestamp = timestamp; + this.localLocation = null; + } + + SnowflakeFileTransferMetadataWithAge(String localLocation, Optional timestamp) { + this.isLocalFS = true; + this.fileTransferMetadata = null; + this.localLocation = localLocation; + this.timestamp = timestamp; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index bfe0c8f3c..be6b192b0 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -109,7 +109,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea private final FlushService flushService; // Reference to storage manager - private final IStorageManager storageManager; + private final IStorageManager storageManager; // Indicates whether the client has closed private volatile boolean isClosed; @@ -235,9 +235,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.storageManager = isIcebergMode - ? new ExternalVolumeManager<>( + ? new ExternalVolumeManager( isTestMode, this.role, this.name, this.snowflakeServiceClient) - : new InternalStageManager<>( + : new InternalStageManager( isTestMode, this.role, this.name, this.snowflakeServiceClient); try { @@ -379,10 +379,8 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest // Add channel to the channel cache this.channelCache.addChannel(channel); - this.storageManager.addStorage( - response.getDBName(), - response.getSchemaName(), - response.getTableName(), + this.storageManager.registerTable( + new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()), response.getExternalVolumeLocation()); return channel; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/TableRef.java b/src/main/java/net/snowflake/ingest/streaming/internal/TableRef.java new file mode 100644 index 000000000..1d15d38a7 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/TableRef.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import javax.annotation.Nonnull; +import net.snowflake.ingest.utils.Utils; + +/** + * Class to carry around the table pointer across the SDK codebase. This is being retrofitted into + * places that used to work with a fullyQualifiedTableName string. Can be used as a key in maps (has + * equals/hashcode implemented) + */ +class TableRef { + final String dbName; + final String schemaName; + final String tableName; + final String fullyQualifiedName; + + TableRef(@Nonnull String dbName, @Nonnull String schemaName, @Nonnull String tableName) { + this.dbName = dbName; + this.schemaName = schemaName; + this.tableName = tableName; + this.fullyQualifiedName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName); + } + + @Override + public int hashCode() { + return dbName.hashCode() ^ schemaName.hashCode() ^ tableName.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (this == obj) { + return true; + } + + if (TableRef.class != obj.getClass()) { + return false; + } + + final TableRef other = (TableRef) obj; + return this.dbName.equals(other.dbName) + && this.schemaName.equals(other.schemaName) + && this.tableName.equals(other.tableName); + } + + @Override + public String toString() { + return this.fullyQualifiedName; + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 303893bf0..f85cfbc56 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -95,15 +95,15 @@ private abstract static class TestContext implements AutoCloseable { ChannelCache channelCache; final Map> channels = new HashMap<>(); FlushService flushService; - IStorageManager storageManager; - StreamingIngestStorage storage; + IStorageManager storageManager; + InternalStage storage; ParameterProvider parameterProvider; RegisterService registerService; final List> channelData = new ArrayList<>(); TestContext() { - storage = Mockito.mock(StreamingIngestStorage.class); + storage = Mockito.mock(InternalStage.class); parameterProvider = new ParameterProvider(isIcebergMode); InternalParameterProvider internalParameterProvider = new InternalParameterProvider(isIcebergMode); @@ -113,8 +113,8 @@ private abstract static class TestContext implements AutoCloseable { storageManager = Mockito.spy( isIcebergMode - ? new ExternalVolumeManager<>(true, "role", "client", null) - : new InternalStageManager<>(true, "role", "client", null)); + ? new ExternalVolumeManager(true, "role", "client", null) + : new InternalStageManager(true, "role", "client", null)); Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any()); Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix"); Mockito.when(client.getParameterProvider()) @@ -140,7 +140,7 @@ ChannelData flushChannel(String name) { BlobMetadata buildAndUpload() throws Exception { List>> blobData = Collections.singletonList(channelData); return flushService.buildAndUpload( - "file_name", + BlobPath.fileNameWithoutToken("file_name"), blobData, blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName()); } @@ -434,7 +434,7 @@ public void testGetFilePath() { Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; String outputString = - ((InternalStageManager) storageManager).getBlobPath(calendar, clientPrefix); + ((InternalStageManager) storageManager).getNextFileName(calendar, clientPrefix); Path outputPath = Paths.get(outputString); Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); Assert.assertTrue( @@ -623,9 +623,11 @@ public void testBlobCreation() throws Exception { FlushService flushService = testContext.flushService; // Force = true flushes - flushService.flush(true).get(); - Mockito.verify(flushService, Mockito.atLeast(2)) - .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); + if (!isIcebergMode) { + flushService.flush(true).get(); + Mockito.verify(flushService, Mockito.atLeast(2)) + .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); + } } @Test @@ -672,10 +674,12 @@ public void testBlobSplitDueToDifferentSchema() throws Exception { FlushService flushService = testContext.flushService; - // Force = true flushes - flushService.flush(true).get(); - Mockito.verify(flushService, Mockito.atLeast(2)) - .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); + if (!isIcebergMode) { + // Force = true flushes + flushService.flush(true).get(); + Mockito.verify(flushService, Mockito.atLeast(2)) + .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); + } } @Test @@ -707,14 +711,20 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception { FlushService flushService = testContext.flushService; - // Force = true flushes - flushService.flush(true).get(); - Mockito.verify(flushService, Mockito.times(2)) - .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); + if (!isIcebergMode) { + // Force = true flushes + flushService.flush(true).get(); + Mockito.verify(flushService, Mockito.times(2)) + .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); + } } @Test public void testBlobSplitDueToNumberOfChunks() throws Exception { + if (isIcebergMode) { + return; + } + for (int rowCount : Arrays.asList(0, 1, 30, 111, 159, 287, 1287, 1599, 4496)) { runTestBlobSplitDueToNumberOfChunks(rowCount); } @@ -789,6 +799,10 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except channel3.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); channel3.insertRow(Collections.singletonMap("C1", 0), ""); + if (isIcebergMode) { + return; + } + FlushService>> flushService = testContext.flushService; flushService.flush(true).get(); @@ -913,9 +927,9 @@ public void testBuildAndUpload() throws Exception { .build(); // Check FlushService.upload called with correct arguments - final ArgumentCaptor storageCaptor = - ArgumentCaptor.forClass(StreamingIngestStorage.class); - final ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor storageCaptor = + ArgumentCaptor.forClass(InternalStage.class); + final ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(BlobPath.class); final ArgumentCaptor blobCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor> metadataCaptor = ArgumentCaptor.forClass(List.class); @@ -926,7 +940,7 @@ public void testBuildAndUpload() throws Exception { blobCaptor.capture(), metadataCaptor.capture(), ArgumentMatchers.any()); - Assert.assertEquals("file_name", nameCaptor.getValue()); + Assert.assertEquals("file_name", nameCaptor.getValue().fileName); ChunkMetadata metadataResult = metadataCaptor.getValue().get(0); List channelMetadataResult = metadataResult.getChannels(); @@ -1064,7 +1078,7 @@ public void testInvalidateChannels() { innerData.add(channel1Data); innerData.add(channel2Data); - IStorageManager storageManager = + IStorageManager storageManager = Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); FlushService flushService = new FlushService<>(client, channelCache, storageManager, false); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java similarity index 91% rename from src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java rename to src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java index d4c3f0374..e6335de0f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED; @@ -57,7 +61,7 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({TestUtils.class, HttpUtil.class, SnowflakeFileTransferAgent.class}) -public class StreamingIngestStorageTest { +public class InternalStageTest { private final String prefix = "EXAMPLE_PREFIX"; @@ -130,23 +134,22 @@ public void testPutRemote() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - IStorageManager storageManager = Mockito.mock(IStorageManager.class); + InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - StreamingIngestStorage stage = - new StreamingIngestStorage( + InternalStage stage = + new InternalStage( storageManager, "clientName", - new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( + new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), - null, 1); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); final ArgumentCaptor captor = ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class); - stage.putRemote("test/path", dataBytes); + stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes); PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class); SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture()); SnowflakeFileTransferConfig capturedConfig = captor.getValue(); @@ -173,18 +176,17 @@ public void testPutLocal() throws Exception { String fullFilePath = "testOutput"; String fileName = "putLocalOutput"; - StreamingIngestStorage stage = + InternalStage stage = Mockito.spy( - new StreamingIngestStorage( + new InternalStage( null, "clientName", - new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( + new SnowflakeFileTransferMetadataWithAge( fullFilePath, Optional.of(System.currentTimeMillis())), - null, 1)); Mockito.doReturn(true).when(stage).isLocalFS(); - stage.put(fileName, dataBytes); + stage.put(BlobPath.fileNameWithoutToken(fileName), dataBytes); Path outputPath = Paths.get(fullFilePath, fileName); List output = Files.readAllLines(outputPath); Assert.assertEquals(1, output.size()); @@ -201,16 +203,15 @@ public void doTestPutRemoteRefreshes() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - IStorageManager storageManager = Mockito.mock(IStorageManager.class); + InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - StreamingIngestStorage stage = - new StreamingIngestStorage<>( + InternalStage stage = + new InternalStage<>( storageManager, "clientName", - new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( + new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), - null, maxUploadRetryCount); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeSQLException e = @@ -222,7 +223,7 @@ public void doTestPutRemoteRefreshes() throws Exception { ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class); try { - stage.putRemote("test/path", dataBytes); + stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes); Assert.fail("Should not succeed"); } catch (SFException ex) { // Expected behavior given mocked response @@ -256,23 +257,22 @@ public void testPutRemoteGCS() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - IStorageManager storageManager = Mockito.mock(IStorageManager.class); + InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - StreamingIngestStorage stage = + InternalStage stage = Mockito.spy( - new StreamingIngestStorage<>( + new InternalStage<>( storageManager, "clientName", - new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( + new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), - null, 1)); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeFileTransferMetadataV1 metaMock = Mockito.mock(SnowflakeFileTransferMetadataV1.class); Mockito.doReturn(metaMock).when(stage).fetchSignedURL(Mockito.any()); - stage.putRemote("test/path", dataBytes); + stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes); SnowflakeFileTransferAgent.uploadWithoutConnection(Mockito.any()); Mockito.verify(stage, times(1)).fetchSignedURL("test/path"); } @@ -294,19 +294,14 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - IStorageManager storageManager = + InternalStageManager storageManager = new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); - StreamingIngestStorage stage = - new StreamingIngestStorage<>( - storageManager, - "clientName", - (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, - null, - 1); + InternalStage stage = + new InternalStage<>( + storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1); - StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge metadataWithAge = - stage.refreshSnowflakeMetadata(true); + SnowflakeFileTransferMetadataWithAge metadataWithAge = stage.refreshSnowflakeMetadata(true); final ArgumentCaptor endpointCaptor = ArgumentCaptor.forClass(String.class); final ArgumentCaptor stringCaptor = ArgumentCaptor.forClass(String.class); @@ -353,10 +348,10 @@ public void testRefreshSnowflakeMetadataDeploymentIdMismatch() throws Exception SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - IStorageManager storageManager = + InternalStageManager storageManager = new InternalStageManager<>(true, "role", "clientName", snowflakeServiceClient); - StreamingIngestStorage storage = storageManager.getStorage(""); + InternalStage storage = storageManager.getStorage(""); storage.refreshSnowflakeMetadata(true); Assert.assertEquals(prefix + "_" + deploymentId, storageManager.getClientPrefix()); @@ -385,7 +380,7 @@ public void testFetchSignedURL() throws Exception { Mockito.when(mockClientInternal.getRole()).thenReturn("role"); SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - IStorageManager storageManager = + InternalStageManager storageManager = new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -394,13 +389,9 @@ public void testFetchSignedURL() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - StreamingIngestStorage stage = - new StreamingIngestStorage( - storageManager, - "clientName", - (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, - null, - 1); + InternalStage stage = + new InternalStage( + storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1); SnowflakeFileTransferMetadataV1 metadata = stage.fetchSignedURL("path/fileName"); @@ -431,7 +422,7 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { Mockito.when(mockClientInternal.getRole()).thenReturn("role"); SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - IStorageManager storageManager = + InternalStageManager storageManager = new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -440,13 +431,9 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - StreamingIngestStorage stage = - new StreamingIngestStorage<>( - storageManager, - "clientName", - (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, - null, - 1); + InternalStage stage = + new InternalStage<>( + storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1); ThreadFactory buildUploadThreadFactory = new ThreadFactoryBuilder().setNameFormat("ingest-build-upload-thread-%d").build(); @@ -457,7 +444,7 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { workers.submit( () -> { try { - stage.refreshSnowflakeMetadata(); + stage.refreshSnowflakeMetadata(false); } catch (Exception e) { throw new RuntimeException(e); } @@ -465,7 +452,7 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { workers.submit( () -> { try { - stage.refreshSnowflakeMetadata(); + stage.refreshSnowflakeMetadata(false); } catch (Exception e) { throw new RuntimeException(e); } @@ -574,16 +561,15 @@ public void testRefreshMetadataOnFirstPutException() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - IStorageManager storageManager = Mockito.mock(IStorageManager.class); + InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - StreamingIngestStorage stage = - new StreamingIngestStorage<>( + InternalStage stage = + new InternalStage<>( storageManager, "clientName", - new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( + new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), - null, maxUploadRetryCount); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeSQLException e = @@ -607,7 +593,7 @@ public Object answer(org.mockito.invocation.InvocationOnMock invocation) final ArgumentCaptor captor = ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class); - stage.putRemote("test/path", dataBytes); + stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes); PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class, times(maxUploadRetryCount)); SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture()); From 3cf3eb50ab2352dcdedd9f2fe8b110544295b7e0 Mon Sep 17 00:00:00 2001 From: Gloria Doci Date: Mon, 16 Sep 2024 15:05:09 +0200 Subject: [PATCH 3/3] SNOW-1659373 cleanup serializeFromParquetBuffers (#829) SNOW-1659373 remove serializeFromParquetBuffers --- .../internal/ClientBufferParameters.java | 18 --- .../streaming/internal/ParquetChunkData.java | 15 +-- .../streaming/internal/ParquetFlusher.java | 119 ++---------------- .../streaming/internal/ParquetRowBuffer.java | 55 +------- .../ingest/utils/ParameterProvider.java | 21 ---- .../streaming/internal/BlobBuilderTest.java | 46 +------ .../streaming/internal/RowBufferTest.java | 7 -- 7 files changed, 19 insertions(+), 262 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index ac05c814e..c73337d1e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -10,8 +10,6 @@ /** Channel's buffer relevant parameters that are set at the owning client level. */ public class ClientBufferParameters { - private boolean enableParquetInternalBuffering; - private long maxChunkSizeInBytes; private long maxAllowedRowSizeInBytes; @@ -23,18 +21,14 @@ public class ClientBufferParameters { /** * Private constructor used for test methods * - * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is - * enabled * @param maxChunkSizeInBytes maximum chunk size in bytes * @param maxAllowedRowSizeInBytes maximum row size in bytes */ private ClientBufferParameters( - boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression, boolean enableNewJsonParsingLogic) { - this.enableParquetInternalBuffering = enableParquetInternalBuffering; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; @@ -43,10 +37,6 @@ private ClientBufferParameters( /** @param clientInternal reference to the client object where the relevant parameters are set */ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) { - this.enableParquetInternalBuffering = - clientInternal != null - ? clientInternal.getParameterProvider().getEnableParquetInternalBuffering() - : ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT; this.maxChunkSizeInBytes = clientInternal != null ? clientInternal.getParameterProvider().getMaxChunkSizeInBytes() @@ -67,30 +57,22 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter } /** - * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is - * enabled * @param maxChunkSizeInBytes maximum chunk size in bytes * @param maxAllowedRowSizeInBytes maximum row size in bytes * @return ClientBufferParameters object */ public static ClientBufferParameters test_createClientBufferParameters( - boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression, boolean enableNewJsonParsingLogic) { return new ClientBufferParameters( - enableParquetInternalBuffering, maxChunkSizeInBytes, maxAllowedRowSizeInBytes, bdecParquetCompression, enableNewJsonParsingLogic); } - public boolean getEnableParquetInternalBuffering() { - return enableParquetInternalBuffering; - } - public long getMaxChunkSizeInBytes() { return maxChunkSizeInBytes; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java index 9950c44aa..e3fe97dfb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java @@ -4,37 +4,24 @@ package net.snowflake.ingest.streaming.internal; -import java.io.ByteArrayOutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.parquet.hadoop.BdecParquetWriter; /** Parquet data holder to buffer rows. */ public class ParquetChunkData { // buffered rows serialized into Java objects. Needed for the Parquet w/o memory optimization. final List> rows; - - final BdecParquetWriter parquetWriter; - final ByteArrayOutputStream output; final Map metadata; /** * Construct parquet data chunk. * * @param rows buffered row data as a list - * @param parquetWriter buffered parquet row data - * @param output byte array file output * @param metadata chunk metadata */ - public ParquetChunkData( - List> rows, - BdecParquetWriter parquetWriter, - ByteArrayOutputStream output, - Map metadata) { + public ParquetChunkData(List> rows, Map metadata) { this.rows = rows; - this.parquetWriter = parquetWriter; - this.output = output; // create a defensive copy of the parameter map because the argument map passed here // may currently be shared across multiple threads. this.metadata = createDefensiveCopy(metadata); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index d338a6a7b..ddfca4a42 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -15,7 +15,6 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; -import org.apache.parquet.hadoop.BdecParquetReader; import org.apache.parquet.hadoop.BdecParquetWriter; import org.apache.parquet.schema.MessageType; @@ -26,22 +25,16 @@ public class ParquetFlusher implements Flusher { private static final Logging logger = new Logging(ParquetFlusher.class); private final MessageType schema; - private final boolean enableParquetInternalBuffering; private final long maxChunkSizeInBytes; private final Constants.BdecParquetCompression bdecParquetCompression; - /** - * Construct parquet flusher from its schema and set flag that indicates whether Parquet memory - * optimization is enabled, i.e. rows will be buffered in internal Parquet buffer. - */ + /** Construct parquet flusher from its schema. */ public ParquetFlusher( MessageType schema, - boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression) { this.schema = schema; - this.enableParquetInternalBuffering = enableParquetInternalBuffering; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; } @@ -50,97 +43,9 @@ public ParquetFlusher( public SerializationResult serialize( List> channelsDataPerTable, String filePath) throws IOException { - if (enableParquetInternalBuffering) { - return serializeFromParquetWriteBuffers(channelsDataPerTable, filePath); - } return serializeFromJavaObjects(channelsDataPerTable, filePath); } - private SerializationResult serializeFromParquetWriteBuffers( - List> channelsDataPerTable, String filePath) - throws IOException { - List channelsMetadataList = new ArrayList<>(); - long rowCount = 0L; - float chunkEstimatedUncompressedSize = 0f; - String firstChannelFullyQualifiedTableName = null; - Map columnEpStatsMapCombined = null; - BdecParquetWriter mergedChannelWriter = null; - ByteArrayOutputStream mergedChunkData = new ByteArrayOutputStream(); - Pair chunkMinMaxInsertTimeInMs = null; - - for (ChannelData data : channelsDataPerTable) { - // Create channel metadata - ChannelMetadata channelMetadata = - ChannelMetadata.builder() - .setOwningChannelFromContext(data.getChannelContext()) - .setRowSequencer(data.getRowSequencer()) - .setOffsetToken(data.getEndOffsetToken()) - .setStartOffsetToken(data.getStartOffsetToken()) - .build(); - // Add channel metadata to the metadata list - channelsMetadataList.add(channelMetadata); - - logger.logDebug( - "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}", - data.getChannelContext().getFullyQualifiedName(), - data.getRowCount(), - data.getBufferSize(), - filePath); - - if (mergedChannelWriter == null) { - columnEpStatsMapCombined = data.getColumnEps(); - mergedChannelWriter = data.getVectors().parquetWriter; - mergedChunkData = data.getVectors().output; - firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName(); - chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs(); - } else { - // This method assumes that channelsDataPerTable is grouped by table. We double check - // here and throw an error if the assumption is violated - if (!data.getChannelContext() - .getFullyQualifiedTableName() - .equals(firstChannelFullyQualifiedTableName)) { - throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK); - } - - columnEpStatsMapCombined = - ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps()); - data.getVectors().parquetWriter.close(); - BdecParquetReader.readFileIntoWriter( - data.getVectors().output.toByteArray(), mergedChannelWriter); - chunkMinMaxInsertTimeInMs = - ChannelData.getCombinedMinMaxInsertTimeInMs( - chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs()); - } - - rowCount += data.getRowCount(); - chunkEstimatedUncompressedSize += data.getBufferSize(); - - logger.logDebug( - "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}", - data.getChannelContext().getFullyQualifiedName(), - data.getRowCount(), - data.getBufferSize(), - filePath); - } - - if (mergedChannelWriter != null) { - mergedChannelWriter.close(); - this.verifyRowCounts( - "serializeFromParquetWriteBuffers", - mergedChannelWriter, - rowCount, - channelsDataPerTable, - -1); - } - return new SerializationResult( - channelsMetadataList, - columnEpStatsMapCombined, - rowCount, - chunkEstimatedUncompressedSize, - mergedChunkData, - chunkMinMaxInsertTimeInMs); - } - private SerializationResult serializeFromJavaObjects( List> channelsDataPerTable, String filePath) throws IOException { @@ -167,13 +72,11 @@ private SerializationResult serializeFromJavaObjects( channelsMetadataList.add(channelMetadata); logger.logDebug( - "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}," - + " enableParquetMemoryOptimization={}", + "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), data.getBufferSize(), - filePath, - enableParquetInternalBuffering); + filePath); if (rows == null) { columnEpStatsMapCombined = data.getColumnEps(); @@ -181,7 +84,7 @@ private SerializationResult serializeFromJavaObjects( firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName(); chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs(); } else { - // This method assumes that channelsDataPerTable is grouped by table. We double check + // This method assumes that channelsDataPerTable is grouped by table. We double-check // here and throw an error if the assumption is violated if (!data.getChannelContext() .getFullyQualifiedTableName() @@ -202,13 +105,11 @@ private SerializationResult serializeFromJavaObjects( chunkEstimatedUncompressedSize += data.getBufferSize(); logger.logDebug( - "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}," - + " enableParquetMemoryOptimization={}", + "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), data.getBufferSize(), - filePath, - enableParquetInternalBuffering); + filePath); } Map metadata = channelsDataPerTable.get(0).getVectors().metadata; @@ -227,8 +128,7 @@ private SerializationResult serializeFromJavaObjects( rows.forEach(parquetWriter::writeRow); parquetWriter.close(); - this.verifyRowCounts( - "serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size()); + this.verifyRowCounts(parquetWriter, rowCount, channelsDataPerTable, rows.size()); return new SerializationResult( channelsMetadataList, @@ -243,7 +143,6 @@ private SerializationResult serializeFromJavaObjects( * Validates that rows count in metadata matches the row count in Parquet footer and the row count * written by the parquet writer * - * @param serializationType Serialization type, used for logging purposes only * @param writer Parquet writer writing the data * @param channelsDataPerTable Channel data * @param totalMetadataRowCount Row count calculated during metadata collection @@ -251,7 +150,6 @@ private SerializationResult serializeFromJavaObjects( * Used only for logging purposes if there is a mismatch. */ private void verifyRowCounts( - String serializationType, BdecParquetWriter writer, long totalMetadataRowCount, List> channelsDataPerTable, @@ -285,7 +183,7 @@ private void verifyRowCounts( throw new SFException( ErrorCode.INTERNAL_ERROR, String.format( - "[%s]The number of rows in Parquet does not match the number of rows in metadata. " + "The number of rows in Parquet does not match the number of rows in metadata. " + "parquetTotalRowsInFooter=%d " + "totalMetadataRowCount=%d " + "parquetTotalRowsWritten=%d " @@ -294,7 +192,6 @@ private void verifyRowCounts( + "channelsCountInMetadata=%d " + "countOfSerializedJavaObjects=%d " + "channelNames=%s", - serializationType, parquetTotalRowsInFooter, totalMetadataRowCount, parquetTotalRowsWritten, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 30851c274..5ada286e5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -4,8 +4,6 @@ package net.snowflake.ingest.streaming.internal; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -24,7 +22,6 @@ import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; -import org.apache.parquet.hadoop.BdecParquetWriter; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -43,11 +40,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer { /* Unflushed rows as Java objects. Needed for the Parquet w/o memory optimization. */ private final List> data; - - /* BDEC Parquet writer. It is used to buffer unflushed data in Parquet internal buffers instead of using Java objects */ - private BdecParquetWriter bdecParquetWriter; - - private ByteArrayOutputStream fileOutput; private final List> tempData; private MessageType schema; @@ -111,33 +103,10 @@ public void setupSchema(List columns) { id++; } schema = new MessageType(PARQUET_MESSAGE_TYPE_NAME, parquetTypes); - createFileWriter(); tempData.clear(); data.clear(); } - /** Create BDEC file writer if Parquet memory optimization is enabled. */ - private void createFileWriter() { - fileOutput = new ByteArrayOutputStream(); - try { - if (clientBufferParameters.getEnableParquetInternalBuffering()) { - bdecParquetWriter = - new BdecParquetWriter( - fileOutput, - schema, - metadata, - channelFullyQualifiedName, - clientBufferParameters.getMaxChunkSizeInBytes(), - clientBufferParameters.getBdecParquetCompression()); - } else { - this.bdecParquetWriter = null; - } - data.clear(); - } catch (IOException e) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "cannot create parquet writer", e); - } - } - @Override boolean hasColumn(String name) { return fieldIndex.containsKey(name); @@ -154,11 +123,7 @@ float addRow( } void writeRow(List row) { - if (clientBufferParameters.getEnableParquetInternalBuffering()) { - bdecParquetWriter.writeRow(row); - } else { - data.add(row); - } + data.add(row); } @Override @@ -263,12 +228,10 @@ boolean hasColumns() { @Override Optional getSnapshot() { List> oldData = new ArrayList<>(); - if (!clientBufferParameters.getEnableParquetInternalBuffering()) { - data.forEach(r -> oldData.add(new ArrayList<>(r))); - } + data.forEach(r -> oldData.add(new ArrayList<>(r))); return bufferedRowCount <= 0 ? Optional.empty() - : Optional.of(new ParquetChunkData(oldData, bdecParquetWriter, fileOutput, metadata)); + : Optional.of(new ParquetChunkData(oldData, metadata)); } /** Used only for testing. */ @@ -309,27 +272,17 @@ int getTempRowCount() { @Override void reset() { super.reset(); - createFileWriter(); data.clear(); } /** Close the row buffer by releasing its internal resources. */ @Override - void closeInternal() { - if (bdecParquetWriter != null) { - try { - bdecParquetWriter.close(); - } catch (IOException e) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Failed to close parquet writer", e); - } - } - } + void closeInternal() {} @Override public Flusher createFlusher() { return new ParquetFlusher( schema, - clientBufferParameters.getEnableParquetInternalBuffering(), clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getBdecParquetCompression()); } diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 0525737a3..885314b01 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -30,8 +30,6 @@ public class ParameterProvider { public static final String BLOB_UPLOAD_MAX_RETRY_COUNT = "BLOB_UPLOAD_MAX_RETRY_COUNT".toLowerCase(); public static final String MAX_MEMORY_LIMIT_IN_BYTES = "MAX_MEMORY_LIMIT_IN_BYTES".toLowerCase(); - public static final String ENABLE_PARQUET_INTERNAL_BUFFERING = - "ENABLE_PARQUET_INTERNAL_BUFFERING".toLowerCase(); // This should not be needed once we have the ability to track size at table/chunk level public static final String MAX_CHANNEL_SIZE_IN_BYTES = "MAX_CHANNEL_SIZE_IN_BYTES".toLowerCase(); public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase(); @@ -79,10 +77,6 @@ public class ParameterProvider { /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1; - /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. - It reduces memory consumption compared to using Java Objects for buffering.*/ - public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; - public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true; /** Map of parameter name to parameter value. This will be set by client/configure API Call. */ @@ -212,13 +206,6 @@ private void setParameterMap( props, false); - this.checkAndUpdate( - ENABLE_PARQUET_INTERNAL_BUFFERING, - ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT, - parameterOverrides, - props, - false); - this.checkAndUpdate( MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, @@ -440,14 +427,6 @@ public long getMaxMemoryLimitInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** @return Return whether memory optimization for Parquet is enabled. */ - public boolean getEnableParquetInternalBuffering() { - Object val = - this.parameterMap.getOrDefault( - ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT); - return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; - } - /** @return The max channel size in bytes */ public long getMaxChannelSizeInBytes() { Object val = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 17d90f458..62a6b7a4b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -35,12 +35,7 @@ public void testSerializationErrors() throws Exception { // Construction succeeds if both data and metadata contain 1 row BlobBuilder.constructBlobAndMetadata( "a.bdec", - Collections.singletonList(createChannelDataPerTable(1, false)), - Constants.BdecVersion.THREE, - encrypt); - BlobBuilder.constructBlobAndMetadata( - "a.bdec", - Collections.singletonList(createChannelDataPerTable(1, true)), + Collections.singletonList(createChannelDataPerTable(1)), Constants.BdecVersion.THREE, encrypt); @@ -48,13 +43,11 @@ public void testSerializationErrors() throws Exception { try { BlobBuilder.constructBlobAndMetadata( "a.bdec", - Collections.singletonList(createChannelDataPerTable(0, false)), + Collections.singletonList(createChannelDataPerTable(0)), Constants.BdecVersion.THREE, encrypt); - Assert.fail("Should not pass enableParquetInternalBuffering=false"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); - Assert.assertTrue(e.getMessage().contains("serializeFromJavaObjects")); Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1")); Assert.assertTrue(e.getMessage().contains("totalMetadataRowCount=0")); Assert.assertTrue(e.getMessage().contains("parquetTotalRowsWritten=1")); @@ -63,42 +56,18 @@ public void testSerializationErrors() throws Exception { Assert.assertTrue(e.getMessage().contains("channelsCountInMetadata=1")); Assert.assertTrue(e.getMessage().contains("countOfSerializedJavaObjects=1")); } - - try { - BlobBuilder.constructBlobAndMetadata( - "a.bdec", - Collections.singletonList(createChannelDataPerTable(0, true)), - Constants.BdecVersion.THREE, - encrypt); - Assert.fail("Should not pass enableParquetInternalBuffering=true"); - } catch (SFException e) { - Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); - Assert.assertTrue(e.getMessage().contains("serializeFromParquetWriteBuffers")); - Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1")); - Assert.assertTrue(e.getMessage().contains("totalMetadataRowCount=0")); - Assert.assertTrue(e.getMessage().contains("parquetTotalRowsWritten=1")); - Assert.assertTrue(e.getMessage().contains("perChannelRowCountsInMetadata=0")); - Assert.assertTrue(e.getMessage().contains("perBlockRowCountsInFooter=1")); - Assert.assertTrue(e.getMessage().contains("channelsCountInMetadata=1")); - Assert.assertTrue(e.getMessage().contains("countOfSerializedJavaObjects=-1")); - } } /** * Creates a channel data configurable number of rows in metadata and 1 physical row (using both * with and without internal buffering optimization) */ - private List> createChannelDataPerTable( - int metadataRowCount, boolean enableParquetInternalBuffering) throws IOException { + private List> createChannelDataPerTable(int metadataRowCount) + throws IOException { String columnName = "C1"; ChannelData channelData = Mockito.spy(new ChannelData<>()); MessageType schema = createSchema(columnName); - Mockito.doReturn( - new ParquetFlusher( - schema, - enableParquetInternalBuffering, - 100L, - Constants.BdecParquetCompression.GZIP)) + Mockito.doReturn(new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP)) .when(channelData) .createFlusher(); @@ -115,10 +84,7 @@ private List> createChannelDataPerTable( bdecParquetWriter.writeRow(Collections.singletonList("1")); channelData.setVectors( new ParquetChunkData( - Collections.singletonList(Collections.singletonList("A")), - bdecParquetWriter, - stream, - new HashMap<>())); + Collections.singletonList(Collections.singletonList("A")), new HashMap<>())); channelData.setColumnEps(new HashMap<>()); channelData.setRowCount(metadataRowCount); channelData.setMinMaxInsertTimeInMs(new Pair<>(2L, 3L)); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 3f8e927a4..f0a867075 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -31,16 +31,10 @@ import org.junit.Test; public class RowBufferTest { - - private final boolean enableParquetMemoryOptimization; private AbstractRowBuffer rowBufferOnErrorContinue; private AbstractRowBuffer rowBufferOnErrorAbort; private AbstractRowBuffer rowBufferOnErrorSkipBatch; - public RowBufferTest() { - this.enableParquetMemoryOptimization = false; - } - @Before public void setupRowBuffer() { this.rowBufferOnErrorContinue = createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE); @@ -129,7 +123,6 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o rs -> {}, initialState, ClientBufferParameters.test_createClientBufferParameters( - enableParquetMemoryOptimization, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT, Constants.BdecParquetCompression.GZIP,