From d02e5eed8c054b80e820f0de373a39b99a2bddeb Mon Sep 17 00:00:00 2001
From: Hitesh Madan
Date: Fri, 13 Sep 2024 15:02:40 +0000
Subject: [PATCH] fix formatting

---
 .../ingest/streaming/internal/BlobPath.java   |   3 +-
 .../streaming/internal/ExternalVolume.java    |   6 +-
 .../internal/ExternalVolumeManager.java       |  42 +++--
 .../streaming/internal/FlushService.java      | 167 +++++++++---------
 .../ingest/streaming/internal/IStorage.java   |   6 +-
 .../streaming/internal/IStorageManager.java   |  18 +-
 .../streaming/internal/InternalStage.java     |  42 ++---
 .../internal/InternalStageManager.java        |  66 +++----
 .../SnowflakeFileTransferMetadataWithAge.java |   5 +-
 ...nowflakeStreamingIngestClientInternal.java |   5 +-
 .../ingest/streaming/internal/TableRef.java   |  12 +-
 .../streaming/internal/InternalStageTest.java |  19 +-
 12 files changed, 168 insertions(+), 223 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java
index 7e8ca7271..8de78460a 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java
@@ -5,7 +5,8 @@
 package net.snowflake.ingest.streaming.internal;
 
 /**
- * Class to manage blob path strings that might have an embedded security token if its a presigned url
+ * Class to manage blob path strings that might have an embedded security token if it's a presigned
+ * url
  */
 public class BlobPath {
   public final String blobPath;
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java
index f07c9964c..0f1c1a934 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java
@@ -1,11 +1,9 @@
 package net.snowflake.ingest.streaming.internal;
 
-/**
- * Handles uploading files to the Iceberg Table's external volume's table data path
- */
+/** Handles uploading files to the Iceberg Table's external volume's table data path */
 class ExternalVolume implements IStorage {
   @Override
   public void put(BlobPath blobPath, byte[] blob) {
     throw new RuntimeException("not implemented");
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
index a83039766..3c6bf3f9d 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
@@ -7,15 +7,12 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import net.snowflake.ingest.connection.IngestResponseException;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.SFException;
 
-/**
- * Class to manage multiple external volumes
- */
+/** Class to manage multiple external volumes */
 class ExternalVolumeManager implements IStorageManager {
   // TODO: Rename all logger members to LOGGER and checkin code formatting rules
   private static final Logging logger = new Logging(ExternalVolumeManager.class);
@@ -34,16 +31,17 @@ class ExternalVolumeManager implements IStorageManager {
   // Client prefix generated by the Snowflake server
   private final String clientPrefix;
 
-  // concurrency control to avoid creating multiple ExternalVolume objects for the same table (if openChannel is called
+  // concurrency control to avoid creating multiple ExternalVolume objects for the same table (if
+  // openChannel is called
   // multiple times concurrently)
   private final Object registerTableLock = new Object();
 
   /**
    * Constructor for ExternalVolumeManager
    *
-   * @param isTestMode whether the manager in test mode
-   * @param role the role of the client
-   * @param clientName the name of the client
+   * @param isTestMode whether the manager is in test mode
+   * @param role the role of the client
+   * @param clientName the name of the client
    * @param snowflakeServiceClient the Snowflake service client used for configure calls
    */
   ExternalVolumeManager(
@@ -60,13 +58,15 @@ class ExternalVolumeManager implements IStorageManager {
         isTestMode
             ? "testPrefix"
             : this.serviceClient
-            .clientConfigure(new ClientConfigureRequest(role))
-            .getClientPrefix();
+                .clientConfigure(new ClientConfigureRequest(role))
+                .getClientPrefix();
     } catch (IngestResponseException | IOException e) {
       throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
     }
 
-    logger.logDebug("Created ExternalVolumeManager with clientName=%s and clientPrefix=%s", clientName, clientPrefix);
+    logger.logDebug(
+        "Created ExternalVolumeManager with clientName=%s and clientPrefix=%s",
+        clientName, clientPrefix);
   }
 
   /**
@@ -81,20 +81,24 @@ public IStorage getStorage(String fullyQualifiedTableName) {
     return getVolumeSafe(fullyQualifiedTableName);
   }
 
-  /**
-   * Informs the storage manager about a new table that's being ingested into by the client.
-   */
+  /** Informs the storage manager about a new table that's being ingested into by the client. */
   @Override
   public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
     if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
-      logger.logInfo("Skip registering table since its already been registered with the VolumeManager. tableRef=%s", tableRef);
+      logger.logInfo(
+          "Skip registering table since it's already been registered with the VolumeManager."
+              + " tableRef=%s",
+          tableRef);
       return;
     }
 
     // future enhancement - per table locks instead of per-client lock
     synchronized (registerTableLock) {
       if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
-        logger.logInfo("Skip registering table since its already been registered with the VolumeManager. tableRef=%s", tableRef);
+        logger.logInfo(
+            "Skip registering table since it's already been registered with the VolumeManager."
+                + " tableRef=%s",
+            tableRef);
         return;
       }
 
@@ -102,11 +106,13 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
       try {
         ExternalVolume externalVolume = new ExternalVolume();
         this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume);
       } catch (SFException ex) {
-        logger.logError("ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex);
+        logger.logError(
+            "ExtVolManager.registerTable for tableRef=%s failed with exception=%s", tableRef, ex);
         // allow external volume ctor's SFExceptions to bubble up directly
         throw ex;
       } catch (Exception err) {
-        logger.logError("ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err);
+        logger.logError(
+            "ExtVolManager.registerTable for tableRef=%s failed with exception=%s", tableRef, err);
         throw new SFException(
             err,
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index 15e6f9c57..3823154fd 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -12,7 +12,6 @@
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.security.InvalidAlgorithmParameterException;
@@ -38,7 +37,6 @@
 import javax.crypto.BadPaddingException;
 import javax.crypto.IllegalBlockSizeException;
 import javax.crypto.NoSuchPaddingException;
-
 import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder;
 import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
@@ -84,16 +82,13 @@ List<List<ChannelData<T>>> getData() {
   private final SnowflakeStreamingIngestClientInternal<T> owningClient;
 
   // Thread to schedule the flush job
-  @VisibleForTesting
-  ScheduledExecutorService flushWorker;
+  @VisibleForTesting ScheduledExecutorService flushWorker;
 
   // Thread to register the blob
-  @VisibleForTesting
-  ExecutorService registerWorker;
+  @VisibleForTesting ExecutorService registerWorker;
 
   // Threads to build and upload the blob
-  @VisibleForTesting
-  ExecutorService buildUploadWorkers;
+  @VisibleForTesting ExecutorService buildUploadWorkers;
 
   // Reference to the channel cache
   private final ChannelCache<T> channelCache;
@@ -109,11 +104,9 @@ List<List<ChannelData<T>>> getData() {
    * blob is not 1. When max chunk in blob is 1, flush service ignores these variables and uses
    * table level last flush time and need flush flag. See {@link ChannelCache.FlushInfo}.
    */
-  @VisibleForTesting
-  volatile long lastFlushTime;
+  @VisibleForTesting volatile long lastFlushTime;
 
-  @VisibleForTesting
-  volatile boolean isNeedFlush;
+  @VisibleForTesting volatile boolean isNeedFlush;
 
   // Indicates whether it's running as part of the test
   private final boolean isTestMode;
@@ -128,10 +121,10 @@ List<List<ChannelData<T>>> getData() {
   /**
    * Default constructor
    *
-   * @param client the owning client
-   * @param cache the channel cache
+   * @param client the owning client
+   * @param cache the channel cache
    * @param storageManager the storage manager
-   * @param isTestMode whether the service is running in test mode
+   * @param isTestMode whether the service is running in test mode
    */
   FlushService(
       SnowflakeStreamingIngestClientInternal<T> client,
       ChannelCache<T> cache,
@@ -170,8 +163,8 @@ private CompletableFuture<Void> statsFuture() {
   }
 
   /**
-   * @param isForce if true will flush regardless of other conditions
-   * @param tablesToFlush list of tables to flush
+   * @param isForce if true will flush regardless of other conditions
+   * @param tablesToFlush list of tables to flush
    * @param flushStartTime the time when the flush started
    * @return
    */
@@ -193,9 +186,7 @@ private CompletableFuture<Void> distributeFlush(
         this.flushWorker);
   }
 
-  /**
-   * If tracing is enabled, print always else, check if it needs flush or is forceful.
-   */
+  /** If tracing is enabled, always log; otherwise log only when a flush is needed or forced. */
   private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
     boolean isNeedFlush =
         this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1
@@ -270,7 +261,7 @@ private CompletableFuture<Void> registerFuture() {
    *
    * @param isForce
    * @return Completable future that will return when the blobs are registered successfully, or null
-   *     if none of the conditions is met above
+   *     if none of the conditions is met above
    */
   CompletableFuture<Void> flush(boolean isForce) {
     final long flushStartTime = System.currentTimeMillis();
@@ -311,9 +302,7 @@ CompletableFuture<Void> flush(boolean isForce) {
     return this.statsFuture();
   }
 
-  /**
-   * Create the workers for each specific job
-   */
+  /** Create the workers for each specific job */
   private void createWorkers() {
     // Create thread for checking and scheduling flush job
     ThreadFactory flushThreadFactory =
@@ -395,12 +384,12 @@
    */
   void distributeFlushTasks(Set<String> tablesToFlush) {
     Iterator<
-        Map.Entry<
-            String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
+            Map.Entry<
+                String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
         itr =
-        this.channelCache.entrySet().stream()
-            .filter(e -> tablesToFlush.contains(e.getKey()))
-            .iterator();
+            this.channelCache.entrySet().stream()
+                .filter(e -> tablesToFlush.contains(e.getKey()))
+                .iterator();
     List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
     List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();
@@ -422,7 +411,8 @@ void distributeFlushTasks(Set<String> tablesToFlush) {
             >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) {
           // Create a new blob if the current one already contains max allowed number of chunks
           logger.logInfo(
-              "Max allowed number of chunks in the current blob reached. chunkCount={} maxChunkCount={}",
+              "Max allowed number of chunks in the current blob reached. chunkCount={}"
+                  + " maxChunkCount={}",
               blobData.size(),
               this.owningClient.getParameterProvider().getMaxChunksInBlob());
           break;
@@ -501,54 +491,57 @@ && shouldStopProcessing(
         long flushStartMs = System.currentTimeMillis();
         if (this.owningClient.flushLatency != null) {
-          latencyTimerContextMap.putIfAbsent(blobPath.fileName, this.owningClient.flushLatency.time());
+          latencyTimerContextMap.putIfAbsent(
+              blobPath.fileName, this.owningClient.flushLatency.time());
         }
 
-        Supplier<BlobMetadata> supplier = () -> {
-          try {
-            BlobMetadata blobMetadata =
-                buildAndUpload(blobPath, blobData, fullyQualifiedTableName);
-            blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
-            return blobMetadata;
-          } catch (Throwable e) {
-            Throwable ex = e.getCause() == null ? e : e.getCause();
-            String errorMessage =
-                String.format(
-                    "Building blob failed, client=%s, blob=%s, exception=%s,"
-                        + " detail=%s, trace=%s, all channels in the blob will be"
-                        + " invalidated",
-                    this.owningClient.getName(),
-                    blobPath.fileName,
-                    ex,
-                    ex.getMessage(),
-                    getStackTrace(ex));
-            logger.logError(errorMessage);
-            if (this.owningClient.getTelemetryService() != null) {
-              this.owningClient
-                  .getTelemetryService()
-                  .reportClientFailure(this.getClass().getSimpleName(), errorMessage);
-            }
-
-            if (e instanceof IOException) {
-              invalidateAllChannelsInBlob(blobData, errorMessage);
-              return null;
-            } else if (e instanceof NoSuchAlgorithmException) {
-              throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
-            } else if (e instanceof InvalidAlgorithmParameterException
-                | e instanceof NoSuchPaddingException
-                | e instanceof IllegalBlockSizeException
-                | e instanceof BadPaddingException
-                | e instanceof InvalidKeyException) {
-              throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE);
-            } else {
-              throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
-            }
-          }
-        };
+        Supplier<BlobMetadata> supplier =
+            () -> {
+              try {
+                BlobMetadata blobMetadata =
+                    buildAndUpload(blobPath, blobData, fullyQualifiedTableName);
+                blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
+                return blobMetadata;
+              } catch (Throwable e) {
+                Throwable ex = e.getCause() == null ? e : e.getCause();
+                String errorMessage =
+                    String.format(
+                        "Building blob failed, client=%s, blob=%s, exception=%s,"
+                            + " detail=%s, trace=%s, all channels in the blob will be"
+                            + " invalidated",
+                        this.owningClient.getName(),
+                        blobPath.fileName,
+                        ex,
+                        ex.getMessage(),
+                        getStackTrace(ex));
+                logger.logError(errorMessage);
+                if (this.owningClient.getTelemetryService() != null) {
+                  this.owningClient
+                      .getTelemetryService()
+                      .reportClientFailure(this.getClass().getSimpleName(), errorMessage);
+                }
+
+                if (e instanceof IOException) {
+                  invalidateAllChannelsInBlob(blobData, errorMessage);
+                  return null;
+                } else if (e instanceof NoSuchAlgorithmException) {
+                  throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
+                } else if (e instanceof InvalidAlgorithmParameterException
+                    | e instanceof NoSuchPaddingException
+                    | e instanceof IllegalBlockSizeException
+                    | e instanceof BadPaddingException
+                    | e instanceof InvalidKeyException) {
+                  throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE);
+                } else {
+                  throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
+                }
+              }
+            };
 
-        blobs.add(new Pair<>(
-            new BlobData<>(blobPath.fileName, blobData),
-            CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers)));
+        blobs.add(
+            new Pair<>(
+                new BlobData<>(blobPath.fileName, blobData),
+                CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers)));
 
         logger.logInfo(
             "buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}",
@@ -590,18 +583,18 @@ private boolean shouldStopProcessing(
   /**
    * Builds and uploads blob to cloud storage.
    *
-   * @param blobPath Path of the destination blob in cloud storage
-   * @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
-   *     belongs to the same table. Will error if this is not the case
+   * @param blobPath Path of the destination blob in cloud storage
+   * @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
+   *     belongs to the same table. Will error if this is not the case
    * @param fullyQualifiedTableName the table name of the first channel in the blob, only matters in
-   *     Iceberg mode
+   *     Iceberg mode
    * @return BlobMetadata for FlushService.upload
    */
   BlobMetadata buildAndUpload(
       BlobPath blobPath, List<List<ChannelData<T>>> blobData, String fullyQualifiedTableName)
       throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException,
-      NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException,
-      InvalidKeyException {
+          NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException,
+          InvalidKeyException {
     Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);
 
     // Construct the blob along with the metadata of the blob
@@ -625,10 +618,10 @@ BlobMetadata buildAndUpload(
   /**
    * Upload a blob to Streaming Ingest dedicated stage
    *
-   * @param storage the storage to upload the blob
-   * @param blobPath full path of the blob
-   * @param blob blob data
-   * @param metadata a list of chunk metadata
+   * @param storage the storage to upload the blob
+   * @param blobPath full path of the blob
+   * @param blob blob data
+   * @param metadata a list of chunk metadata
    * @param blobStats an object to track latencies and other stats of the blob
    * @return BlobMetadata object used to create the register blob request
    */
@@ -747,9 +740,7 @@ boolean throttleDueToQueuedFlushTasks() {
     return throttleOnQueuedTasks;
   }
 
-  /**
-   * Get whether we're running under test mode
-   */
+  /** Get whether we're running under test mode */
   boolean isTestMode() {
     return this.isTestMode;
   }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java
index f69c6fcd7..3a41f38f1 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java
@@ -5,12 +5,14 @@
 package net.snowflake.ingest.streaming.internal;
 
 /**
- * Interface that represents a storage location to which we should upload data files.
- * It is the account's internal stage for snowflake tables, and the table's external volume for iceberg tables.
+ * Interface that represents a storage location to which we should upload data files. It is the
+ * account's internal stage for snowflake tables, and the table's external volume for iceberg
+ * tables.
  */
 interface IStorage {
   /**
    * Writes out the byte[] to the path passed in.
+   *
    * @param blobPath
    * @param blob
    */
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
index f147382c6..edd92f939 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
@@ -4,14 +4,10 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-/**
- * Interface to manage {@link InternalStage} and {@link ExternalVolume} for {@link FlushService}
- */
+/** Interface to manage {@link InternalStage} and {@link ExternalVolume} for {@link FlushService} */
 interface IStorageManager {
 
-  /**
-   * Default max upload retries for streaming ingest storage
-   */
+  /** Default max upload retries for streaming ingest storage */
   int DEFAULT_MAX_UPLOAD_RETRIES = 5;
 
   /**
@@ -20,12 +16,11 @@ interface IStorageManager {
    * @param fullyQualifiedTableName the target fully qualified table name
    * @return target stage
    */
-  // TODO: Use TableRef everywhere instead of constructing strings and passing them around everywhere
+  // TODO: Use TableRef everywhere instead of constructing strings and passing them around
+  // everywhere
   IStorage getStorage(String fullyQualifiedTableName);
 
-  /**
-   * Informs the storage manager about a new table that's being ingested into by the client.
-   */
+  /** Informs the storage manager about a new table that's being ingested into by the client. */
   void registerTable(TableRef tableRef, FileLocationInfo locationInfo);
 
   /**
@@ -34,7 +29,8 @@ interface IStorageManager {
    * @param fullyQualifiedTableName The table for which the path must be generated
    * @return the blob path
    */
-  // TODO: Use TableRef everywhere instead of constructing strings and passing them around everywhere
+  // TODO: Use TableRef everywhere instead of constructing strings and passing them around
+  // everywhere
   BlobPath generateBlobPath(String fullyQualifiedTableName);
 
   /**
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
index ae5123925..984201555 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
@@ -12,7 +12,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.VisibleForTesting;
-
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
@@ -23,7 +22,6 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-
 import net.snowflake.client.core.OCSPMode;
 import net.snowflake.client.jdbc.SnowflakeFileTransferAgent;
 import net.snowflake.client.jdbc.SnowflakeFileTransferConfig;
@@ -36,9 +34,7 @@
 import net.snowflake.ingest.utils.SFException;
 import net.snowflake.ingest.utils.Utils;
 
-/**
- * Handles uploading files to the Snowflake Streaming Ingest Storage
- */
+/** Handles uploading files to the Snowflake Streaming Ingest Storage */
 class InternalStage<T> implements IStorage {
 
   private static final ObjectMapper mapper = new ObjectMapper();
@@ -75,8 +71,8 @@ class InternalStage<T> implements IStorage {
   /**
    * Default constructor
    *
-   * @param owningManager the storage manager owning this storage
-   * @param clientName The client name
+   * @param owningManager the storage manager owning this storage
+   * @param clientName The client name
    * @param fileLocationInfo The file location information from open channel response
    * @param maxUploadRetries The maximum number of retries to attempt
    */
@@ -86,11 +82,7 @@ class InternalStage<T> implements IStorage {
       FileLocationInfo fileLocationInfo,
       int maxUploadRetries)
       throws SnowflakeSQLException, IOException {
-    this(
-        owningManager,
-        clientName,
-        (SnowflakeFileTransferMetadataWithAge) null,
-        maxUploadRetries);
+    this(owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, maxUploadRetries);
     Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix());
     this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
   }
 
   /**
    * Constructor for TESTING that takes SnowflakeFileTransferMetadataWithAge as input
    *
-   * @param owningManager the storage manager owning this storage
-   * @param clientName the client name
-   * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with
+   * @param owningManager the storage manager owning this storage
+   * @param clientName the client name
+   * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with
    * @param maxUploadRetries the maximum number of retries to attempt
    */
   InternalStage(
@@ -212,8 +204,7 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole
       return fileTransferMetadataWithAge;
     }
 
-    FileLocationInfo location =
-        this.owningManager.getRefreshedLocation(Optional.empty());
+    FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.empty());
     SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location);
     this.fileTransferMetadataWithAge = metadata;
     return metadata;
@@ -222,8 +213,8 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole
   static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
       FileLocationInfo fileLocationInfo)
       throws JsonProcessingException,
-      net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException,
-      SnowflakeSQLException {
+          net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException,
+          SnowflakeSQLException {
     final SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
 
     if (fileLocationInfo
@@ -274,8 +265,7 @@ static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
 
   SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
       throws SnowflakeSQLException, IOException {
-    FileLocationInfo location =
-        this.owningManager.getRefreshedLocation(Optional.of(fileName));
+    FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.of(fileName));
 
     SnowflakeFileTransferMetadataV1 metadata =
         (SnowflakeFileTransferMetadataV1)
@@ -287,9 +277,9 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
   }
 
   static net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode
-  parseFileLocationInfo(FileLocationInfo fileLocationInfo)
-      throws JsonProcessingException,
-      net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException {
+      parseFileLocationInfo(FileLocationInfo fileLocationInfo)
+          throws JsonProcessingException,
+              net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException {
     JsonNode fileLocationInfoNode = mapper.valueToTree(fileLocationInfo);
 
     // Currently there are a few mismatches between the client/configure response and what
@@ -309,9 +299,7 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
     return parseConfigureResponseMapper.readTree(responseString);
   }
 
-  /**
-   * Upload file to internal stage
-   */
+  /** Upload file to internal stage */
   public void put(BlobPath blobPath, byte[] blob) {
     String filePath = blobPath.fileName;
     if (this.isLocalFS()) {
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
index 18f434af6..14ca18822 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
@@ -7,70 +7,50 @@
 import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.util.Calendar;
 import java.util.Optional;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import net.snowflake.client.jdbc.SnowflakeSQLException;
 import net.snowflake.ingest.connection.IngestResponseException;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
 import net.snowflake.ingest.utils.Utils;
 
-/**
- * Class to manage single Snowflake internal stage
- */
+/** Class to manage single Snowflake internal stage */
 class InternalStageManager<T> implements IStorageManager {
-  /**
-   * Target stage for the client
-   */
+  /** Target stage for the client */
   private final InternalStage<T> targetStage;
 
-  /**
-   * Increasing counter to generate a unique blob name per client
-   */
+  /** Increasing counter to generate a unique blob name per client */
   private final AtomicLong counter;
 
-  /**
-   * Whether the manager in test mode
-   */
+  /** Whether the manager is in test mode */
   private final boolean isTestMode;
 
-  /**
-   * Snowflake service client used for configure calls
-   */
+  /** Snowflake service client used for configure calls */
   private final SnowflakeServiceClient snowflakeServiceClient;
 
-  /**
-   * The name of the client
-   */
+  /** The name of the client */
   private final String clientName;
 
-  /**
-   * The role of the client
-   */
+  /** The role of the client */
   private final String role;
 
-  /**
-   * Client prefix generated by the Snowflake server
-   */
+  /** Client prefix generated by the Snowflake server */
   private String clientPrefix;
 
-  /**
-   * Deployment ID generated by the Snowflake server
-   */
+  /** Deployment ID generated by the Snowflake server */
  private Long deploymentId;
 
   /**
    * Constructor for InternalStageManager
    *
-   * @param isTestMode whether the manager in test mode
-   * @param role the role of the client
-   * @param clientName the name of the client
+   * @param isTestMode whether the manager is in test mode
+   * @param role the role of the client
+   * @param clientName the name of the client
    * @param snowflakeServiceClient the Snowflake service client to use for configure calls
    */
   InternalStageManager(
@@ -91,10 +71,7 @@ class InternalStageManager<T> implements IStorageManager {
         this.deploymentId = response.getDeploymentId();
         this.targetStage =
             new InternalStage<>(
-                this,
-                clientName,
-                response.getStageLocation(),
-                DEFAULT_MAX_UPLOAD_RETRIES);
+                this, clientName, response.getStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES);
       } else {
         this.clientPrefix = null;
         this.deploymentId = null;
@@ -127,12 +104,11 @@ public InternalStage<T> getStorage(String fullyQualifiedTableName) {
   }
 
   /**
-   * Informs the storage manager about a new table that's being ingested into by the client.
-   * Do nothing as there's no per-table state yet for FDN tables (that use internal stages).
+   * Informs the storage manager about a new table that's being ingested into by the client. Do
+   * nothing as there's no per-table state yet for FDN tables (that use internal stages).
    */
   @Override
-  public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
-  }
+  public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {}
 
   /**
    * Gets the latest file location info (with a renewed short-lived access token) for the specified
@@ -171,16 +147,16 @@ FileLocationInfo getRefreshedLocation(Optional<String> fileName) {
    */
   @Override
   public BlobPath generateBlobPath(String fullyQualifiedTableName) {
-    // the table name argument is not going to be used in internal stages since we don't have per table paths.
-    // For external volumes (in iceberg), the blob path has a per-table element in it, thus the other implementation
+    // the table name argument is not going to be used in internal stages since we don't have per
+    // table paths.
+    // For external volumes (in iceberg), the blob path has a per-table element in it, thus the
+    // other implementation
     // of IStorageManager does end up using this argument.
     Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
     return BlobPath.fileNameWithoutToken(getNextFileName(calendar, this.clientPrefix));
   }
 
-  /**
-   * For TESTING
-   */
+  /** For TESTING */
   @VisibleForTesting
   public String getNextFileName(Calendar calendar, String clientPrefix) {
     if (this.isTestMode && clientPrefix == null) {
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeFileTransferMetadataWithAge.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeFileTransferMetadataWithAge.java
index 9102e5b54..4d9ba1af9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeFileTransferMetadataWithAge.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeFileTransferMetadataWithAge.java
@@ -4,9 +4,8 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-import net.snowflake.client.jdbc.SnowflakeFileTransferMetadataV1;
-
 import java.util.Optional;
+import net.snowflake.client.jdbc.SnowflakeFileTransferMetadataV1;
 
 /**
  * Wrapper class containing SnowflakeFileTransferMetadata and the timestamp at which the metadata
@@ -36,4 +35,4 @@ state to record unknown age.
     this.localLocation = localLocation;
     this.timestamp = timestamp;
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
index ee8b08eab..be6b192b0 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
@@ -380,10 +380,7 @@ public SnowflakeStreamingIngestChannelInternal<T> openChannel(OpenChannelRequest
     // Add channel to the channel cache
     this.channelCache.addChannel(channel);
     this.storageManager.registerTable(
-        new TableRef(
-            response.getDBName(),
-            response.getSchemaName(),
-            response.getTableName()),
+        new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()),
         response.getExternalVolumeLocation());
 
     return channel;
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/TableRef.java b/src/main/java/net/snowflake/ingest/streaming/internal/TableRef.java
index 7c377db3f..1d15d38a7 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/TableRef.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/TableRef.java
@@ -4,13 +4,13 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-import net.snowflake.ingest.utils.Utils;
-
 import javax.annotation.Nonnull;
+import net.snowflake.ingest.utils.Utils;
 
 /**
- * Class to carry around the table pointer across the SDK codebase. This is being retrofitted into places that
- * used to work with a fullyQualifiedTableName string. Can be used as a key in maps (has equals/hashcode implemented)
+ * Class to carry around the table pointer across the SDK codebase. This is being retrofitted into
+ * places that used to work with a fullyQualifiedTableName string. Can be used as a key in maps (has
+ * equals/hashcode implemented)
  */
 class TableRef {
   final String dbName;
@@ -44,7 +44,9 @@ public boolean equals(Object obj) {
     }
 
     final TableRef other = (TableRef) obj;
-    return this.dbName.equals(other.dbName) && this.schemaName.equals(other.schemaName) && this.tableName.equals(other.tableName);
+    return this.dbName.equals(other.dbName)
+        && this.schemaName.equals(other.schemaName)
+        && this.tableName.equals(other.tableName);
   }
 
   @Override
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java
index 9999848ba..e6335de0f 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java
@@ -29,7 +29,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import net.snowflake.client.core.HttpUtil;
 import net.snowflake.client.core.OCSPMode;
 import net.snowflake.client.core.SFSessionProperty;
@@ -300,13 +299,9 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception {
 
     InternalStage stage =
         new InternalStage<>(
-            storageManager,
-            "clientName",
-            (SnowflakeFileTransferMetadataWithAge) null,
-            1);
+            storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1);
 
-    SnowflakeFileTransferMetadataWithAge metadataWithAge =
-        stage.refreshSnowflakeMetadata(true);
+    SnowflakeFileTransferMetadataWithAge metadataWithAge = stage.refreshSnowflakeMetadata(true);
 
     final ArgumentCaptor<String> endpointCaptor = ArgumentCaptor.forClass(String.class);
     final ArgumentCaptor<String> stringCaptor = ArgumentCaptor.forClass(String.class);
@@ -396,10 +391,7 @@ public void testFetchSignedURL() throws Exception {
 
     InternalStage stage =
         new InternalStage(
-            storageManager,
-            "clientName",
-            (SnowflakeFileTransferMetadataWithAge) null,
-            1);
+            storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1);
 
     SnowflakeFileTransferMetadataV1 metadata = stage.fetchSignedURL("path/fileName");
 
@@ -441,10 +433,7 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception {
 
     InternalStage stage =
         new InternalStage<>(
-            storageManager,
-            "clientName",
-            (SnowflakeFileTransferMetadataWithAge) null,
-            1);
+            storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1);
 
     ThreadFactory buildUploadThreadFactory =
         new ThreadFactoryBuilder().setNameFormat("ingest-build-upload-thread-%d").build();