diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java index cef6e631c..034b4a6f0 100644 --- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java +++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2017 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.connection; @@ -42,7 +42,8 @@ public enum ApiName { STREAMING_DROP_CHANNEL("POST"), STREAMING_CHANNEL_STATUS("POST"), STREAMING_REGISTER_BLOB("POST"), - STREAMING_CLIENT_CONFIGURE("POST"); + STREAMING_CLIENT_CONFIGURE("POST"), + STREAMING_CHANNEL_CONFIGURE("POST"); private final String httpMethod; private ApiName(String httpMethod) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureCallHandler.java b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureCallHandler.java new file mode 100644 index 000000000..28398f709 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureCallHandler.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; +import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.connection.ServiceResponseHandler; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.Utils; + +/** A class that is used to configure a Snowflake Ingest Storage */ +class ConfigureCallHandler { + + // Object mapper for creating payload, ignore null fields + private static final ObjectMapper mapper = + new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + + private final CloseableHttpClient httpClient; + private final RequestBuilder requestBuilder; + private final ServiceResponseHandler.ApiName apiName; + private final String configureEndpoint; + private final String role; + private final String database; + private final String schema; + private final String table; + + /** + * Builder method to create a {@link ConfigureCallHandler} + * + * @param httpClient the HTTP client + * @param requestBuilder the request builder + * @param apiName the API name, used for logging + * @param configureEndpoint the configure endpoint + * @return a {@link ConfigureCallHandlerBuilder} object + */ + static ConfigureCallHandlerBuilder builder( + CloseableHttpClient httpClient, + RequestBuilder requestBuilder, + ServiceResponseHandler.ApiName apiName, + String configureEndpoint) { + return new ConfigureCallHandlerBuilder(httpClient, requestBuilder, 
apiName, configureEndpoint); + } + + /** Builder class to build a {@link ConfigureCallHandler} */ + static class ConfigureCallHandlerBuilder { + private final CloseableHttpClient httpClient; + private final RequestBuilder requestBuilder; + private final ServiceResponseHandler.ApiName ApiName; + private final String configureEndpoint; + private String role; + private String database; + private String schema; + private String table; + private boolean isTestMode; + + /** + * Constructor for ConfigureCallHandlerBuilder + * + * @param httpClient the HTTP client + * @param requestBuilder the request builder + * @param apiName the API name, used for logging + * @param configureEndpoint the configure endpoint + */ + ConfigureCallHandlerBuilder( + CloseableHttpClient httpClient, + RequestBuilder requestBuilder, + ServiceResponseHandler.ApiName apiName, + String configureEndpoint) { + this.httpClient = httpClient; + this.requestBuilder = requestBuilder; + this.ApiName = apiName; + this.configureEndpoint = configureEndpoint; + } + + public ConfigureCallHandlerBuilder setRole(String role) { + this.role = role; + return this; + } + + public ConfigureCallHandlerBuilder setDatabase(String database) { + this.database = database; + return this; + } + + public ConfigureCallHandlerBuilder setSchema(String schema) { + this.schema = schema; + return this; + } + + public ConfigureCallHandlerBuilder setTable(String table) { + this.table = table; + return this; + } + + public ConfigureCallHandlerBuilder setIsTestMode(boolean isTestMode) { + this.isTestMode = isTestMode; + return this; + } + + public ConfigureCallHandler build() { + return new ConfigureCallHandler(this); + } + } + + ConfigureCallHandler(ConfigureCallHandlerBuilder builder) { + if (!builder.isTestMode) { + Utils.assertNotNull("http client", builder.httpClient); + Utils.assertNotNull("request builder", builder.requestBuilder); + Utils.assertNotNull("api name", builder.ApiName); + Utils.assertStringNotNullOrEmpty("configure 
endpoint", builder.configureEndpoint); + } + + this.httpClient = builder.httpClient; + this.requestBuilder = builder.requestBuilder; + this.apiName = builder.ApiName; + this.configureEndpoint = builder.configureEndpoint; + this.role = builder.role; + this.database = builder.database; + this.schema = builder.schema; + this.table = builder.table; + } + + /** + * Make a configure call to the Snowflake service + * + * @return the configure response + * @throws IOException + */ + ConfigureResponse makeConfigureCall() throws IOException { + return makeConfigureCall(makeConfigurePayload()); + } + + /** + * Make a configure call to the Snowflake service with a file name, used for GCS + * + * @param fileName the file name + * @return the configure response + * @throws IOException + */ + ConfigureResponse makeConfigureCall(String fileName) throws IOException { + Map payload = makeConfigurePayload(); + payload.put("file_name", fileName); + return makeConfigureCall(payload); + } + + private ConfigureResponse makeConfigureCall(Map payload) throws IOException { + try { + ConfigureResponse response = + executeWithRetries( + ConfigureResponse.class, + this.configureEndpoint, + mapper.writeValueAsString(payload), + "client configure", + this.apiName, + this.httpClient, + this.requestBuilder); + + // Check for Snowflake specific response code + if (response.getStatusCode() != RESPONSE_SUCCESS) { + throw new SFException(ErrorCode.CONFIGURE_FAILURE, response.getMessage()); + } + return response; + } catch (IngestResponseException e) { + throw new SFException(e, ErrorCode.CONFIGURE_FAILURE, e.getMessage()); + } + } + + private Map makeConfigurePayload() { + Map payload = new HashMap<>(); + payload.put("role", this.role); + payload.put("database", this.database); + payload.put("schema", this.schema); + payload.put("table", this.table); + return payload; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java 
b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java new file mode 100644 index 000000000..32414fb53 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE; +import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT; + +import java.io.IOException; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; + +/** Class to manage multiple external volumes */ +class ExternalVolumeManager implements StorageManager { + + // Reference to the external volume per table + private final Map externalVolumeMap; + private final SnowflakeStreamingIngestClientInternal owningClient; + + private final boolean isTestMode; + + /** + * Constructor for ExternalVolumeManager + * + * @param isTestMode whether the manager is in test mode + * @param client the owning client + */ + ExternalVolumeManager(boolean isTestMode, SnowflakeStreamingIngestClientInternal client) { + this.owningClient = client; + this.isTestMode = isTestMode; + this.externalVolumeMap = new ConcurrentHashMap<>(); + } + + /** + * Given a blob, return the target storage by looking up the table name from the channel context + * + * @param blobData the blob to upload + * @return target storage + */ + @Override + public StreamingIngestStorage getStorage(List>> blobData) { + // Only one chunk per blob in Iceberg mode. 
+ ChannelFlushContext channelContext = blobData.get(0).get(0).getChannelContext(); + StreamingIngestStorage stage = + this.externalVolumeMap.get(channelContext.getFullyQualifiedTableName()); + + if (stage == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format( + "No storage found for table %s", channelContext.getFullyQualifiedTableName())); + } + + return stage; + } + + /** + * Add a storage to the manager by looking up the table name from the open channel response + * + * @param openChannelResponse response from open channel + */ + @Override + public void addStorage(OpenChannelResponse openChannelResponse) { + String fullyQualifiedTableName = + String.format( + "%s.%s.%s", + openChannelResponse.getDBName(), + openChannelResponse.getSchemaName(), + openChannelResponse.getTableName()); + if (!this.externalVolumeMap.containsKey(fullyQualifiedTableName)) { + try { + ConfigureCallHandler configureCallHandler = + ConfigureCallHandler.builder( + this.owningClient.getHttpClient(), + this.owningClient.getRequestBuilder(), + STREAMING_CHANNEL_CONFIGURE, + CHANNEL_CONFIGURE_ENDPOINT) + .setRole(this.owningClient.getRole()) + .setDatabase(openChannelResponse.getDBName()) + .setSchema(openChannelResponse.getSchemaName()) + .setTable(openChannelResponse.getTableName()) + .build(); + this.externalVolumeMap.put( + fullyQualifiedTableName, + new StreamingIngestStorage( + isTestMode, + configureCallHandler, + this.owningClient.getName(), + DEFAULT_MAX_UPLOAD_RETRIES)); + } catch (SnowflakeSQLException | IOException err) { + throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE); + } + } + } + + // TODO: SNOW-1502887 Blob path generation for iceberg table + @Override + public String generateBlobPath() { + return "snow_dummy_file_name"; + } + + // TODO: SNOW-1502887 Blob path generation for iceberg table + @Override + public String getBlobPath(Calendar calendar, String clientPrefix) { + return ""; + } + + /** + * Get the client prefix from first 
external volume in the map + * + * @return the client prefix + */ + @Override + public String getClientPrefix() { + if (this.externalVolumeMap.isEmpty()) { + return null; + } + return this.externalVolumeMap.values().iterator().next().getClientPrefix(); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 05d72433d..b1de58431 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -4,7 +4,6 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE; import static net.snowflake.ingest.utils.Constants.DISABLE_BACKGROUND_FLUSH; import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.MAX_THREAD_COUNT; @@ -19,13 +18,11 @@ import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.Calendar; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.TimeZone; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -34,11 +31,9 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; import javax.crypto.NoSuchPaddingException; -import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; @@ 
-84,9 +79,6 @@ List>> getData() { private static final Logging logger = new Logging(FlushService.class); - // Increasing counter to generate a unique blob name per client - private final AtomicLong counter; - // The client that owns this flush service private final SnowflakeStreamingIngestClientInternal owningClient; @@ -102,8 +94,8 @@ List>> getData() { // Reference to the channel cache private final ChannelCache channelCache; - // Reference to the Streaming Ingest stage - private final StreamingIngestStage targetStage; + // Reference to the Stream Ingest storage manager + private final StorageManager storageManager; // Reference to register service private final RegisterService registerService; @@ -124,21 +116,21 @@ List>> getData() { private final Constants.BdecVersion bdecVersion; /** - * Constructor for TESTING that takes (usually mocked) StreamingIngestStage + * Default constructor * - * @param client - * @param cache - * @param isTestMode + * @param client the owning client + * @param cache the channel cache + * @param storageManager the storage manager + * @param isTestMode whether the service is running in test mode */ FlushService( SnowflakeStreamingIngestClientInternal client, ChannelCache cache, - StreamingIngestStage targetStage, // For testing + StorageManager storageManager, boolean isTestMode) { this.owningClient = client; this.channelCache = cache; - this.targetStage = targetStage; - this.counter = new AtomicLong(0); + this.storageManager = storageManager; this.registerService = new RegisterService<>(client, isTestMode); this.isNeedFlush = false; this.lastFlushTime = System.currentTimeMillis(); @@ -148,40 +140,6 @@ List>> getData() { createWorkers(); } - /** - * Default constructor - * - * @param client - * @param cache - * @param isTestMode - */ - FlushService( - SnowflakeStreamingIngestClientInternal client, ChannelCache cache, boolean isTestMode) { - this.owningClient = client; - this.channelCache = cache; - try { - this.targetStage = - new 
StreamingIngestStage( - isTestMode, - client.getRole(), - client.getHttpClient(), - client.getRequestBuilder(), - client.getName(), - DEFAULT_MAX_UPLOAD_RETRIES); - } catch (SnowflakeSQLException | IOException err) { - throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE); - } - - this.registerService = new RegisterService<>(client, isTestMode); - this.counter = new AtomicLong(0); - this.isNeedFlush = false; - this.lastFlushTime = System.currentTimeMillis(); - this.isTestMode = isTestMode; - this.latencyTimerContextMap = new ConcurrentHashMap<>(); - this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); - createWorkers(); - } - /** * Updates performance stats enabled * @@ -359,11 +317,17 @@ void distributeFlushTasks() { itr = this.channelCache.iterator(); List, CompletableFuture>> blobs = new ArrayList<>(); List> leftoverChannelsDataPerTable = new ArrayList<>(); + boolean isBlobCreated = true; + String currentBlobPath = null; while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) { List>> blobData = new ArrayList<>(); float totalBufferSizeInBytes = 0F; - final String blobPath = getBlobPath(this.targetStage.getClientPrefix()); + if (isBlobCreated) { + currentBlobPath = this.storageManager.generateBlobPath(); + } + isBlobCreated = false; + final String blobPath = currentBlobPath; // Distribute work at table level, split the blob if reaching the blob size limit or the // channel has different encryption key ids @@ -444,11 +408,8 @@ && shouldStopProcessing( } // Kick off a build job - if (blobData.isEmpty()) { - // we decrement the counter so that we do not have gaps in the blob names created by this - // client. See method getBlobPath() below. 
- this.counter.decrementAndGet(); - } else { + if (!blobData.isEmpty()) { + isBlobCreated = true; long flushStartMs = System.currentTimeMillis(); if (this.owningClient.flushLatency != null) { latencyTimerContextMap.putIfAbsent(blobPath, this.owningClient.flushLatency.time()); @@ -560,12 +521,18 @@ BlobMetadata buildAndUpload(String blobPath, List>> blobData blob.blobStats.setBuildDurationMs(buildContext); - return upload(blobPath, blob.blobBytes, blob.chunksMetadataList, blob.blobStats); + return upload( + this.storageManager.getStorage(blobData), + blobPath, + blob.blobBytes, + blob.chunksMetadataList, + blob.blobStats); } /** * Upload a blob to Streaming Ingest dedicated stage * + * @param storage the storage to upload the blob * @param blobPath full path of the blob * @param blob blob data * @param metadata a list of chunk metadata @@ -573,13 +540,17 @@ BlobMetadata buildAndUpload(String blobPath, List>> blobData * @return BlobMetadata object used to create the register blob request */ BlobMetadata upload( - String blobPath, byte[] blob, List metadata, BlobStats blobStats) + StreamingIngestStorage storage, + String blobPath, + byte[] blob, + List metadata, + BlobStats blobStats) throws NoSuchAlgorithmException { logger.logInfo("Start uploading blob={}, size={}", blobPath, blob.length); long startTime = System.currentTimeMillis(); Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency); - this.targetStage.put(blobPath, blob); + storage.put(blobPath, blob); if (uploadContext != null) { blobStats.setUploadDurationMs(uploadContext); @@ -636,45 +607,6 @@ void setNeedFlush() { this.isNeedFlush = true; } - /** - * Generate a blob path, which is: "YEAR/MONTH/DAY_OF_MONTH/HOUR_OF_DAY/MINUTE/.BDEC" - * - * @return the generated blob file path - */ - private String getBlobPath(String clientPrefix) { - Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - return getBlobPath(calendar, clientPrefix); - } - - /** For TESTING 
*/ - String getBlobPath(Calendar calendar, String clientPrefix) { - if (isTestMode && clientPrefix == null) { - clientPrefix = "testPrefix"; - } - - Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix); - int year = calendar.get(Calendar.YEAR); - int month = calendar.get(Calendar.MONTH) + 1; // Gregorian calendar starts from 0 - int day = calendar.get(Calendar.DAY_OF_MONTH); - int hour = calendar.get(Calendar.HOUR_OF_DAY); - int minute = calendar.get(Calendar.MINUTE); - long time = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis()); - long threadId = Thread.currentThread().getId(); - // Create the blob short name, the clientPrefix contains the deployment id - String blobShortName = - Long.toString(time, 36) - + "_" - + clientPrefix - + "_" - + threadId - + "_" - + this.counter.getAndIncrement() - + "." - + BLOB_EXTENSION_TYPE; - return year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + blobShortName; - } - /** * Invalidate all the channels in the blob data * @@ -700,7 +632,7 @@ void invalidateAllChannelsInBlob( /** Get the server generated unique prefix for this client */ String getClientPrefix() { - return this.targetStage.getClientPrefix(); + return this.storageManager.getClientPrefix(); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java new file mode 100644 index 000000000..c7d74cc52 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; +import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE; +import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.Utils; + +/** Class to manage a single Snowflake internal stage */ +class InternalStageManager implements StorageManager { + // Target stage for the client + private final StreamingIngestStorage targetStage; + + // Increasing counter to generate a unique blob name per client + private final AtomicLong counter; + + // Whether the manager is in test mode + private final boolean isTestMode; + + /** + * Constructor for InternalStageManager + * + * @param isTestMode whether the manager is in test mode + * @param client the owning client + */ + InternalStageManager(boolean isTestMode, SnowflakeStreamingIngestClientInternal client) { + ConfigureCallHandler configureCallHandler = + ConfigureCallHandler.builder( + client.getHttpClient(), + client.getRequestBuilder(), + STREAMING_CLIENT_CONFIGURE, + CLIENT_CONFIGURE_ENDPOINT) + .setRole(client.getRole()) + .setIsTestMode(isTestMode) + .build(); + this.isTestMode = isTestMode; + this.counter = new AtomicLong(0); + try { + targetStage = + new StreamingIngestStorage( + isTestMode, configureCallHandler, client.getName(), DEFAULT_MAX_UPLOAD_RETRIES); + } catch (SnowflakeSQLException | IOException err) { + throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE); + } + } + + /** + * Get 
the storage. In this case, the storage is always the target stage as there's only one stage + * in non-iceberg mode. + * + * @param blobData this parameter does not affect the method outcome + * @return the target storage + */ + @Override + @SuppressWarnings("unused") + public StreamingIngestStorage getStorage(List>> blobData) { + // There's always only one stage for the client in non-iceberg mode + return targetStage; + } + + /** + * Add storage to the manager. Should not be called in non-iceberg mode. + * + * @param openChannelResponse this parameter does not affect the instance + */ + @Override + public void addStorage(OpenChannelResponse openChannelResponse) {} + + /** + * Generate a blob path, which is: "YEAR/MONTH/DAY_OF_MONTH/HOUR_OF_DAY/MINUTE/.BDEC" + * + * @return the generated blob file path + */ + @Override + public String generateBlobPath() { + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + return getBlobPath(calendar, this.targetStage.getClientPrefix()); + } + + /** + * Get the unique client prefix generated by the Snowflake server + * + * @return the client prefix + */ + @Override + public String getClientPrefix() { + return this.targetStage.getClientPrefix(); + } + + /** For TESTING */ + @VisibleForTesting + public String getBlobPath(Calendar calendar, String clientPrefix) { + if (this.isTestMode && clientPrefix == null) { + clientPrefix = "testPrefix"; + } + + Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix); + int year = calendar.get(Calendar.YEAR); + int month = calendar.get(Calendar.MONTH) + 1; // Gregorian calendar starts from 0 + int day = calendar.get(Calendar.DAY_OF_MONTH); + int hour = calendar.get(Calendar.HOUR_OF_DAY); + int minute = calendar.get(Calendar.MINUTE); + long time = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis()); + long threadId = Thread.currentThread().getId(); + // Create the blob short name, the clientPrefix contains the deployment id + String blobShortName = + 
Long.toString(time, 36) + + "_" + + clientPrefix + + "_" + + threadId + + "_" + + this.counter.getAndIncrement() + + "." + + BLOB_EXTENSION_TYPE; + return year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + blobShortName; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 1de473da6..1831c5f36 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -121,6 +121,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Reference to the flush service private final FlushService flushService; + // Reference to storage manager + private final StorageManager storageManager; + // Indicates whether the client has closed private volatile boolean isClosed; @@ -238,8 +241,14 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.setupMetricsForClient(); } + this.storageManager = + isIcebergMode + ? 
new ExternalVolumeManager<>(isTestMode, this) + : new InternalStageManager<>(isTestMode, this); + try { - this.flushService = new FlushService<>(this, this.channelCache, this.isTestMode); + this.flushService = + new FlushService<>(this, this.channelCache, this.storageManager, this.isTestMode); } catch (Exception e) { // Need to clean up the resources before throwing any exceptions cleanUpResources(); @@ -399,6 +408,9 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest // Add channel to the channel cache this.channelCache.addChannel(channel); + // Add storage to the storage manager, only for external volume + this.storageManager.addStorage(response); + return channel; } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java new file mode 100644 index 000000000..aca34e72a --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Calendar; +import java.util.List; + +/** Interface to manage {@link StreamingIngestStorage} for {@link FlushService} */ +interface StorageManager { + // Default max upload retries for streaming ingest storage + int DEFAULT_MAX_UPLOAD_RETRIES = 5; + + /** + * Given a blob, return the target storage + * + * @param blobData the blob to upload + * @return target stage + */ + StreamingIngestStorage getStorage(List>> blobData); + + /** + * Add a storage to the manager + * + * @param openChannelResponse response from open channel + */ + void addStorage(OpenChannelResponse openChannelResponse); + + /** + * Generate a unique blob path for the blob + * + * @return the blob path + */ + String generateBlobPath(); + + /** + * Get the blob path given time and client prefix, used for testing only + * + * @param calendar the time + * @param clientPrefix the client prefix + * @return the blob path + */ + @VisibleForTesting + String getBlobPath(Calendar calendar, String clientPrefix); + + /** + * Get the unique client prefix generated by the Snowflake server + * + * @return the client prefix + */ + String getClientPrefix(); +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java similarity index 80% rename from src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java rename to src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java index 9752b311c..13bac5386 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java @@ -1,13 +1,9 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2024 Snowflake Computing Inc. 
All rights reserved. */ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; -import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; -import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; import static net.snowflake.ingest.utils.HttpUtil.generateProxyPropertiesForJDBC; import static net.snowflake.ingest.utils.Utils.getStackTrace; @@ -21,8 +17,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -33,16 +27,13 @@ import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.client.jdbc.cloud.storage.StageInfo; import net.snowflake.client.jdbc.internal.apache.commons.io.FileUtils; -import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; -import net.snowflake.ingest.connection.IngestResponseException; -import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; -/** Handles uploading files to the Snowflake Streaming Ingest Stage */ -class StreamingIngestStage { +/** Handles uploading files to the Snowflake Streaming Ingest Storage */ +class StreamingIngestStorage { private static final ObjectMapper mapper = new ObjectMapper(); // Object mapper for parsing the client/configure response to Jackson version the same as @@ -54,7 +45,7 @@ class StreamingIngestStage { private static final long REFRESH_THRESHOLD_IN_MS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); - private static final Logging logger = new Logging(StreamingIngestStage.class); + 
private static final Logging logger = new Logging(StreamingIngestStorage.class); /** * Wrapper class containing SnowflakeFileTransferMetadata and the timestamp at which the metadata @@ -86,9 +77,7 @@ state to record unknown age. } private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; - private final CloseableHttpClient httpClient; - private final RequestBuilder requestBuilder; - private final String role; + private final ConfigureCallHandler configureCallHandler; private final String clientName; private String clientPrefix; @@ -97,17 +86,23 @@ state to record unknown age. // Proxy parameters that we set while calling the Snowflake JDBC to upload the streams private final Properties proxyProperties; - StreamingIngestStage( + /** + * Default constructor for internal stage + * + * @param isTestMode Whether it's in test mode + * @param configureCallHandler The ConfigureCallHandler to use + * @param clientName The client name + * @param maxUploadRetries The maximum number of retries to attempt + * @throws SnowflakeSQLException + * @throws IOException + */ + StreamingIngestStorage( boolean isTestMode, - String role, - CloseableHttpClient httpClient, - RequestBuilder requestBuilder, + ConfigureCallHandler configureCallHandler, String clientName, int maxUploadRetries) throws SnowflakeSQLException, IOException { - this.httpClient = httpClient; - this.role = role; - this.requestBuilder = requestBuilder; + this.configureCallHandler = configureCallHandler; this.clientName = clientName; this.proxyProperties = generateProxyPropertiesForJDBC(); this.maxUploadRetries = maxUploadRetries; @@ -116,26 +111,43 @@ state to record unknown age. 
} } + /** + * Default constructor for external volume + * + * @param configureCallHandler The ConfigureCallHandler to use + * @param clientName The client name + * @param fileLocationInfo The file location information from open channel response + * @param maxUploadRetries The maximum number of retries to attempt + */ + StreamingIngestStorage( + ConfigureCallHandler configureCallHandler, + String clientName, + FileLocationInfo fileLocationInfo, + int maxUploadRetries) + throws SnowflakeSQLException, IOException { + this.configureCallHandler = configureCallHandler; + this.clientName = clientName; + this.proxyProperties = generateProxyPropertiesForJDBC(); + this.maxUploadRetries = maxUploadRetries; + createFileTransferMetadataWithAge(fileLocationInfo); + } + /** * Constructor for TESTING that takes SnowflakeFileTransferMetadataWithAge as input * * @param isTestMode must be true - * @param role Snowflake role used by the Client - * @param httpClient http client reference - * @param requestBuilder request builder to build the HTTP request + * @param configureCallHandler the ConfigureCallHandler to use * @param clientName the client name * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with */ - StreamingIngestStage( + StreamingIngestStorage( boolean isTestMode, - String role, - CloseableHttpClient httpClient, - RequestBuilder requestBuilder, + ConfigureCallHandler configureCallHandler, String clientName, SnowflakeFileTransferMetadataWithAge testMetadata, int maxRetryCount) throws SnowflakeSQLException, IOException { - this(isTestMode, role, httpClient, requestBuilder, clientName, maxRetryCount); + this(isTestMode, configureCallHandler, clientName, maxRetryCount); if (!isTestMode) { throw new SFException(ErrorCode.INTERNAL_ERROR); } @@ -250,26 +262,29 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole return fileTransferMetadataWithAge; } - Map payload = new HashMap<>(); - payload.put("role", this.role); - 
ConfigureResponse response = this.makeClientConfigureCall(payload); - + ConfigureResponse response = this.configureCallHandler.makeConfigureCall(); // Do not change the prefix everytime we have to refresh credentials if (Utils.isNullOrEmpty(this.clientPrefix)) { this.clientPrefix = createClientPrefix(response); } + return createFileTransferMetadataWithAge(response.getStageLocation()); + } + + private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( + FileLocationInfo fileLocationInfo) + throws JsonProcessingException, + net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException, + SnowflakeSQLException { Utils.assertStringNotNullOrEmpty("client prefix", this.clientPrefix); - if (response - .getStageLocation() + if (fileLocationInfo .getLocationType() .replaceAll( "^[\"]|[\"]$", "") // Replace the first and last character if they're double quotes .equals(StageInfo.StageType.LOCAL_FS.name())) { this.fileTransferMetadataWithAge = new SnowflakeFileTransferMetadataWithAge( - response - .getStageLocation() + fileLocationInfo .getLocation() .replaceAll( "^[\"]|[\"]$", @@ -280,7 +295,7 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole new SnowflakeFileTransferMetadataWithAge( (SnowflakeFileTransferMetadataV1) SnowflakeFileTransferAgent.getFileTransferMetadatas( - parseClientConfigureResponse(response)) + parseFileLocationInfo(fileLocationInfo)) .get(0), Optional.of(System.currentTimeMillis())); } @@ -297,7 +312,7 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole * @return the client prefix. */ private String createClientPrefix(final ConfigureResponse response) { - final String prefix = response.getPrefix(); + final String prefix = response.getPrefix() == null ? "" : response.getPrefix(); final String deploymentId = response.getDeploymentId() != null ? 
"_" + response.getDeploymentId() : ""; return prefix + deploymentId; @@ -312,15 +327,12 @@ private String createClientPrefix(final ConfigureResponse response) { SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) throws SnowflakeSQLException, IOException { - Map payload = new HashMap<>(); - payload.put("role", this.role); - payload.put("file_name", fileName); - ConfigureResponse response = this.makeClientConfigureCall(payload); + ConfigureResponse response = this.configureCallHandler.makeConfigureCall(fileName); SnowflakeFileTransferMetadataV1 metadata = (SnowflakeFileTransferMetadataV1) SnowflakeFileTransferAgent.getFileTransferMetadatas( - parseClientConfigureResponse(response)) + parseFileLocationInfo(response.getStageLocation())) .get(0); // Transfer agent trims path for fileName metadata.setPresignedUrlFileName(fileName); @@ -328,51 +340,28 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) } private net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode - parseClientConfigureResponse(ConfigureResponse response) + parseFileLocationInfo(FileLocationInfo fileLocationInfo) throws JsonProcessingException, net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException { - JsonNode responseNode = mapper.valueToTree(response); + JsonNode fileLocationInfoNode = mapper.valueToTree(fileLocationInfo); // Currently there are a few mismatches between the client/configure response and what // SnowflakeFileTransferAgent expects - ObjectNode mutable = (ObjectNode) responseNode; - mutable.putObject("data"); - ObjectNode dataNode = (ObjectNode) mutable.get("data"); - dataNode.set("stageInfo", responseNode.get("stage_location")); + + ObjectNode node = mapper.createObjectNode(); + node.putObject("data"); + ObjectNode dataNode = (ObjectNode) node.get("data"); + dataNode.set("stageInfo", fileLocationInfoNode); // JDBC expects this field which maps to presignedFileUrlName. 
We will set this later dataNode.putArray("src_locations").add("placeholder"); // use String as intermediate object to avoid Jackson version mismatch // TODO: SNOW-1493470 Align Jackson version - String responseString = mapper.writeValueAsString(responseNode); + String responseString = mapper.writeValueAsString(node); return parseConfigureResponseMapper.readTree(responseString); } - private ConfigureResponse makeClientConfigureCall(Map payload) - throws IOException { - try { - - ConfigureResponse response = - executeWithRetries( - ConfigureResponse.class, - CLIENT_CONFIGURE_ENDPOINT, - mapper.writeValueAsString(payload), - "client configure", - STREAMING_CLIENT_CONFIGURE, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - throw new SFException(ErrorCode.CLIENT_CONFIGURE_FAILURE, response.getMessage()); - } - return response; - } catch (IngestResponseException e) { - throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); - } - } - /** * Upload file to internal stage * diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 3579e4d24..3d09d9a2a 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.utils; @@ -52,6 +52,7 @@ public class Constants { public static final String BLOB_EXTENSION_TYPE = "bdec"; public static final int MAX_THREAD_COUNT = Integer.MAX_VALUE; public static final String CLIENT_CONFIGURE_ENDPOINT = "/v1/streaming/client/configure/"; + public static final String CHANNEL_CONFIGURE_ENDPOINT = "/v1/streaming/channels/configure/"; public static final int COMMIT_MAX_RETRY_COUNT = 60; public static final int COMMIT_RETRY_INTERVAL_IN_MS = 1000; public static final String ENCRYPTION_ALGORITHM = "AES/CTR/NoPadding"; diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index b863717e9..f0d26de5e 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.utils; @@ -14,7 +14,7 @@ public enum ErrorCode { REGISTER_BLOB_FAILURE("0006"), OPEN_CHANNEL_FAILURE("0007"), BUILD_REQUEST_FAILURE("0008"), - CLIENT_CONFIGURE_FAILURE("0009"), + CONFIGURE_FAILURE("0009"), MISSING_CONFIG("0010"), BLOB_UPLOAD_FAILURE("0011"), RESOURCE_CLEANUP_FAILURE("0012"), diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 5a64e05a0..8778977fe 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -93,7 +93,8 @@ private abstract static class TestContext implements AutoCloseable { ChannelCache channelCache; final Map> channels = new HashMap<>(); FlushService flushService; - StreamingIngestStage stage; + StorageManager storageManager; + StreamingIngestStorage storage; ParameterProvider parameterProvider; InternalParameterProvider internalParameterProvider; RegisterService registerService; @@ -101,17 +102,19 @@ private abstract static class TestContext implements AutoCloseable { final List> channelData = new ArrayList<>(); TestContext() { - stage = Mockito.mock(StreamingIngestStage.class); - Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); + storage = Mockito.mock(StreamingIngestStorage.class); + Mockito.when(storage.getClientPrefix()).thenReturn("client_prefix"); parameterProvider = new ParameterProvider(isIcebergMode); internalParameterProvider = new InternalParameterProvider(isIcebergMode); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); + storageManager = Mockito.spy(new InternalStageManager<>(true, client)); + 
Mockito.when(storageManager.getStorage(ArgumentMatchers.any())).thenReturn(storage); channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); registerService = Mockito.spy(new RegisterService(client, client.isTestMode())); - flushService = Mockito.spy(new FlushService<>(client, channelCache, stage, true)); + flushService = Mockito.spy(new FlushService<>(client, channelCache, storageManager, true)); } ChannelData flushChannel(String name) { @@ -408,10 +411,10 @@ private static ColumnMetadata createLargeTestTextColumn(String name) { @Test public void testGetFilePath() { TestContext testContext = testContextFactory.create(); - FlushService flushService = testContext.flushService; + StorageManager storageManager = testContext.storageManager; Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; - String outputString = flushService.getBlobPath(calendar, clientPrefix); + String outputString = storageManager.getBlobPath(calendar, clientPrefix); Path outputPath = Paths.get(outputString); Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); Assert.assertTrue( @@ -785,12 +788,15 @@ public void testBuildAndUpload() throws Exception { .build(); // Check FlushService.upload called with correct arguments + final ArgumentCaptor storageCaptor = + ArgumentCaptor.forClass(StreamingIngestStorage.class); final ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class); final ArgumentCaptor blobCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor> metadataCaptor = ArgumentCaptor.forClass(List.class); Mockito.verify(testContext.flushService) .upload( + storageCaptor.capture(), nameCaptor.capture(), blobCaptor.capture(), metadataCaptor.capture(), @@ -933,10 +939,12 @@ public void testInvalidateChannels() { innerData.add(channel1Data); innerData.add(channel2Data); - StreamingIngestStage stage = Mockito.mock(StreamingIngestStage.class); + 
StreamingIngestStorage stage = Mockito.mock(StreamingIngestStorage.class); Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); + StorageManager storageManager = + Mockito.spy(new InternalStageManager<>(true, client)); FlushService flushService = - new FlushService<>(client, channelCache, stage, false); + new FlushService<>(client, channelCache, storageManager, false); flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test"); Assert.assertFalse(channel1.isValid()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java similarity index 91% rename from src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java rename to src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java index d12c6231c..b018025b4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java @@ -1,6 +1,11 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_PASSWORD; import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_USER; import static net.snowflake.ingest.utils.HttpUtil.NON_PROXY_HOSTS; @@ -42,7 +47,6 @@ import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.connection.RequestBuilder; -import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; import org.junit.Assert; @@ -56,7 +60,7 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({TestUtils.class, HttpUtil.class, SnowflakeFileTransferAgent.class}) -public class StreamingIngestStageTest { +public class StreamingIngestStorageTest { private final String prefix = "EXAMPLE_PREFIX"; @@ -114,14 +118,12 @@ public void testPutRemote() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStage stage = - new StreamingIngestStage( + StreamingIngestStorage stage = + new StreamingIngestStorage( true, - "role", - null, null, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), 1); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); @@ -156,15 +158,13 @@ public void testPutLocal() throws Exception { String fullFilePath = "testOutput"; String fileName = "putLocalOutput"; - StreamingIngestStage stage = + StreamingIngestStorage stage = Mockito.spy( - new StreamingIngestStage( + new StreamingIngestStorage( true, - "role", - null, null, "clientName", - new 
StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( fullFilePath, Optional.of(System.currentTimeMillis())), 1)); Mockito.doReturn(true).when(stage).isLocalFS(); @@ -186,14 +186,12 @@ public void doTestPutRemoteRefreshes() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStage stage = - new StreamingIngestStage( + StreamingIngestStorage stage = + new StreamingIngestStorage( true, - "role", - null, null, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), maxUploadRetryCount); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); @@ -240,15 +238,13 @@ public void testPutRemoteGCS() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStage stage = + StreamingIngestStorage stage = Mockito.spy( - new StreamingIngestStage( + new StreamingIngestStorage( true, - "role", - null, null, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), 1)); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); @@ -265,6 +261,11 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + ConfigureCallHandler configureCallHandler = + ConfigureCallHandler.builder( + mockClient, mockBuilder, STREAMING_CLIENT_CONFIGURE, "endpoint") + .setRole("role") + .build(); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); 
Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -277,10 +278,10 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); ParameterProvider parameterProvider = new ParameterProvider(false); - StreamingIngestStage stage = - new StreamingIngestStage(true, "role", mockClient, mockBuilder, "clientName", 1); + StreamingIngestStorage stage = + new StreamingIngestStorage(true, configureCallHandler, "clientName", 1); - StreamingIngestStage.SnowflakeFileTransferMetadataWithAge metadataWithAge = + StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge metadataWithAge = stage.refreshSnowflakeMetadata(true); final ArgumentCaptor endpointCaptor = ArgumentCaptor.forClass(String.class); @@ -288,7 +289,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Mockito.verify(mockBuilder) .generateStreamingIngestPostRequest( stringCaptor.capture(), endpointCaptor.capture(), Mockito.any()); - Assert.assertEquals(Constants.CLIENT_CONFIGURE_ENDPOINT, endpointCaptor.getValue()); + Assert.assertEquals("endpoint", endpointCaptor.getValue()); Assert.assertTrue(metadataWithAge.timestamp.isPresent()); Assert.assertEquals( StageInfo.StageType.S3, metadataWithAge.fileTransferMetadata.getStageInfo().getStageType()); @@ -307,6 +308,11 @@ public void testFetchSignedURL() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + ConfigureCallHandler configureCallHandler = + ConfigureCallHandler.builder( + mockClient, mockBuilder, STREAMING_CLIENT_CONFIGURE, "endpoint") + .setRole("role") + .build(); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -318,8 +324,8 @@ public void testFetchSignedURL() throws Exception { 
Mockito.when(mockResponse.getEntity()).thenReturn(entity); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - StreamingIngestStage stage = - new StreamingIngestStage(true, "role", mockClient, mockBuilder, "clientName", 1); + StreamingIngestStorage stage = + new StreamingIngestStorage(true, configureCallHandler, "clientName", 1); SnowflakeFileTransferMetadataV1 metadata = stage.fetchSignedURL("path/fileName"); @@ -328,7 +334,7 @@ public void testFetchSignedURL() throws Exception { Mockito.verify(mockBuilder) .generateStreamingIngestPostRequest( stringCaptor.capture(), endpointCaptor.capture(), Mockito.any()); - Assert.assertEquals(Constants.CLIENT_CONFIGURE_ENDPOINT, endpointCaptor.getValue()); + Assert.assertEquals("endpoint", endpointCaptor.getValue()); Assert.assertEquals(StageInfo.StageType.S3, metadata.getStageInfo().getStageType()); Assert.assertEquals("foo/streaming_ingest/", metadata.getStageInfo().getLocation()); Assert.assertEquals("path/fileName", metadata.getPresignedUrlFileName()); @@ -345,6 +351,11 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + ConfigureCallHandler configureCallHandler = + ConfigureCallHandler.builder( + mockClient, mockBuilder, STREAMING_CLIENT_CONFIGURE, "endpoint") + .setRole("role") + .build(); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -356,14 +367,12 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(entity); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - StreamingIngestStage stage = - new StreamingIngestStage( + StreamingIngestStorage stage = + new StreamingIngestStorage( 
true, - "role", - mockClient, - mockBuilder, + configureCallHandler, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(0L)), 1); @@ -493,14 +502,12 @@ public void testRefreshMetadataOnFirstPutException() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStage stage = - new StreamingIngestStage( + StreamingIngestStorage stage = + new StreamingIngestStorage( true, - "role", - null, null, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), maxUploadRetryCount); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class);