diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java index cef6e631c..034b4a6f0 100644 --- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java +++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2017 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.connection; @@ -42,7 +42,8 @@ public enum ApiName { STREAMING_DROP_CHANNEL("POST"), STREAMING_CHANNEL_STATUS("POST"), STREAMING_REGISTER_BLOB("POST"), - STREAMING_CLIENT_CONFIGURE("POST"); + STREAMING_CLIENT_CONFIGURE("POST"), + STREAMING_CHANNEL_CONFIGURE("POST"); private final String httpMethod; private ApiName(String httpMethod) { diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index cc8782dbd..9a7272a16 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.streaming; @@ -41,7 +41,6 @@ public enum OnErrorOption { private final ZoneId defaultTimezone; private final String offsetToken; - private final boolean isOffsetTokenProvided; private final OffsetTokenVerificationFunction offsetTokenVerificationFunction; @@ -59,7 +58,6 @@ public static class OpenChannelRequestBuilder { private ZoneId defaultTimezone; private String offsetToken; - private boolean isOffsetTokenProvided = false; private OffsetTokenVerificationFunction offsetTokenVerificationFunction; @@ -95,7 +93,6 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) { public OpenChannelRequestBuilder setOffsetToken(String offsetToken) { this.offsetToken = offsetToken; - this.isOffsetTokenProvided = true; return this; } @@ -125,7 +122,6 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) { this.onErrorOption = builder.onErrorOption; this.defaultTimezone = builder.defaultTimezone; this.offsetToken = builder.offsetToken; - this.isOffsetTokenProvided = builder.isOffsetTokenProvided; this.offsetTokenVerificationFunction = builder.offsetTokenVerificationFunction; } @@ -150,7 +146,7 @@ public ZoneId getDefaultTimezone() { } public String getFullyQualifiedTableName() { - return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName); + return Utils.getFullyQualifiedTableName(this.dbName, this.schemaName, this.tableName); } public OnErrorOption getOnErrorOption() { @@ -161,10 +157,6 @@ public String getOffsetToken() { return this.offsetToken; } - public boolean isOffsetTokenProvided() { - return this.isOffsetTokenProvided; - } - public OffsetTokenVerificationFunction getOffsetTokenVerificationFunction() { return this.offsetTokenVerificationFunction; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 989be0fa1..539da6287 100644 --- 
a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java new file mode 100644 index 000000000..31bca95ac --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** Class used to serialize the channel configure request. */ +class ChannelConfigureRequest extends ConfigureRequest { + @JsonProperty("database") + private String database; + + @JsonProperty("schema") + private String schema; + + @JsonProperty("table") + private String table; + + /** + * Constructor for channel configure request + * + * @param role Role to be used for the request. + * @param database Database name. + * @param schema Schema name. + * @param table Table name. 
+ */ + ChannelConfigureRequest(String role, String database, String schema, String table) { + setRole(role); + this.database = database; + this.schema = schema; + this.table = table; + } + + String getDatabase() { + return database; + } + + String getSchema() { + return schema; + } + + String getTable() { + return table; + } + + @Override + public String getStringForLogging() { + return String.format( + "ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)", + getRole(), database, schema, table, getFileName()); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java new file mode 100644 index 000000000..da65960b4 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** Class used to deserialize responses from channel configure endpoint */ +@JsonIgnoreProperties(ignoreUnknown = true) +class ChannelConfigureResponse extends StreamingIngestResponse { + @JsonProperty("status_code") + private Long statusCode; + + @JsonProperty("message") + private String message; + + @JsonProperty("stage_location") + private FileLocationInfo stageLocation; + + @Override + Long getStatusCode() { + return statusCode; + } + + void setStatusCode(Long statusCode) { + this.statusCode = statusCode; + } + + String getMessage() { + return message; + } + + void setMessage(String message) { + this.message = message; + } + + FileLocationInfo getStageLocation() { + return stageLocation; + } + + void setStageLocation(FileLocationInfo stageLocation) { + this.stageLocation = stageLocation; + } +} diff --git 
a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java index 3e5265719..fe9542267 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java @@ -1,9 +1,11 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; +import net.snowflake.ingest.utils.Utils; + /** * Channel immutable identification and encryption attributes. * @@ -36,12 +38,12 @@ class ChannelFlushContext { String encryptionKey, Long encryptionKeyId) { this.name = name; - this.fullyQualifiedName = String.format("%s.%s.%s.%s", dbName, schemaName, tableName, name); + this.fullyQualifiedName = + Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name); this.dbName = dbName; this.schemaName = schemaName; this.tableName = tableName; - this.fullyQualifiedTableName = - String.format("%s.%s.%s", this.getDbName(), this.getSchemaName(), this.getTableName()); + this.fullyQualifiedTableName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName); this.channelSequencer = channelSequencer; this.encryptionKey = encryptionKey; this.encryptionKeyId = encryptionKeyId; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java index b98782ab9..ce8a76ef1 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java @@ -1,14 +1,16 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.streaming.internal; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +import java.util.stream.Collectors; +import net.snowflake.ingest.utils.Utils; /** Class to deserialize a request from a channel status request */ -class ChannelsStatusRequest { +class ChannelsStatusRequest implements StreamingIngestRequest { // Used to deserialize a channel request static class ChannelStatusRequestDTO { @@ -99,4 +101,21 @@ void setChannels(List channels) { List getChannels() { return channels; } + + @Override + public String getStringForLogging() { + return String.format( + "ChannelsStatusRequest(requestId=%s, role=%s, channels=[%s])", + requestId, + role, + channels.stream() + .map( + r -> + Utils.getFullyQualifiedChannelName( + r.getDatabaseName(), + r.getSchemaName(), + r.getTableName(), + r.getChannelName())) + .collect(Collectors.joining(", "))); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java new file mode 100644 index 000000000..ca6d72ab3 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +/** Class used to serialize client configure request */ +class ClientConfigureRequest extends ConfigureRequest { + /** + * Constructor for client configure request + * + * @param role Role to be used for the request. 
+ */ + ClientConfigureRequest(String role) { + setRole(role); + } + + @Override + public String getStringForLogging() { + return String.format("ClientConfigureRequest(role=%s, file_name=%s)", getRole(), getFileName()); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java similarity index 79% rename from src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java rename to src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java index 6fe73fbd5..03a1d3576 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java @@ -4,10 +4,12 @@ package net.snowflake.ingest.streaming.internal; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; /** Class used to deserialize responses from configure endpoint */ -class ConfigureResponse extends StreamingIngestResponse { +@JsonIgnoreProperties(ignoreUnknown = true) +class ClientConfigureResponse extends StreamingIngestResponse { @JsonProperty("prefix") private String prefix; @@ -63,4 +65,11 @@ Long getDeploymentId() { void setDeploymentId(Long deploymentId) { this.deploymentId = deploymentId; } + + String getClientPrefix() { + if (this.deploymentId == null) { + return this.prefix; + } + return this.prefix + "_" + this.deploymentId; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java new file mode 100644 index 000000000..6ca10f52a --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** Abstract class for {@link ChannelConfigureRequest} and {@link ClientConfigureRequest} */ +abstract class ConfigureRequest implements StreamingIngestRequest { + @JsonProperty("role") + private String role; + + // File name for the GCS signed url request + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("file_name") + private String fileName; + + String getRole() { + return role; + } + + void setRole(String role) { + this.role = role; + } + + String getFileName() { + return fileName; + } + + void setFileName(String fileName) { + this.fileName = fileName; + } + + @Override + public abstract String getStringForLogging(); +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java new file mode 100644 index 000000000..25599999e --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import net.snowflake.ingest.streaming.DropChannelRequest; +import net.snowflake.ingest.utils.Utils; + +/** Class used to serialize the {@link DropChannelRequest} */ +class DropChannelRequestInternal implements StreamingIngestRequest { + @JsonProperty("request_id") + private String requestId; + + @JsonProperty("role") + private String role; + + @JsonProperty("channel") + private String channel; + + @JsonProperty("table") + private String table; + + @JsonProperty("database") + private String database; + + @JsonProperty("schema") + private String schema; + + @JsonProperty("is_iceberg") + private boolean isIceberg; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("client_sequencer") + Long clientSequencer; + + DropChannelRequestInternal( + String requestId, + String role, + String database, + String schema, + String table, + String channel, + Long clientSequencer, + boolean isIceberg) { + this.requestId = requestId; + this.role = role; + this.database = database; + this.schema = schema; + this.table = table; + this.channel = channel; + this.clientSequencer = clientSequencer; + this.isIceberg = isIceberg; + } + + String getRequestId() { + return requestId; + } + + String getRole() { + return role; + } + + String getChannel() { + return channel; + } + + String getTable() { + return table; + } + + String getDatabase() { + return database; + } + + String getSchema() { + return schema; + } + + boolean getIsIceberg() { + return isIceberg; + } + + Long getClientSequencer() { + return clientSequencer; + } + + String getFullyQualifiedTableName() { + return Utils.getFullyQualifiedTableName(database, schema, table); + } + + @Override + public String getStringForLogging() { + return String.format( + "DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s," + + " isIceberg=%s, 
clientSequencer=%s)", + requestId, role, database, schema, table, channel, isIceberg, clientSequencer); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java new file mode 100644 index 000000000..2b8796bb2 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.io.IOException; +import java.util.Calendar; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.Utils; + +class ExternalVolumeLocation { + public final String dbName; + public final String schemaName; + public final String tableName; + + public ExternalVolumeLocation(String dbName, String schemaName, String tableName) { + this.dbName = dbName; + this.schemaName = schemaName; + this.tableName = tableName; + } +} + +/** Class to manage multiple external volumes */ +class ExternalVolumeManager implements StorageManager { + // Reference to the external volume per table + private final Map> externalVolumeMap; + + // name of the owning client + private final String clientName; + + // role of the owning client + private final String role; + + // Reference to the Snowflake service client used for configure calls + private final SnowflakeServiceClient snowflakeServiceClient; + + // Client prefix generated by the Snowflake server + private final String clientPrefix; + + /** + * Constructor for ExternalVolumeManager + * + * @param isTestMode whether the manager in test mode + * @param role the 
role of the client + * @param clientName the name of the client + * @param snowflakeServiceClient the Snowflake service client used for configure calls + */ + ExternalVolumeManager( + boolean isTestMode, + String role, + String clientName, + SnowflakeServiceClient snowflakeServiceClient) { + this.role = role; + this.clientName = clientName; + this.snowflakeServiceClient = snowflakeServiceClient; + this.externalVolumeMap = new ConcurrentHashMap<>(); + try { + this.clientPrefix = + isTestMode + ? "testPrefix" + : this.snowflakeServiceClient + .clientConfigure(new ClientConfigureRequest(role)) + .getClientPrefix(); + } catch (IngestResponseException | IOException e) { + throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); + } + } + + /** + * Given a fully qualified table name, return the target storage by looking up the table name + * + * @param fullyQualifiedTableName the target fully qualified table name + * @return target storage + */ + @Override + public StreamingIngestStorage getStorage( + String fullyQualifiedTableName) { + // Only one chunk per blob in Iceberg mode. 
+ StreamingIngestStorage stage = + this.externalVolumeMap.get(fullyQualifiedTableName); + + if (stage == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("No external volume found for table %s", fullyQualifiedTableName)); + } + + return stage; + } + + /** + * Add a storage to the manager by looking up the table name from the open channel response + * + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + * @param fileLocationInfo response from open channel + */ + @Override + public void addStorage( + String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) { + String fullyQualifiedTableName = + Utils.getFullyQualifiedTableName(dbName, schemaName, tableName); + + try { + this.externalVolumeMap.put( + fullyQualifiedTableName, + new StreamingIngestStorage( + this, + this.clientName, + fileLocationInfo, + new ExternalVolumeLocation(dbName, schemaName, tableName), + DEFAULT_MAX_UPLOAD_RETRIES)); + } catch (SnowflakeSQLException | IOException err) { + throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STORAGE); + } + } + + /** + * Gets the latest file location info (with a renewed short-lived access token) for the specified + * location + * + * @param location A reference to the target location + * @param fileName optional filename for single-file signed URL fetch from server + * @return the new location information + */ + @Override + public FileLocationInfo getRefreshedLocation( + ExternalVolumeLocation location, Optional fileName) { + try { + ChannelConfigureRequest request = + new ChannelConfigureRequest( + this.role, location.dbName, location.schemaName, location.tableName); + fileName.ifPresent(request::setFileName); + ChannelConfigureResponse response = this.snowflakeServiceClient.channelConfigure(request); + return response.getStageLocation(); + } catch (IngestResponseException | IOException e) { + throw new SFException(e, 
ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); + } + } + + // TODO: SNOW-1502887 Blob path generation for iceberg table + @Override + public String generateBlobPath() { + return "snow_dummy_file_name"; + } + + // TODO: SNOW-1502887 Blob path generation for iceberg table + @Override + public void decrementBlobSequencer() {} + + // TODO: SNOW-1502887 Blob path generation for iceberg table + public String getBlobPath(Calendar calendar, String clientPrefix) { + return ""; + } + + /** + * Get the client prefix from first external volume in the map + * + * @return the client prefix + */ + @Override + public String getClientPrefix() { + return this.clientPrefix; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 05d72433d..454a106bb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -4,7 +4,6 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE; import static net.snowflake.ingest.utils.Constants.DISABLE_BACKGROUND_FLUSH; import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.MAX_THREAD_COUNT; @@ -19,13 +18,11 @@ import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.Calendar; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.TimeZone; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -34,11 +31,9 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; 
-import java.util.concurrent.atomic.AtomicLong; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; import javax.crypto.NoSuchPaddingException; -import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; @@ -84,9 +79,6 @@ List>> getData() { private static final Logging logger = new Logging(FlushService.class); - // Increasing counter to generate a unique blob name per client - private final AtomicLong counter; - // The client that owns this flush service private final SnowflakeStreamingIngestClientInternal owningClient; @@ -102,8 +94,8 @@ List>> getData() { // Reference to the channel cache private final ChannelCache channelCache; - // Reference to the Streaming Ingest stage - private final StreamingIngestStage targetStage; + // Reference to the Streaming Ingest storage manager + private final StorageManager storageManager; // Reference to register service private final RegisterService registerService; @@ -124,56 +116,22 @@ List>> getData() { private final Constants.BdecVersion bdecVersion; /** - * Constructor for TESTING that takes (usually mocked) StreamingIngestStage + * Default constructor * - * @param client - * @param cache - * @param isTestMode + * @param client the owning client + * @param cache the channel cache + * @param storageManager the storage manager + * @param isTestMode whether the service is running in test mode */ FlushService( SnowflakeStreamingIngestClientInternal client, ChannelCache cache, - StreamingIngestStage targetStage, // For testing + StorageManager storageManager, boolean isTestMode) { this.owningClient = client; this.channelCache = cache; - this.targetStage = targetStage; - this.counter = new AtomicLong(0); - this.registerService = new RegisterService<>(client, isTestMode); - this.isNeedFlush = false; - 
this.lastFlushTime = System.currentTimeMillis(); - this.isTestMode = isTestMode; - this.latencyTimerContextMap = new ConcurrentHashMap<>(); - this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); - createWorkers(); - } - - /** - * Default constructor - * - * @param client - * @param cache - * @param isTestMode - */ - FlushService( - SnowflakeStreamingIngestClientInternal client, ChannelCache cache, boolean isTestMode) { - this.owningClient = client; - this.channelCache = cache; - try { - this.targetStage = - new StreamingIngestStage( - isTestMode, - client.getRole(), - client.getHttpClient(), - client.getRequestBuilder(), - client.getName(), - DEFAULT_MAX_UPLOAD_RETRIES); - } catch (SnowflakeSQLException | IOException err) { - throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE); - } - + this.storageManager = storageManager; this.registerService = new RegisterService<>(client, isTestMode); - this.counter = new AtomicLong(0); this.isNeedFlush = false; this.lastFlushTime = System.currentTimeMillis(); this.isTestMode = isTestMode; @@ -363,7 +321,7 @@ void distributeFlushTasks() { while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) { List>> blobData = new ArrayList<>(); float totalBufferSizeInBytes = 0F; - final String blobPath = getBlobPath(this.targetStage.getClientPrefix()); + final String blobPath = this.storageManager.generateBlobPath(); // Distribute work at table level, split the blob if reaching the blob size limit or the // channel has different encryption key ids @@ -445,9 +403,9 @@ && shouldStopProcessing( // Kick off a build job if (blobData.isEmpty()) { - // we decrement the counter so that we do not have gaps in the blob names created by this - // client. See method getBlobPath() below. - this.counter.decrementAndGet(); + // we decrement the blob sequencer so that we do not have gaps in the blob names created by + // this client. 
+ this.storageManager.decrementBlobSequencer(); } else { long flushStartMs = System.currentTimeMillis(); if (this.owningClient.flushLatency != null) { @@ -459,7 +417,13 @@ && shouldStopProcessing( CompletableFuture.supplyAsync( () -> { try { - BlobMetadata blobMetadata = buildAndUpload(blobPath, blobData); + // Get the fully qualified table name from the first channel in the blob. + // This only matters when the client is in Iceberg mode. In Iceberg mode, + // all channels in the blob belong to the same table. + String fullyQualifiedTableName = + blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName(); + BlobMetadata blobMetadata = + buildAndUpload(blobPath, blobData, fullyQualifiedTableName); blobMetadata.getBlobStats().setFlushStartMs(flushStartMs); return blobMetadata; } catch (Throwable e) { @@ -542,9 +506,12 @@ private boolean shouldStopProcessing( * @param blobPath Path of the destination blob in cloud storage * @param blobData All the data for one blob. Assumes that all ChannelData in the inner List * belongs to the same table. 
Will error if this is not the case + * @param fullyQualifiedTableName the table name of the first channel in the blob, only matters in + * Iceberg mode * @return BlobMetadata for FlushService.upload */ - BlobMetadata buildAndUpload(String blobPath, List>> blobData) + BlobMetadata buildAndUpload( + String blobPath, List>> blobData, String fullyQualifiedTableName) throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException { @@ -560,12 +527,18 @@ BlobMetadata buildAndUpload(String blobPath, List>> blobData blob.blobStats.setBuildDurationMs(buildContext); - return upload(blobPath, blob.blobBytes, blob.chunksMetadataList, blob.blobStats); + return upload( + this.storageManager.getStorage(fullyQualifiedTableName), + blobPath, + blob.blobBytes, + blob.chunksMetadataList, + blob.blobStats); } /** * Upload a blob to Streaming Ingest dedicated stage * + * @param storage the storage to upload the blob * @param blobPath full path of the blob * @param blob blob data * @param metadata a list of chunk metadata @@ -573,13 +546,17 @@ BlobMetadata buildAndUpload(String blobPath, List>> blobData * @return BlobMetadata object used to create the register blob request */ BlobMetadata upload( - String blobPath, byte[] blob, List metadata, BlobStats blobStats) + StreamingIngestStorage storage, + String blobPath, + byte[] blob, + List metadata, + BlobStats blobStats) throws NoSuchAlgorithmException { logger.logInfo("Start uploading blob={}, size={}", blobPath, blob.length); long startTime = System.currentTimeMillis(); Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency); - this.targetStage.put(blobPath, blob); + storage.put(blobPath, blob); if (uploadContext != null) { blobStats.setUploadDurationMs(uploadContext); @@ -636,45 +613,6 @@ void setNeedFlush() { this.isNeedFlush = true; } - /** - * Generate a blob path, which is: 
"YEAR/MONTH/DAY_OF_MONTH/HOUR_OF_DAY/MINUTE/.BDEC" - * - * @return the generated blob file path - */ - private String getBlobPath(String clientPrefix) { - Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - return getBlobPath(calendar, clientPrefix); - } - - /** For TESTING */ - String getBlobPath(Calendar calendar, String clientPrefix) { - if (isTestMode && clientPrefix == null) { - clientPrefix = "testPrefix"; - } - - Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix); - int year = calendar.get(Calendar.YEAR); - int month = calendar.get(Calendar.MONTH) + 1; // Gregorian calendar starts from 0 - int day = calendar.get(Calendar.DAY_OF_MONTH); - int hour = calendar.get(Calendar.HOUR_OF_DAY); - int minute = calendar.get(Calendar.MINUTE); - long time = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis()); - long threadId = Thread.currentThread().getId(); - // Create the blob short name, the clientPrefix contains the deployment id - String blobShortName = - Long.toString(time, 36) - + "_" - + clientPrefix - + "_" - + threadId - + "_" - + this.counter.getAndIncrement() - + "." 
- + BLOB_EXTENSION_TYPE; - return year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + blobShortName; - } - /** * Invalidate all the channels in the blob data * @@ -698,11 +636,6 @@ void invalidateAllChannelsInBlob( })); } - /** Get the server generated unique prefix for this client */ - String getClientPrefix() { - return this.targetStage.getClientPrefix(); - } - /** * Throttle if the number of queued buildAndUpload tasks is bigger than the total number of * available processors diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java new file mode 100644 index 000000000..ea207b141 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Calendar; +import java.util.Optional; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.Utils; + +class InternalStageLocation { + public InternalStageLocation() {} +} + +/** Class to manage single Snowflake internal stage */ +class InternalStageManager implements StorageManager { + // Target stage for the client + private final StreamingIngestStorage targetStage; + + // Increasing counter to generate a unique blob name per client + private final AtomicLong counter; + + // Whether the manager in test mode + private final 
boolean isTestMode; + + // Snowflake service client used for configure calls + private final SnowflakeServiceClient snowflakeServiceClient; + + // The role of the client + private final String role; + + // Client prefix generated by the Snowflake server + private final String clientPrefix; + + /** + * Constructor for InternalStageManager + * + * @param isTestMode whether the manager in test mode + * @param role the role of the client + * @param clientName the name of the client + * @param snowflakeServiceClient the Snowflake service client to use for configure calls + */ + InternalStageManager( + boolean isTestMode, + String role, + String clientName, + SnowflakeServiceClient snowflakeServiceClient) { + this.snowflakeServiceClient = snowflakeServiceClient; + this.isTestMode = isTestMode; + this.role = role; + this.counter = new AtomicLong(0); + try { + if (!isTestMode) { + ClientConfigureResponse response = + this.snowflakeServiceClient.clientConfigure(new ClientConfigureRequest(role)); + this.clientPrefix = response.getClientPrefix(); + this.targetStage = + new StreamingIngestStorage( + this, + clientName, + response.getStageLocation(), + new InternalStageLocation(), + DEFAULT_MAX_UPLOAD_RETRIES); + } else { + this.clientPrefix = "testPrefix"; + this.targetStage = + new StreamingIngestStorage( + this, + "testClient", + (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, + new InternalStageLocation(), + DEFAULT_MAX_UPLOAD_RETRIES); + } + } catch (IngestResponseException | IOException e) { + throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); + } catch (SnowflakeSQLException e) { + throw new SFException(e, ErrorCode.UNABLE_TO_CONNECT_TO_STORAGE, e.getMessage()); + } + } + + /** + * Get the storage. In this case, the storage is always the target stage as there's only one stage + * in non-iceberg mode. 
+ * + * @param fullyQualifiedTableName the target fully qualified table name + * @return the target storage + */ + @Override + @SuppressWarnings("unused") + public StreamingIngestStorage getStorage( + String fullyQualifiedTableName) { + // There's always only one stage for the client in non-iceberg mode + return targetStage; + } + + /** Add storage to the manager. Do nothing as there's only one stage in non-Iceberg mode. */ + @Override + public void addStorage( + String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {} + + /** + * Gets the latest file location info (with a renewed short-lived access token) for the specified + * location + * + * @param location A reference to the target location + * @param fileName optional filename for single-file signed URL fetch from server + * @return the new location information + */ + @Override + public FileLocationInfo getRefreshedLocation( + InternalStageLocation location, Optional<String> fileName) { + try { + ClientConfigureRequest request = new ClientConfigureRequest(this.role); + fileName.ifPresent(request::setFileName); + ClientConfigureResponse response = snowflakeServiceClient.clientConfigure(request); + return response.getStageLocation(); + } catch (IngestResponseException | IOException e) { + throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); + } + } + + /** + * Generate a blob path, which is: "YEAR/MONTH/DAY_OF_MONTH/HOUR_OF_DAY/MINUTE/.BDEC" + * + * @return the generated blob file path + */ + @Override + public String generateBlobPath() { + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + return getBlobPath(calendar, this.clientPrefix); + } + + @Override + public void decrementBlobSequencer() { + this.counter.decrementAndGet(); + } + + /** For TESTING */ + @VisibleForTesting + public String getBlobPath(Calendar calendar, String clientPrefix) { + if (this.isTestMode && clientPrefix == null) { + clientPrefix = "testPrefix"; + } + + 
Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix); + int year = calendar.get(Calendar.YEAR); + int month = calendar.get(Calendar.MONTH) + 1; // Gregorian calendar starts from 0 + int day = calendar.get(Calendar.DAY_OF_MONTH); + int hour = calendar.get(Calendar.HOUR_OF_DAY); + int minute = calendar.get(Calendar.MINUTE); + long time = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis()); + long threadId = Thread.currentThread().getId(); + // Create the blob short name, the clientPrefix contains the deployment id + String blobShortName = + Long.toString(time, 36) + + "_" + + clientPrefix + + "_" + + threadId + + "_" + + this.counter.getAndIncrement() + + "." + + BLOB_EXTENSION_TYPE; + return year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + blobShortName; + } + + /** + * Get the unique client prefix generated by the Snowflake server + * + * @return the client prefix + */ + @Override + public String getClientPrefix() { + return this.clientPrefix; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java new file mode 100644 index 000000000..ffbb553f3 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.utils.Constants; + +/** Class used to serialize the {@link OpenChannelRequest} */ +class OpenChannelRequestInternal implements StreamingIngestRequest { + @JsonProperty("request_id") + private String requestId; + + @JsonProperty("role") + private String role; + + @JsonProperty("channel") + private String channel; + + @JsonProperty("table") + private String table; + + @JsonProperty("database") + private String database; + + @JsonProperty("schema") + private String schema; + + @JsonProperty("write_mode") + private String writeMode; + + @JsonProperty("is_iceberg") + private boolean isIceberg; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("offset_token") + private String offsetToken; + + OpenChannelRequestInternal( + String requestId, + String role, + String database, + String schema, + String table, + String channel, + Constants.WriteMode writeMode, + String offsetToken, + boolean isIceberg) { + this.requestId = requestId; + this.role = role; + this.database = database; + this.schema = schema; + this.table = table; + this.channel = channel; + this.writeMode = writeMode.name(); + this.isIceberg = isIceberg; + this.offsetToken = offsetToken; + } + + String getRequestId() { + return requestId; + } + + String getRole() { + return role; + } + + String getChannel() { + return channel; + } + + String getTable() { + return table; + } + + String getDatabase() { + return database; + } + + String getSchema() { + return schema; + } + + String getWriteMode() { + return writeMode; + } + + boolean getIsIceberg() { + return isIceberg; + } + + String getOffsetToken() { + return offsetToken; + } + + @Override + public String getStringForLogging() { + return String.format( + "OpenChannelRequestInternal(requestId=%s, 
role=%s, db=%s, schema=%s, table=%s, channel=%s," + + " writeMode=%s, isIceberg=%s)", + requestId, role, database, schema, table, channel, writeMode, isIceberg); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java new file mode 100644 index 000000000..18fcb1fb2 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.stream.Collectors; + +/** Class used to serialize the blob register request */ +class RegisterBlobRequest implements StreamingIngestRequest { + @JsonProperty("request_id") + private String requestId; + + @JsonProperty("role") + private String role; + + @JsonProperty("blobs") + private List<BlobMetadata> blobs; + + RegisterBlobRequest(String requestId, String role, List<BlobMetadata> blobs) { + this.requestId = requestId; + this.role = role; + this.blobs = blobs; + } + + String getRequestId() { + return requestId; + } + + String getRole() { + return role; + } + + List<BlobMetadata> getBlobs() { + return blobs; + } + + @Override + public String getStringForLogging() { + return String.format( + "RegisterBlobRequest(requestId=%s, role=%s, blobs=[%s])", + requestId, + role, + blobs.stream().map(BlobMetadata::getPath).collect(Collectors.joining(", "))); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java new file mode 100644 index 000000000..947c86dbb --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB; +import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; +import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; + +import java.io.IOException; +import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.connection.ServiceResponseHandler; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.Logging; +import net.snowflake.ingest.utils.SFException; + +/** + * The SnowflakeServiceClient class is responsible for making API requests to the Snowflake service. 
+ */ +class SnowflakeServiceClient { + private static final Logging logger = new Logging(SnowflakeServiceClient.class); + + // HTTP client used for making requests + private final CloseableHttpClient httpClient; + + // Request builder for building streaming API request + private final RequestBuilder requestBuilder; + + /** + * Default constructor + * + * @param httpClient the HTTP client used for making requests + * @param requestBuilder the request builder for building streaming API requests + */ + SnowflakeServiceClient(CloseableHttpClient httpClient, RequestBuilder requestBuilder) { + this.httpClient = httpClient; + this.requestBuilder = requestBuilder; + } + + /** + * Configures the client given a {@link ClientConfigureRequest}. + * + * @param request the client configuration request + * @return the response from the configuration request + */ + ClientConfigureResponse clientConfigure(ClientConfigureRequest request) + throws IngestResponseException, IOException { + ClientConfigureResponse response = + executeApiRequestWithRetries( + ClientConfigureResponse.class, + request, + CLIENT_CONFIGURE_ENDPOINT, + "client configure", + STREAMING_CLIENT_CONFIGURE); + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Client configure request failed, request={}, message={}", + request.getStringForLogging(), + response.getMessage()); + throw new SFException(ErrorCode.CLIENT_CONFIGURE_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Configures a storage given a {@link ChannelConfigureRequest}. 
+ * + * @param request the channel configuration request + * @return the response from the configuration request + */ + ChannelConfigureResponse channelConfigure(ChannelConfigureRequest request) + throws IngestResponseException, IOException { + ChannelConfigureResponse response = + executeApiRequestWithRetries( + ChannelConfigureResponse.class, + request, + CHANNEL_CONFIGURE_ENDPOINT, + "channel configure", + STREAMING_CHANNEL_CONFIGURE); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Channel configure request failed, request={}, response={}", + request.getStringForLogging(), + response.getMessage()); + throw new SFException(ErrorCode.CHANNEL_CONFIGURE_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Opens a channel given a {@link OpenChannelRequestInternal}. + * + * @param request the open channel request + * @return the response from the open channel request + */ + OpenChannelResponse openChannel(OpenChannelRequestInternal request) + throws IngestResponseException, IOException { + OpenChannelResponse response = + executeApiRequestWithRetries( + OpenChannelResponse.class, + request, + OPEN_CHANNEL_ENDPOINT, + "open channel", + STREAMING_OPEN_CHANNEL); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Open channel request failed, request={}, response={}", + request.getStringForLogging(), + response.getMessage()); + throw new SFException(ErrorCode.OPEN_CHANNEL_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Drops a channel given a {@link DropChannelRequestInternal}. 
+ * + * @param request the drop channel request + * @return the response from the drop channel request + */ + DropChannelResponse dropChannel(DropChannelRequestInternal request) + throws IngestResponseException, IOException { + DropChannelResponse response = + executeApiRequestWithRetries( + DropChannelResponse.class, + request, + DROP_CHANNEL_ENDPOINT, + "drop channel", + STREAMING_DROP_CHANNEL); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Drop channel request failed, request={}, response={}", + request.getStringForLogging(), + response.getMessage()); + throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Gets the status of a channel given a {@link ChannelsStatusRequest}. + * + * @param request the channel status request + * @return the response from the channel status request + */ + ChannelsStatusResponse channelStatus(ChannelsStatusRequest request) + throws IngestResponseException, IOException { + ChannelsStatusResponse response = + executeApiRequestWithRetries( + ChannelsStatusResponse.class, + request, + CHANNEL_STATUS_ENDPOINT, + "channel status", + STREAMING_CHANNEL_STATUS); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Channel status request failed, request={}, response={}", + request.getStringForLogging(), + response.getMessage()); + throw new SFException(ErrorCode.CHANNEL_STATUS_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Registers a blob given a {@link RegisterBlobRequest}. 
+ * + * @param request the register blob request + * @param executionCount the number of times the request has been executed, used for logging + * @return the response from the register blob request + */ + RegisterBlobResponse registerBlob(RegisterBlobRequest request, final int executionCount) + throws IngestResponseException, IOException { + RegisterBlobResponse response = + executeApiRequestWithRetries( + RegisterBlobResponse.class, + request, + REGISTER_BLOB_ENDPOINT, + "register blob", + STREAMING_REGISTER_BLOB); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Register blob request failed, request={}, response={}, executionCount={}", + request.getStringForLogging(), + response.getMessage(), + executionCount); + throw new SFException(ErrorCode.REGISTER_BLOB_FAILURE, response.getMessage()); + } + return response; + } + + private <T extends StreamingIngestResponse> T executeApiRequestWithRetries( + Class<T> responseClass, + StreamingIngestRequest request, + String endpoint, + String operation, + ServiceResponseHandler.ApiName apiName) + throws IngestResponseException, IOException { + return executeWithRetries( + responseClass, endpoint, request, operation, apiName, this.httpClient, this.requestBuilder); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 1de473da6..8583a94f6 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -4,20 +4,11 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; -import static 
net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB; -import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.sleepForRetry; -import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.COMMIT_MAX_RETRY_COUNT; import static net.snowflake.ingest.utils.Constants.COMMIT_RETRY_INTERVAL_IN_MS; -import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.ENABLE_TELEMETRY_TO_SF; import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY; -import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_ENQUEUE_TABLE_CHUNK_QUEUE_FULL; import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST; import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; @@ -121,6 +112,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Reference to the flush service private final FlushService flushService; + // Reference to storage manager + private final StorageManager storageManager; + // Indicates whether the client has closed private volatile boolean isClosed; @@ -151,6 +145,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Background thread that uploads telemetry data periodically private ScheduledExecutorService telemetryWorker; + // Snowflake service client to make API calls + private SnowflakeServiceClient snowflakeServiceClient; + /** * Constructor * @@ -238,8 +235,18 @@ public class 
SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.setupMetricsForClient(); } + this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder); + + this.storageManager = + isIcebergMode + ? new ExternalVolumeManager( + isTestMode, this.role, this.name, this.snowflakeServiceClient) + : new InternalStageManager( + isTestMode, this.role, this.name, this.snowflakeServiceClient); + try { - this.flushService = new FlushService<>(this, this.channelCache, this.isTestMode); + this.flushService = + new FlushService<>(this, this.channelCache, this.storageManager, this.isTestMode); } catch (Exception e) { // Need to clean up the resources before throwing any exceptions cleanUpResources(); @@ -286,6 +293,7 @@ public SnowflakeStreamingIngestClientInternal( @VisibleForTesting public void injectRequestBuilder(RequestBuilder requestBuilder) { this.requestBuilder = requestBuilder; + this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder); } /** @@ -332,40 +340,18 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest getName()); try { - Map payload = new HashMap<>(); - payload.put( - "request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); - payload.put("channel", request.getChannelName()); - payload.put("table", request.getTableName()); - payload.put("database", request.getDBName()); - payload.put("schema", request.getSchemaName()); - payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name()); - payload.put("role", this.role); - payload.put("is_iceberg", isIcebergMode); - if (request.isOffsetTokenProvided()) { - payload.put("offset_token", request.getOffsetToken()); - } - - OpenChannelResponse response = - executeWithRetries( - OpenChannelResponse.class, - OPEN_CHANNEL_ENDPOINT, - payload, - "open channel", - STREAMING_OPEN_CHANNEL, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if 
(response.getStatusCode() != RESPONSE_SUCCESS) { - logger.logDebug( - "Open channel request failed, channel={}, table={}, client={}, message={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - getName(), - response.getMessage()); - throw new SFException(ErrorCode.OPEN_CHANNEL_FAILURE, response.getMessage()); - } + OpenChannelRequestInternal openChannelRequest = + new OpenChannelRequestInternal( + this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), + this.role, + request.getDBName(), + request.getSchemaName(), + request.getTableName(), + request.getChannelName(), + Constants.WriteMode.CLOUD_STORAGE, + request.getOffsetToken(), + isIcebergMode); + OpenChannelResponse response = snowflakeServiceClient.openChannel(openChannelRequest); logger.logInfo( "Open channel request succeeded, channel={}, table={}, clientSequencer={}," @@ -399,8 +385,15 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest // Add channel to the channel cache this.channelCache.addChannel(channel); + // Add storage to the storage manager, only for external volume + this.storageManager.addStorage( + response.getDBName(), + response.getSchemaName(), + response.getTableName(), + response.getStageLocation()); + return channel; - } catch (IOException | IngestResponseException e) { + } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); } } @@ -418,52 +411,29 @@ public void dropChannel(DropChannelRequest request) { getName()); try { - Map payload = new HashMap<>(); - payload.put( - "request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); - payload.put("channel", request.getChannelName()); - payload.put("table", request.getTableName()); - payload.put("database", request.getDBName()); - payload.put("schema", request.getSchemaName()); - payload.put("role", this.role); - payload.put("is_iceberg", isIcebergMode); - Long clientSequencer = 
null; - if (request instanceof DropChannelVersionRequest) { - clientSequencer = ((DropChannelVersionRequest) request).getClientSequencer(); - if (clientSequencer != null) { - payload.put("client_sequencer", clientSequencer); - } - } - - DropChannelResponse response = - executeWithRetries( - DropChannelResponse.class, - DROP_CHANNEL_ENDPOINT, - payload, - "drop channel", - STREAMING_DROP_CHANNEL, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - logger.logDebug( - "Drop channel request failed, channel={}, table={}, client={}, message={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - getName(), - response.getMessage()); - throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage()); - } + DropChannelRequestInternal dropChannelRequest = + new DropChannelRequestInternal( + this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), + this.role, + request.getDBName(), + request.getSchemaName(), + request.getTableName(), + request.getChannelName(), + request instanceof DropChannelVersionRequest + ? ((DropChannelVersionRequest) request).getClientSequencer() + : null, + isIcebergMode); + snowflakeServiceClient.dropChannel(dropChannelRequest); logger.logInfo( "Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}", request.getChannelName(), request.getFullyQualifiedTableName(), - clientSequencer, + request instanceof DropChannelVersionRequest + ? 
((DropChannelVersionRequest) request).getClientSequencer() + : null, getName()); - - } catch (IOException | IngestResponseException e) { + } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.DROP_CHANNEL_FAILURE, e.getMessage()); } } @@ -507,24 +477,9 @@ ChannelsStatusResponse getChannelsStatus( .collect(Collectors.toList()); request.setChannels(requestDTOs); request.setRole(this.role); - request.setRequestId(this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); - - String payload = objectMapper.writeValueAsString(request); - - ChannelsStatusResponse response = - executeWithRetries( - ChannelsStatusResponse.class, - CHANNEL_STATUS_ENDPOINT, - payload, - "channel status", - STREAMING_CHANNEL_STATUS, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - throw new SFException(ErrorCode.CHANNEL_STATUS_FAILURE, response.getMessage()); - } + request.setRequestId(this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement()); + + ChannelsStatusResponse response = snowflakeServiceClient.channelStatus(request); for (int idx = 0; idx < channels.size(); idx++) { SnowflakeStreamingIngestChannelInternal channel = channels.get(idx); @@ -546,7 +501,7 @@ ChannelsStatusResponse getChannelsStatus( } return response; - } catch (IOException | IngestResponseException e) { + } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.CHANNEL_STATUS_FAILURE, e.getMessage()); } } @@ -619,35 +574,15 @@ void registerBlobs(List blobs, final int executionCount) { this.name, executionCount); - RegisterBlobResponse response = null; + RegisterBlobResponse response; try { - Map payload = new HashMap<>(); - payload.put( - "request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); - payload.put("blobs", blobs); - payload.put("role", this.role); - - response = - executeWithRetries( - 
RegisterBlobResponse.class, - REGISTER_BLOB_ENDPOINT, - payload, - "register blob", - STREAMING_REGISTER_BLOB, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - logger.logDebug( - "Register blob request failed for blob={}, client={}, message={}, executionCount={}", - blobs.stream().map(BlobMetadata::getPath).collect(Collectors.toList()), - this.name, - response.getMessage(), - executionCount); - throw new SFException(ErrorCode.REGISTER_BLOB_FAILURE, response.getMessage()); - } - } catch (IOException | IngestResponseException e) { + RegisterBlobRequest request = + new RegisterBlobRequest( + this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), + this.role, + blobs); + response = snowflakeServiceClient.registerBlob(request, executionCount); + } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage()); } @@ -839,7 +774,7 @@ void setNeedFlush() { this.flushService.setNeedFlush(); } - /** Remove the channel in the channel cache if the channel sequencer matches */ + /** Remove the channel in the channel cache if the channel sequencer matches. Update storage */ void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal channel) { this.channelCache.removeChannelIfSequencersMatch(channel); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java new file mode 100644 index 000000000..ea0b8843f --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import java.util.Optional; + +/** + * Interface to manage {@link StreamingIngestStorage} for {@link FlushService} + * + * @param <T> The type of chunk data + * @param <TLocation> the type of location that's being managed (internal stage / external volume) + */ +interface StorageManager<T, TLocation> { + // Default max upload retries for streaming ingest storage + int DEFAULT_MAX_UPLOAD_RETRIES = 5; + + /** + * Given a fully qualified table name, return the target storage + * + * @param fullyQualifiedTableName the target fully qualified table name + * @return target stage + */ + StreamingIngestStorage getStorage(String fullyQualifiedTableName); + + /** + * Add a storage to the manager + * + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + * @param fileLocationInfo file location info from configure response + */ + void addStorage( + String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo); + + /** + * Gets the latest file location info (with a renewed short-lived access token) for the specified + * location + * + * @param location A reference to the target location + * @param fileName optional filename for single-file signed URL fetch from server + * @return the new location information + */ + FileLocationInfo getRefreshedLocation(TLocation location, Optional<String> fileName); + + /** + * Generate a unique blob path and increment the blob sequencer + * + * @return the blob path + */ + String generateBlobPath(); + + /** + * Decrement the blob sequencer, this method is needed to prevent gap between file name sequencer. + * See {@link StorageManager#generateBlobPath()} for more details. 
+ */ + void decrementBlobSequencer(); + + /** + * Get the unique client prefix generated by the Snowflake server + * + * @return the client prefix + */ + String getClientPrefix(); +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java new file mode 100644 index 000000000..3dd30b5e2 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +/** + * The StreamingIngestRequest interface is a marker interface used for type safety in the {@link + * SnowflakeServiceClient} for streaming ingest API request. + */ +interface StreamingIngestRequest { + String getStringForLogging(); +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestResponse.java index 1ec01fceb..6c4df8c6d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestResponse.java @@ -1,9 +1,16 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; +/** + * The StreamingIngestResponse class is an abstract class that represents a response from the + * Snowflake streaming ingest API. This class provides a common structure for all types of responses + * that can be received from the {@link SnowflakeServiceClient}. 
+ */ abstract class StreamingIngestResponse { abstract Long getStatusCode(); + + abstract String getMessage(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java similarity index 75% rename from src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java rename to src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java index 9752b311c..eb9f10826 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java @@ -1,13 +1,9 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; -import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; -import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; import static net.snowflake.ingest.utils.HttpUtil.generateProxyPropertiesForJDBC; import static net.snowflake.ingest.utils.Utils.getStackTrace; @@ -21,8 +17,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -33,16 +27,13 @@ import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.client.jdbc.cloud.storage.StageInfo; import net.snowflake.client.jdbc.internal.apache.commons.io.FileUtils; -import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; -import 
net.snowflake.ingest.connection.IngestResponseException; -import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; -/** Handles uploading files to the Snowflake Streaming Ingest Stage */ -class StreamingIngestStage { +/** Handles uploading files to the Snowflake Streaming Ingest Storage */ +class StreamingIngestStorage { private static final ObjectMapper mapper = new ObjectMapper(); // Object mapper for parsing the client/configure response to Jackson version the same as @@ -54,7 +45,7 @@ class StreamingIngestStage { private static final long REFRESH_THRESHOLD_IN_MS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); - private static final Logging logger = new Logging(StreamingIngestStage.class); + private static final Logging logger = new Logging(StreamingIngestStorage.class); /** * Wrapper class containing SnowflakeFileTransferMetadata and the timestamp at which the metadata @@ -86,59 +77,61 @@ state to record unknown age. 
} private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; - private final CloseableHttpClient httpClient; - private final RequestBuilder requestBuilder; - private final String role; + private final StorageManager owningManager; + private final TLocation location; private final String clientName; - private String clientPrefix; private final int maxUploadRetries; // Proxy parameters that we set while calling the Snowflake JDBC to upload the streams private final Properties proxyProperties; - StreamingIngestStage( - boolean isTestMode, - String role, - CloseableHttpClient httpClient, - RequestBuilder requestBuilder, + /** + * Default constructor + * + * @param owningManager the storage manager owning this storage + * @param clientName The client name + * @param fileLocationInfo The file location information from open channel response + * @param location A reference to the target location + * @param maxUploadRetries The maximum number of retries to attempt + */ + StreamingIngestStorage( + StorageManager owningManager, String clientName, + FileLocationInfo fileLocationInfo, + TLocation location, int maxUploadRetries) throws SnowflakeSQLException, IOException { - this.httpClient = httpClient; - this.role = role; - this.requestBuilder = requestBuilder; - this.clientName = clientName; - this.proxyProperties = generateProxyPropertiesForJDBC(); - this.maxUploadRetries = maxUploadRetries; - if (!isTestMode) { - refreshSnowflakeMetadata(); - } + this( + owningManager, + clientName, + (SnowflakeFileTransferMetadataWithAge) null, + location, + maxUploadRetries); + createFileTransferMetadataWithAge(fileLocationInfo); } /** * Constructor for TESTING that takes SnowflakeFileTransferMetadataWithAge as input * - * @param isTestMode must be true - * @param role Snowflake role used by the Client - * @param httpClient http client reference - * @param requestBuilder request builder to build the HTTP request + * @param owningManager the storage manager owning this 
storage * @param clientName the client name * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with + * @param location A reference to the target location + * @param maxUploadRetries the maximum number of retries to attempt */ - StreamingIngestStage( - boolean isTestMode, - String role, - CloseableHttpClient httpClient, - RequestBuilder requestBuilder, + StreamingIngestStorage( + StorageManager owningManager, String clientName, SnowflakeFileTransferMetadataWithAge testMetadata, - int maxRetryCount) + TLocation location, + int maxUploadRetries) throws SnowflakeSQLException, IOException { - this(isTestMode, role, httpClient, requestBuilder, clientName, maxRetryCount); - if (!isTestMode) { - throw new SFException(ErrorCode.INTERNAL_ERROR); - } + this.owningManager = owningManager; + this.clientName = clientName; + this.maxUploadRetries = maxUploadRetries; + this.proxyProperties = generateProxyPropertiesForJDBC(); + this.location = location; this.fileTransferMetadataWithAge = testMetadata; } @@ -192,7 +185,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) .setUploadStream(inStream) .setRequireCompress(false) .setOcspMode(OCSPMode.FAIL_OPEN) - .setStreamingIngestClientKey(this.clientPrefix) + .setStreamingIngestClientKey(this.owningManager.getClientPrefix()) .setStreamingIngestClientName(this.clientName) .setProxyProperties(this.proxyProperties) .setDestFileName(fullFilePath) @@ -250,26 +243,26 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole return fileTransferMetadataWithAge; } - Map payload = new HashMap<>(); - payload.put("role", this.role); - ConfigureResponse response = this.makeClientConfigureCall(payload); + FileLocationInfo location = + this.owningManager.getRefreshedLocation(this.location, Optional.empty()); + return createFileTransferMetadataWithAge(location); + } - // Do not change the prefix everytime we have to refresh credentials - if (Utils.isNullOrEmpty(this.clientPrefix)) { 
- this.clientPrefix = createClientPrefix(response); - } - Utils.assertStringNotNullOrEmpty("client prefix", this.clientPrefix); + private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( + FileLocationInfo fileLocationInfo) + throws JsonProcessingException, + net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException, + SnowflakeSQLException { + Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix()); - if (response - .getStageLocation() + if (fileLocationInfo .getLocationType() .replaceAll( "^[\"]|[\"]$", "") // Replace the first and last character if they're double quotes .equals(StageInfo.StageType.LOCAL_FS.name())) { this.fileTransferMetadataWithAge = new SnowflakeFileTransferMetadataWithAge( - response - .getStageLocation() + fileLocationInfo .getLocation() .replaceAll( "^[\"]|[\"]$", @@ -280,7 +273,7 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole new SnowflakeFileTransferMetadataWithAge( (SnowflakeFileTransferMetadataV1) SnowflakeFileTransferAgent.getFileTransferMetadatas( - parseClientConfigureResponse(response)) + parseFileLocationInfo(fileLocationInfo)) .get(0), Optional.of(System.currentTimeMillis())); } @@ -296,8 +289,8 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole * @param response the client/configure response from the server * @return the client prefix. */ - private String createClientPrefix(final ConfigureResponse response) { - final String prefix = response.getPrefix(); + private String createClientPrefix(final ClientConfigureResponse response) { + final String prefix = response.getPrefix() == null ? "" : response.getPrefix(); final String deploymentId = response.getDeploymentId() != null ? 
"_" + response.getDeploymentId() : ""; return prefix + deploymentId; @@ -312,15 +305,12 @@ private String createClientPrefix(final ConfigureResponse response) { SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) throws SnowflakeSQLException, IOException { - Map payload = new HashMap<>(); - payload.put("role", this.role); - payload.put("file_name", fileName); - ConfigureResponse response = this.makeClientConfigureCall(payload); + FileLocationInfo location = + this.owningManager.getRefreshedLocation(this.location, Optional.of(fileName)); SnowflakeFileTransferMetadataV1 metadata = (SnowflakeFileTransferMetadataV1) - SnowflakeFileTransferAgent.getFileTransferMetadatas( - parseClientConfigureResponse(response)) + SnowflakeFileTransferAgent.getFileTransferMetadatas(parseFileLocationInfo(location)) .get(0); // Transfer agent trims path for fileName metadata.setPresignedUrlFileName(fileName); @@ -328,51 +318,28 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) } private net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode - parseClientConfigureResponse(ConfigureResponse response) + parseFileLocationInfo(FileLocationInfo fileLocationInfo) throws JsonProcessingException, net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException { - JsonNode responseNode = mapper.valueToTree(response); + JsonNode fileLocationInfoNode = mapper.valueToTree(fileLocationInfo); // Currently there are a few mismatches between the client/configure response and what // SnowflakeFileTransferAgent expects - ObjectNode mutable = (ObjectNode) responseNode; - mutable.putObject("data"); - ObjectNode dataNode = (ObjectNode) mutable.get("data"); - dataNode.set("stageInfo", responseNode.get("stage_location")); + + ObjectNode node = mapper.createObjectNode(); + node.putObject("data"); + ObjectNode dataNode = (ObjectNode) node.get("data"); + dataNode.set("stageInfo", fileLocationInfoNode); // JDBC expects this field which maps to 
presignedFileUrlName. We will set this later dataNode.putArray("src_locations").add("placeholder"); // use String as intermediate object to avoid Jackson version mismatch // TODO: SNOW-1493470 Align Jackson version - String responseString = mapper.writeValueAsString(responseNode); + String responseString = mapper.writeValueAsString(node); return parseConfigureResponseMapper.readTree(responseString); } - private ConfigureResponse makeClientConfigureCall(Map payload) - throws IOException { - try { - - ConfigureResponse response = - executeWithRetries( - ConfigureResponse.class, - CLIENT_CONFIGURE_ENDPOINT, - mapper.writeValueAsString(payload), - "client configure", - STREAMING_CLIENT_CONFIGURE, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - throw new SFException(ErrorCode.CLIENT_CONFIGURE_FAILURE, response.getMessage()); - } - return response; - } catch (IngestResponseException e) { - throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); - } - } - /** * Upload file to internal stage * @@ -416,9 +383,4 @@ void putLocal(String fullFilePath, byte[] data) { throw new SFException(ex, ErrorCode.BLOB_UPLOAD_FAILURE); } } - - /** Get the server generated unique prefix for this client */ - String getClientPrefix() { - return this.clientPrefix; - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java index 56e960064..ebfffa234 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY; @@ -6,7 +10,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.util.Map; import java.util.function.Function; import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse; import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpUriRequest; @@ -77,7 +80,7 @@ public static void sleepForRetry(int executionCount) { static T executeWithRetries( Class targetClass, String endpoint, - Map payload, + StreamingIngestRequest payload, String message, ServiceResponseHandler.ApiName apiName, CloseableHttpClient httpClient, diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 3579e4d24..3d09d9a2a 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.utils; @@ -52,6 +52,7 @@ public class Constants { public static final String BLOB_EXTENSION_TYPE = "bdec"; public static final int MAX_THREAD_COUNT = Integer.MAX_VALUE; public static final String CLIENT_CONFIGURE_ENDPOINT = "/v1/streaming/client/configure/"; + public static final String CHANNEL_CONFIGURE_ENDPOINT = "/v1/streaming/channels/configure/"; public static final int COMMIT_MAX_RETRY_COUNT = 60; public static final int COMMIT_RETRY_INTERVAL_IN_MS = 1000; public static final String ENCRYPTION_ALGORITHM = "AES/CTR/NoPadding"; diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index b863717e9..4091e98bf 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.utils; @@ -26,7 +26,7 @@ public enum ErrorCode { INVALID_ENCRYPTED_KEY("0018"), INVALID_DATA_IN_CHUNK("0019"), IO_ERROR("0020"), - UNABLE_TO_CONNECT_TO_STAGE("0021"), + UNABLE_TO_CONNECT_TO_STORAGE("0021"), KEYPAIR_CREATION_FAILURE("0022"), MD5_HASHING_NOT_AVAILABLE("0023"), CHANNEL_STATUS_FAILURE("0024"), @@ -41,7 +41,8 @@ public enum ErrorCode { OAUTH_REFRESH_TOKEN_ERROR("0033"), INVALID_CONFIG_PARAMETER("0034"), CRYPTO_PROVIDER_ERROR("0035"), - DROP_CHANNEL_FAILURE("0036"); + DROP_CHANNEL_FAILURE("0036"), + CHANNEL_CONFIGURE_FAILURE("0037"); public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages"; diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index a06df4027..5220625da 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.utils; @@ -384,4 +384,31 @@ public static String getStackTrace(Throwable e) { } return stackTrace.toString(); } + + /** + * Get the fully qualified table name + * + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + * @return the fully qualified table name + */ + public static String getFullyQualifiedTableName( + String dbName, String schemaName, String tableName) { + return String.format("%s.%s.%s", dbName, schemaName, tableName); + } + + /** + * Get the fully qualified channel name + * + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + * @param channelName the channel name + * @return the fully qualified channel name + */ + public static String getFullyQualifiedChannelName( + String dbName, String schemaName, String tableName, String channelName) { + return String.format("%s.%s.%s.%s", dbName, schemaName, tableName, channelName); + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 5a64e05a0..f59f9e076 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -93,7 +93,8 @@ private abstract static class TestContext implements AutoCloseable { ChannelCache channelCache; final Map> channels = new HashMap<>(); FlushService flushService; - StreamingIngestStage stage; + StorageManager storageManager; + StreamingIngestStorage storage; ParameterProvider parameterProvider; InternalParameterProvider internalParameterProvider; RegisterService registerService; @@ -101,17 +102,23 @@ private abstract static class TestContext implements AutoCloseable { final List> channelData = new ArrayList<>(); TestContext() { - stage = Mockito.mock(StreamingIngestStage.class); - 
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); + storage = Mockito.mock(StreamingIngestStorage.class); parameterProvider = new ParameterProvider(isIcebergMode); internalParameterProvider = new InternalParameterProvider(isIcebergMode); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); + storageManager = + Mockito.spy( + isIcebergMode + ? new ExternalVolumeManager<>(true, "role", "client", null) + : new InternalStageManager<>(true, "role", "client", null)); + Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any()); + Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix"); channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); registerService = Mockito.spy(new RegisterService(client, client.isTestMode())); - flushService = Mockito.spy(new FlushService<>(client, channelCache, stage, true)); + flushService = Mockito.spy(new FlushService<>(client, channelCache, storageManager, true)); } ChannelData flushChannel(String name) { @@ -124,7 +131,10 @@ ChannelData flushChannel(String name) { BlobMetadata buildAndUpload() throws Exception { List>> blobData = Collections.singletonList(channelData); - return flushService.buildAndUpload("file_name", blobData); + return flushService.buildAndUpload( + "file_name", + blobData, + blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName()); } abstract SnowflakeStreamingIngestChannelInternal createChannel( @@ -408,35 +418,41 @@ private static ColumnMetadata createLargeTestTextColumn(String name) { @Test public void testGetFilePath() { TestContext testContext = testContextFactory.create(); - FlushService flushService = testContext.flushService; + StorageManager storageManager = testContext.storageManager; Calendar calendar = 
Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; - String outputString = flushService.getBlobPath(calendar, clientPrefix); - Path outputPath = Paths.get(outputString); - Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); - Assert.assertTrue( - calendar.get(Calendar.MINUTE) - - Integer.parseInt(outputPath.getParent().getFileName().toString()) - <= 1); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)), - outputPath.getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)), - outputPath.getParent().getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.MONTH) + 1), - outputPath.getParent().getParent().getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.YEAR)), - outputPath - .getParent() - .getParent() - .getParent() - .getParent() - .getParent() - .getFileName() - .toString()); + if (isIcebergMode) { + // TODO: SNOW-1502887 Blob path generation for iceberg table + String outputString = storageManager.generateBlobPath(); + } else { + String outputString = + ((InternalStageManager) storageManager).getBlobPath(calendar, clientPrefix); + Path outputPath = Paths.get(outputString); + Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); + Assert.assertTrue( + calendar.get(Calendar.MINUTE) + - Integer.parseInt(outputPath.getParent().getFileName().toString()) + <= 1); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)), + outputPath.getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)), + outputPath.getParent().getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.MONTH) + 1), + 
outputPath.getParent().getParent().getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.YEAR)), + outputPath + .getParent() + .getParent() + .getParent() + .getParent() + .getParent() + .getFileName() + .toString()); + } } @Test @@ -499,7 +515,8 @@ public void testBlobCreation() throws Exception { // Force = true flushes flushService.flush(true).get(); - Mockito.verify(flushService, Mockito.atLeast(2)).buildAndUpload(Mockito.any(), Mockito.any()); + Mockito.verify(flushService, Mockito.atLeast(2)) + .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); } @Test @@ -548,7 +565,8 @@ public void testBlobSplitDueToDifferentSchema() throws Exception { // Force = true flushes flushService.flush(true).get(); - Mockito.verify(flushService, Mockito.atLeast(2)).buildAndUpload(Mockito.any(), Mockito.any()); + Mockito.verify(flushService, Mockito.atLeast(2)) + .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); } @Test @@ -582,7 +600,8 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception { // Force = true flushes flushService.flush(true).get(); - Mockito.verify(flushService, Mockito.times(2)).buildAndUpload(Mockito.any(), Mockito.any()); + Mockito.verify(flushService, Mockito.times(2)) + .buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any()); } @Test @@ -624,7 +643,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti ArgumentCaptor>>>>> blobDataCaptor = ArgumentCaptor.forClass(List.class); Mockito.verify(flushService, Mockito.times(expectedBlobs)) - .buildAndUpload(Mockito.any(), blobDataCaptor.capture()); + .buildAndUpload(Mockito.any(), blobDataCaptor.capture(), Mockito.any()); // 1. list => blobs; 2. list => chunks; 3. list => channels; 4. list => rows, 5. 
list => columns List>>>>> allUploadedBlobs = @@ -667,7 +686,7 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except ArgumentCaptor>>>>> blobDataCaptor = ArgumentCaptor.forClass(List.class); Mockito.verify(flushService, Mockito.atLeast(2)) - .buildAndUpload(Mockito.any(), blobDataCaptor.capture()); + .buildAndUpload(Mockito.any(), blobDataCaptor.capture(), Mockito.any()); // 1. list => blobs; 2. list => chunks; 3. list => channels; 4. list => rows, 5. list => columns List>>>>> allUploadedBlobs = @@ -785,12 +804,15 @@ public void testBuildAndUpload() throws Exception { .build(); // Check FlushService.upload called with correct arguments + final ArgumentCaptor storageCaptor = + ArgumentCaptor.forClass(StreamingIngestStorage.class); final ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class); final ArgumentCaptor blobCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor> metadataCaptor = ArgumentCaptor.forClass(List.class); Mockito.verify(testContext.flushService) .upload( + storageCaptor.capture(), nameCaptor.capture(), blobCaptor.capture(), metadataCaptor.capture(), @@ -933,10 +955,10 @@ public void testInvalidateChannels() { innerData.add(channel1Data); innerData.add(channel2Data); - StreamingIngestStage stage = Mockito.mock(StreamingIngestStage.class); - Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); + StorageManager storageManager = + Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); FlushService flushService = - new FlushService<>(client, channelCache, stage, false); + new FlushService<>(client, channelCache, storageManager, false); flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test"); Assert.assertFalse(channel1.isValid()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 
5beb0662f..f577c75c1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -274,7 +278,7 @@ public void testOpenChannelRequestCreationSuccess() { Assert.assertEquals( "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); - Assert.assertFalse(request.isOffsetTokenProvided()); + Assert.assertNull(request.getOffsetToken()); } @Test @@ -291,7 +295,6 @@ public void testOpenChannelRequesCreationtWithOffsetToken() { Assert.assertEquals( "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); Assert.assertEquals("TEST_TOKEN", request.getOffsetToken()); - Assert.assertTrue(request.isOffsetTokenProvided()); } @Test @@ -451,6 +454,10 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { @Test public void testOpenChannelSuccessResponse() throws Exception { + // TODO: SNOW-1490151 Iceberg testing gaps + if (isIcebergMode) { + return; + } String name = "CHANNEL"; String dbName = "STREAMINGINGEST_TEST"; String schemaName = "PUBLIC"; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 4e66d8a15..6325b3144 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -382,7 +386,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); ChannelsStatusRequest request = new ChannelsStatusRequest(); - request.setRequestId("null_0"); + request.setRequestId("testPrefix_0"); request.setChannels(Collections.singletonList(dto)); ChannelsStatusResponse result = client.getChannelsStatus(Collections.singletonList(channel)); Assert.assertEquals(response.getMessage(), result.getMessage()); @@ -1458,7 +1462,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); ChannelsStatusRequest request = new ChannelsStatusRequest(); - request.setRequestId("null_0"); + request.setRequestId("testPrefix_0"); request.setChannels(Collections.singletonList(dto)); Map result = client.getLatestCommittedOffsetTokens(Collections.singletonList(channel)); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java similarity index 83% rename from src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java rename to src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java index d12c6231c..478934f4b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java @@ -1,6 +1,11 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED; +import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_PASSWORD; import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_USER; import static net.snowflake.ingest.utils.HttpUtil.NON_PROXY_HOSTS; @@ -33,6 +38,7 @@ import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.client.jdbc.cloud.storage.StageInfo; import net.snowflake.client.jdbc.internal.amazonaws.util.IOUtils; +import net.snowflake.client.jdbc.internal.apache.http.HttpEntity; import net.snowflake.client.jdbc.internal.apache.http.StatusLine; import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse; import net.snowflake.client.jdbc.internal.apache.http.entity.BasicHttpEntity; @@ -42,7 +48,6 @@ import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.connection.RequestBuilder; -import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; import org.junit.Assert; @@ -56,7 +61,7 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({TestUtils.class, HttpUtil.class, SnowflakeFileTransferAgent.class}) -public class StreamingIngestStageTest { +public class StreamingIngestStorageTest { private final String prefix = "EXAMPLE_PREFIX"; @@ -114,15 +119,16 @@ public void testPutRemote() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStage stage = - new StreamingIngestStage( - true, - "role", - null, - null, + StorageManager storageManager = Mockito.mock(StorageManager.class); + Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); + + StreamingIngestStorage stage = + new 
StreamingIngestStorage( + storageManager, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), + null, 1); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); @@ -156,16 +162,14 @@ public void testPutLocal() throws Exception { String fullFilePath = "testOutput"; String fileName = "putLocalOutput"; - StreamingIngestStage stage = + StreamingIngestStorage stage = Mockito.spy( - new StreamingIngestStage( - true, - "role", - null, + new StreamingIngestStorage( null, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( fullFilePath, Optional.of(System.currentTimeMillis())), + null, 1)); Mockito.doReturn(true).when(stage).isLocalFS(); @@ -186,15 +190,16 @@ public void doTestPutRemoteRefreshes() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStage stage = - new StreamingIngestStage( - true, - "role", - null, - null, + StorageManager storageManager = Mockito.mock(StorageManager.class); + Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); + + StreamingIngestStorage stage = + new StreamingIngestStorage( + storageManager, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), + null, maxUploadRetryCount); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeSQLException e = @@ -240,16 +245,17 @@ public void testPutRemoteGCS() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStage stage = + StorageManager storageManager = Mockito.mock(StorageManager.class); + Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); + + 
StreamingIngestStorage stage = Mockito.spy( - new StreamingIngestStage( - true, - "role", - null, - null, + new StreamingIngestStorage( + storageManager, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), + null, 1)); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeFileTransferMetadataV1 metaMock = Mockito.mock(SnowflakeFileTransferMetadataV1.class); @@ -265,22 +271,31 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + SnowflakeStreamingIngestClientInternal mockClientInternal = + Mockito.mock(SnowflakeStreamingIngestClientInternal.class); + Mockito.when(mockClientInternal.getRole()).thenReturn("role"); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); - BasicHttpEntity entity = new BasicHttpEntity(); - entity.setContent( - new ByteArrayInputStream(exampleRemoteMetaResponse.getBytes(StandardCharsets.UTF_8))); - Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); - Mockito.when(mockResponse.getEntity()).thenReturn(entity); + Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); + SnowflakeServiceClient snowflakeServiceClient = + new SnowflakeServiceClient(mockClient, mockBuilder); + StorageManager storageManager = + new InternalStageManager(true, "role", "client", snowflakeServiceClient); + ParameterProvider parameterProvider = new ParameterProvider(false); - StreamingIngestStage stage = - new StreamingIngestStage(true, "role", mockClient, mockBuilder, 
"clientName", 1); + StreamingIngestStorage stage = + new StreamingIngestStorage( + storageManager, + "clientName", + (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, + null, + 1); - StreamingIngestStage.SnowflakeFileTransferMetadataWithAge metadataWithAge = + StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge metadataWithAge = stage.refreshSnowflakeMetadata(true); final ArgumentCaptor endpointCaptor = ArgumentCaptor.forClass(String.class); @@ -288,7 +303,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Mockito.verify(mockBuilder) .generateStreamingIngestPostRequest( stringCaptor.capture(), endpointCaptor.capture(), Mockito.any()); - Assert.assertEquals(Constants.CLIENT_CONFIGURE_ENDPOINT, endpointCaptor.getValue()); + Assert.assertEquals(CLIENT_CONFIGURE_ENDPOINT, endpointCaptor.getValue()); Assert.assertTrue(metadataWithAge.timestamp.isPresent()); Assert.assertEquals( StageInfo.StageType.S3, metadataWithAge.fileTransferMetadata.getStageInfo().getStageType()); @@ -299,7 +314,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Assert.assertEquals( Paths.get("placeholder").toAbsolutePath(), Paths.get(metadataWithAge.fileTransferMetadata.getPresignedUrlFileName()).toAbsolutePath()); - Assert.assertEquals(prefix + "_" + deploymentId, stage.getClientPrefix()); + Assert.assertEquals("testPrefix", storageManager.getClientPrefix()); } @Test @@ -307,19 +322,27 @@ public void testFetchSignedURL() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + SnowflakeStreamingIngestClientInternal mockClientInternal = + Mockito.mock(SnowflakeStreamingIngestClientInternal.class); + Mockito.when(mockClientInternal.getRole()).thenReturn("role"); + SnowflakeServiceClient snowflakeServiceClient = + new 
SnowflakeServiceClient(mockClient, mockBuilder); + StorageManager storageManager = + new InternalStageManager(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); - BasicHttpEntity entity = new BasicHttpEntity(); - entity.setContent( - new ByteArrayInputStream(exampleRemoteMetaResponse.getBytes(StandardCharsets.UTF_8))); - Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); - Mockito.when(mockResponse.getEntity()).thenReturn(entity); + Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - StreamingIngestStage stage = - new StreamingIngestStage(true, "role", mockClient, mockBuilder, "clientName", 1); + StreamingIngestStorage stage = + new StreamingIngestStorage( + storageManager, + "clientName", + (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, + null, + 1); SnowflakeFileTransferMetadataV1 metadata = stage.fetchSignedURL("path/fileName"); @@ -328,7 +351,7 @@ public void testFetchSignedURL() throws Exception { Mockito.verify(mockBuilder) .generateStreamingIngestPostRequest( stringCaptor.capture(), endpointCaptor.capture(), Mockito.any()); - Assert.assertEquals(Constants.CLIENT_CONFIGURE_ENDPOINT, endpointCaptor.getValue()); + Assert.assertEquals(CLIENT_CONFIGURE_ENDPOINT, endpointCaptor.getValue()); Assert.assertEquals(StageInfo.StageType.S3, metadata.getStageInfo().getStageType()); Assert.assertEquals("foo/streaming_ingest/", metadata.getStageInfo().getLocation()); Assert.assertEquals("path/fileName", metadata.getPresignedUrlFileName()); @@ -345,26 +368,26 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse 
mockResponse = Mockito.mock(CloseableHttpResponse.class); + SnowflakeStreamingIngestClientInternal mockClientInternal = + Mockito.mock(SnowflakeStreamingIngestClientInternal.class); + Mockito.when(mockClientInternal.getRole()).thenReturn("role"); + SnowflakeServiceClient snowflakeServiceClient = + new SnowflakeServiceClient(mockClient, mockBuilder); + StorageManager storageManager = + new InternalStageManager(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); - BasicHttpEntity entity = new BasicHttpEntity(); - entity.setContent( - new ByteArrayInputStream(exampleRemoteMetaResponse.getBytes(StandardCharsets.UTF_8))); - Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); - Mockito.when(mockResponse.getEntity()).thenReturn(entity); + Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - StreamingIngestStage stage = - new StreamingIngestStage( - true, - "role", - mockClient, - mockBuilder, + StreamingIngestStorage stage = + new StreamingIngestStorage( + storageManager, "clientName", - new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( - originalMetadata, Optional.of(0L)), + (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, + null, 1); ThreadFactory buildUploadThreadFactory = @@ -493,15 +516,16 @@ public void testRefreshMetadataOnFirstPutException() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStage stage = - new StreamingIngestStage( - true, - "role", - null, - null, + StorageManager storageManager = Mockito.mock(StorageManager.class); + Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); + + StreamingIngestStorage stage = + new StreamingIngestStorage( + storageManager, "clientName", - new 
StreamingIngestStage.SnowflakeFileTransferMetadataWithAge( + new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), + null, maxUploadRetryCount); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeSQLException e = @@ -546,4 +570,10 @@ public Object answer(org.mockito.invocation.InvocationOnMock invocation) InputStream capturedInput = capturedConfig.getUploadStream(); Assert.assertEquals("Hello Upload", IOUtils.toString(capturedInput)); } + + private HttpEntity createHttpEntity(String content) { + BasicHttpEntity entity = new BasicHttpEntity(); + entity.setContent(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8))); + return entity; + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java index 4e054c209..b40cdad82 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; @@ -5,9 +9,6 @@ import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.HashMap; -import java.util.Map; import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.connection.IngestResponseException; @@ -53,11 +54,11 @@ public void testJWTRetries() throws Exception { "testJWTRetries")); // build payload - Map payload = new HashMap<>(); - if (!TestUtils.getRole().isEmpty() && !TestUtils.getRole().equals("DEFAULT_ROLE")) { - payload.put("role", TestUtils.getRole()); - } - ObjectMapper mapper = new ObjectMapper(); + ClientConfigureRequest request = + new ClientConfigureRequest( + !TestUtils.getRole().isEmpty() && !TestUtils.getRole().equals("DEFAULT_ROLE") + ? TestUtils.getRole() + : null); // request wih invalid token, should get 401 3 times PowerMockito.doReturn("invalid_token").when(spyManager).getToken(); @@ -66,7 +67,7 @@ public void testJWTRetries() throws Exception { executeWithRetries( ChannelsStatusResponse.class, CLIENT_CONFIGURE_ENDPOINT, - mapper.writeValueAsString(payload), + request, "client configure", STREAMING_CLIENT_CONFIGURE, httpClient, @@ -84,7 +85,7 @@ public void testJWTRetries() throws Exception { executeWithRetries( ChannelsStatusResponse.class, CLIENT_CONFIGURE_ENDPOINT, - mapper.writeValueAsString(payload), + request, "client configure", STREAMING_CLIENT_CONFIGURE, httpClient, @@ -101,7 +102,7 @@ public void testJWTRetries() throws Exception { executeWithRetries( ChannelsStatusResponse.class, CLIENT_CONFIGURE_ENDPOINT, - mapper.writeValueAsString(payload), + request, "client configure", STREAMING_CLIENT_CONFIGURE, httpClient,