diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index cc8782dbd..4d3ea19aa 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming; @@ -150,7 +150,7 @@ public ZoneId getDefaultTimezone() { } public String getFullyQualifiedTableName() { - return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName); + return Utils.getFullyQualifiedTableName(this.dbName, this.schemaName, this.tableName); } public OnErrorOption getOnErrorOption() { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java index 3e5265719..fe9542267 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java @@ -1,9 +1,11 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; +import net.snowflake.ingest.utils.Utils; + /** * Channel immutable identification and encryption attributes. * @@ -36,12 +38,12 @@ class ChannelFlushContext { String encryptionKey, Long encryptionKeyId) { this.name = name; - this.fullyQualifiedName = String.format("%s.%s.%s.%s", dbName, schemaName, tableName, name); + this.fullyQualifiedName = + Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name); this.dbName = dbName; this.schemaName = schemaName; this.tableName = tableName; - this.fullyQualifiedTableName = - String.format("%s.%s.%s", this.getDbName(), this.getSchemaName(), this.getTableName()); + this.fullyQualifiedTableName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName); this.channelSequencer = channelSequencer; this.encryptionKey = encryptionKey; this.encryptionKeyId = encryptionKeyId; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java index b98782ab9..48bab63ff 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -8,7 +8,7 @@ import java.util.List; /** Class to deserialize a request from a channel status request */ -class ChannelsStatusRequest { +class ChannelsStatusRequest implements StreamingIngestRequest { // Used to deserialize a channel request static class ChannelStatusRequestDTO { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureCallHandler.java b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureCallHandler.java deleted file mode 100644 index 28398f709..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureCallHandler.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; -import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; -import net.snowflake.ingest.connection.IngestResponseException; -import net.snowflake.ingest.connection.RequestBuilder; -import net.snowflake.ingest.connection.ServiceResponseHandler; -import net.snowflake.ingest.utils.ErrorCode; -import net.snowflake.ingest.utils.SFException; -import net.snowflake.ingest.utils.Utils; - -/** A class that is used to configure a Snowflake Ingest Storage */ -class ConfigureCallHandler { - - // Object mapper for creating payload, ignore null fields - private static final ObjectMapper mapper = - new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); - - private final CloseableHttpClient httpClient; - private final RequestBuilder requestBuilder; - private final ServiceResponseHandler.ApiName apiName; - private final String configureEndpoint; - private final String role; - private final String database; - private final String schema; - private final String table; - - /** - * Builder method to create a {@link ConfigureCallHandler} - * - * @param httpClient the HTTP client - * @param requestBuilder the request builder - * @param apiName the API name, used for logging - * @param configureEndpoint the configure endpoint - * @return a {@link ConfigureCallHandlerBuilder} object - */ - static ConfigureCallHandlerBuilder builder( - CloseableHttpClient httpClient, - RequestBuilder requestBuilder, - ServiceResponseHandler.ApiName apiName, - String configureEndpoint) { - return new ConfigureCallHandlerBuilder(httpClient, requestBuilder, apiName, configureEndpoint); - } - - /** Builder class to build a {@link ConfigureCallHandler} */ - static class ConfigureCallHandlerBuilder { - private final CloseableHttpClient httpClient; - private final RequestBuilder requestBuilder; - private final ServiceResponseHandler.ApiName ApiName; - private final String configureEndpoint; - private String role; - private String database; - private String schema; - private String table; - private boolean isTestMode; - - /** - * Constructor for ConfigureCallHandlerBuilder - * - * @param httpClient the HTTP client - * @param requestBuilder the request builder - * @param apiName the API name, used for logging - * @param configureEndpoint the configure endpoint - */ - ConfigureCallHandlerBuilder( - CloseableHttpClient httpClient, - RequestBuilder requestBuilder, - ServiceResponseHandler.ApiName apiName, - String configureEndpoint) { - this.httpClient = httpClient; - this.requestBuilder = requestBuilder; - this.ApiName = apiName; - this.configureEndpoint = configureEndpoint; - } - - public ConfigureCallHandlerBuilder setRole(String role) { - this.role = role; - return this; - } - - public ConfigureCallHandlerBuilder setDatabase(String database) { - this.database = database; - return this; - } - - public ConfigureCallHandlerBuilder setSchema(String schema) { - this.schema = schema; - return this; - } - - public ConfigureCallHandlerBuilder setTable(String table) { - this.table = table; - return this; - } - - public ConfigureCallHandlerBuilder setIsTestMode(boolean isTestMode) { - this.isTestMode = isTestMode; - return this; - } - - public ConfigureCallHandler build() { - return new ConfigureCallHandler(this); - } - } - - ConfigureCallHandler(ConfigureCallHandlerBuilder builder) { - if (!builder.isTestMode) { - Utils.assertNotNull("http client", builder.httpClient); - Utils.assertNotNull("request builder", builder.requestBuilder); - Utils.assertNotNull("api name", builder.ApiName); - Utils.assertStringNotNullOrEmpty("configure endpoint", builder.configureEndpoint); - } - - this.httpClient = builder.httpClient; - this.requestBuilder = builder.requestBuilder; - this.apiName = builder.ApiName; - this.configureEndpoint = builder.configureEndpoint; - this.role = builder.role; - this.database = builder.database; - this.schema = builder.schema; - this.table = builder.table; - } - - /** - * Make a configure call to the Snowflake service - * - * @return the configure response - * @throws IOException - */ - ConfigureResponse makeConfigureCall() throws IOException { - return makeConfigureCall(makeConfigurePayload()); - } - - /** - * Make a configure call to the Snowflake service with a file name, used for GCS - * - * @param fileName the file name - * @return the configure response - * @throws IOException - */ - ConfigureResponse makeConfigureCall(String fileName) throws IOException { - Map payload = makeConfigurePayload(); - payload.put("file_name", fileName); - return makeConfigureCall(payload); - } - - private ConfigureResponse makeConfigureCall(Map payload) throws IOException { - try { - ConfigureResponse response = - executeWithRetries( - ConfigureResponse.class, - this.configureEndpoint, - mapper.writeValueAsString(payload), - "client configure", - this.apiName, - this.httpClient, - this.requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - throw new SFException(ErrorCode.CONFIGURE_FAILURE, response.getMessage()); - } - return response; - } catch (IngestResponseException e) { - throw new SFException(e, ErrorCode.CONFIGURE_FAILURE, e.getMessage()); - } - } - - private Map makeConfigurePayload() { - Map payload = new HashMap<>(); - payload.put("role", this.role); - payload.put("database", this.database); - payload.put("schema", this.schema); - payload.put("table", this.table); - return payload; - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java new file mode 100644 index 000000000..0443549fd --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import net.snowflake.ingest.utils.Utils; + +/** Class used to serialize the client / channel configure request. */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ConfigureRequest implements StreamingIngestRequest { + @JsonProperty("role") + private String role; + + @JsonProperty("database") + private String database; + + @JsonProperty("schema") + private String schema; + + @JsonProperty("table") + private String table; + + @JsonProperty("file_name") + private String fileName; + + /** + * Constructor for client configure request + * + * @param role Role to be used for the request. + */ + ConfigureRequest(String role) { + this.role = role; + } + + /** + * Constructor for channel configure request + * + * @param role Role to be used for the request. + * @param database Database name. + * @param schema Schema name. + * @param table Table name. + */ + ConfigureRequest(String role, String database, String schema, String table) { + this.role = role; + this.database = database; + this.schema = schema; + this.table = table; + } + + String getRole() { + return role; + } + + String getDatabase() { + return database; + } + + String getSchema() { + return schema; + } + + String getTable() { + return table; + } + + String getFileName() { + return fileName; + } + + /** Set the file name for the GCS signed url request. */ + void setFileName(String fileName) { + this.fileName = fileName; + } + + String getFullyQualifiedTableName() { + return Utils.getFullyQualifiedTableName(database, schema, table); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java index 6fe73fbd5..6fa2394a3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java @@ -4,9 +4,11 @@ package net.snowflake.ingest.streaming.internal; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; /** Class used to deserialize responses from configure endpoint */ +@JsonIgnoreProperties(ignoreUnknown = true) class ConfigureResponse extends StreamingIngestResponse { @JsonProperty("prefix") private String prefix; @@ -63,4 +65,11 @@ Long getDeploymentId() { void setDeploymentId(Long deploymentId) { this.deploymentId = deploymentId; } + + String getClientPrefix() { + if (this.deploymentId == null) { + return this.prefix; + } + return this.prefix + "_" + this.deploymentId; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java new file mode 100644 index 000000000..d064efad4 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; +import net.snowflake.ingest.streaming.DropChannelRequest; +import net.snowflake.ingest.utils.Utils; + +/** Class used to serialize the {@link DropChannelRequest} */ +public class DropChannelRequestInternal implements StreamingIngestRequest { + @JsonProperty("request_id") + private String requestId; + + @JsonProperty("role") + private String role; + + @JsonProperty("channel") + private String channel; + + @JsonProperty("table") + private String table; + + @JsonProperty("database") + private String database; + + @JsonProperty("schema") + private String schema; + + @JsonProperty("is_iceberg") + private boolean isIceberg; + + @JsonProperty("client_sequencer") + Long clientSequencer; + + DropChannelRequestInternal( + String requestId, String role, DropChannelRequest request, boolean isIceberg) { + this.requestId = requestId; + this.role = role; + this.channel = request.getChannelName(); + this.database = request.getDBName(); + this.schema = request.getSchemaName(); + this.isIceberg = isIceberg; + if (request instanceof DropChannelVersionRequest) { + this.clientSequencer = ((DropChannelVersionRequest) request).getClientSequencer(); + } + } + + String getRequestId() { + return requestId; + } + + String getRole() { + return role; + } + + String getChannel() { + return channel; + } + + String getTable() { + return table; + } + + String getDatabase() { + return database; + } + + String getSchema() { + return schema; + } + + boolean getIsIceberg() { + return isIceberg; + } + + Long getClientSequencer() { + return clientSequencer; + } + + String getFullyQualifiedTableName() { + return Utils.getFullyQualifiedTableName(database, schema, table); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java index 32414fb53..8d060a223 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java @@ -4,12 +4,8 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE; -import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT; - import java.io.IOException; import java.util.Calendar; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import net.snowflake.client.jdbc.SnowflakeSQLException; @@ -20,41 +16,56 @@ class ExternalVolumeManager implements StorageManager { // Reference to the external volume per table - private final Map externalVolumeMap; + private final Map> externalVolumeMap; + + // Reference to the owning client private final SnowflakeStreamingIngestClientInternal owningClient; - private final boolean isTestMode; + // Reference to the Snowflake service client used for configure calls + private final SnowflakeServiceClient snowflakeServiceClient; + + // Client prefix generated by the Snowflake server + private final String clientPrefix; /** * Constructor for ExternalVolumeManager * * @param isTestMode whether the manager in test mode * @param client the owning client + * @param snowflakeServiceClient the Snowflake service client used for configure calls */ - ExternalVolumeManager(boolean isTestMode, SnowflakeStreamingIngestClientInternal client) { + ExternalVolumeManager( + boolean isTestMode, + SnowflakeStreamingIngestClientInternal client, + SnowflakeServiceClient snowflakeServiceClient) { this.owningClient = client; - this.isTestMode = isTestMode; + this.snowflakeServiceClient = snowflakeServiceClient; this.externalVolumeMap = new ConcurrentHashMap<>(); + this.clientPrefix = + isTestMode + ? "testPrefix" + : this.snowflakeServiceClient + .clientConfigure(new ConfigureRequest(client.getRole())) + .getClientPrefix(); } /** - * Given a blob, return the target storage by looking up the table name from the channel context + * Given a channel context, return the target storage by looking up the table name * - * @param blobData the blob to upload + * @param channelFlushContext the channel flush context containing the table name * @return target storage */ @Override - public StreamingIngestStorage getStorage(List>> blobData) { + public StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext) { // Only one chunk per blob in Iceberg mode. - ChannelFlushContext channelContext = blobData.get(0).get(0).getChannelContext(); - StreamingIngestStorage stage = - this.externalVolumeMap.get(channelContext.getFullyQualifiedTableName()); + StreamingIngestStorage stage = + this.externalVolumeMap.get(channelFlushContext.getFullyQualifiedTableName()); if (stage == null) { throw new SFException( ErrorCode.INTERNAL_ERROR, String.format( - "No storage found for table %s", channelContext.getFullyQualifiedTableName())); + "No storage found for table %s", channelFlushContext.getFullyQualifiedTableName())); } return stage; @@ -63,42 +74,41 @@ public StreamingIngestStorage getStorage(List>> blobData) { /** * Add a storage to the manager by looking up the table name from the open channel response * - * @param openChannelResponse response from open channel + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + * @param fileLocationInfo response from open channel */ @Override - public void addStorage(OpenChannelResponse openChannelResponse) { - String fullyQualifiedTableName = - String.format( - "%s.%s.%s", - openChannelResponse.getDBName(), - openChannelResponse.getSchemaName(), - openChannelResponse.getTableName()); - if (!this.externalVolumeMap.containsKey(fullyQualifiedTableName)) { - try { - ConfigureCallHandler configureCallHandler = - ConfigureCallHandler.builder( - this.owningClient.getHttpClient(), - this.owningClient.getRequestBuilder(), - STREAMING_CHANNEL_CONFIGURE, - CHANNEL_CONFIGURE_ENDPOINT) - .setRole(this.owningClient.getRole()) - .setDatabase(openChannelResponse.getDBName()) - .setSchema(openChannelResponse.getSchemaName()) - .setTable(openChannelResponse.getTableName()) - .build(); - this.externalVolumeMap.put( - fullyQualifiedTableName, - new StreamingIngestStorage( - isTestMode, - configureCallHandler, - this.owningClient.getName(), - DEFAULT_MAX_UPLOAD_RETRIES)); - } catch (SnowflakeSQLException | IOException err) { - throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE); - } + public void addStorage( + String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) { + String fullyQualifiedTableName = String.format("%s.%s.%s", dbName, schemaName, tableName); + + try { + this.externalVolumeMap.putIfAbsent( + fullyQualifiedTableName, + new StreamingIngestStorage<>( + this, + this.owningClient.getName(), + fileLocationInfo, + new ConfigureRequest(this.owningClient.getRole(), dbName, schemaName, tableName), + DEFAULT_MAX_UPLOAD_RETRIES)); + } catch (SnowflakeSQLException | IOException err) { + throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE); } } + /** + * Configure method for storage + * + * @param request the configure request + * @return the configure response + */ + @Override + public ConfigureResponse configure(ConfigureRequest request) { + return this.snowflakeServiceClient.channelConfigure(request); + } + // TODO: SNOW-1502887 Blob path generation for iceberg table @Override public String generateBlobPath() { @@ -107,6 +117,9 @@ public String generateBlobPath() { // TODO: SNOW-1502887 Blob path generation for iceberg table @Override + public void decrementBlobSequencer() {} + + // TODO: SNOW-1502887 Blob path generation for iceberg table public String getBlobPath(Calendar calendar, String clientPrefix) { return ""; } @@ -118,9 +131,6 @@ public String getBlobPath(Calendar calendar, String clientPrefix) { */ @Override public String getClientPrefix() { - if (this.externalVolumeMap.isEmpty()) { - return null; - } - return this.externalVolumeMap.values().iterator().next().getClientPrefix(); + return this.clientPrefix; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index b1de58431..6b98f064f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -317,17 +317,11 @@ void distributeFlushTasks() { itr = this.channelCache.iterator(); List, CompletableFuture>> blobs = new ArrayList<>(); List> leftoverChannelsDataPerTable = new ArrayList<>(); - boolean isBlobCreated = true; - String currentBlobPath = null; while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) { List>> blobData = new ArrayList<>(); float totalBufferSizeInBytes = 0F; - if (isBlobCreated) { - currentBlobPath = this.storageManager.generateBlobPath(); - } - isBlobCreated = false; - final String blobPath = currentBlobPath; + final String blobPath = this.storageManager.generateBlobPath(); // Distribute work at table level, split the blob if reaching the blob size limit or the // channel has different encryption key ids @@ -408,8 +402,11 @@ && shouldStopProcessing( } // Kick off a build job - if (!blobData.isEmpty()) { - isBlobCreated = true; + if (blobData.isEmpty()) { + // we decrement the blob sequencer so that we do not have gaps in the blob names created by + // this client. + this.storageManager.decrementBlobSequencer(); + } else { long flushStartMs = System.currentTimeMillis(); if (this.owningClient.flushLatency != null) { latencyTimerContextMap.putIfAbsent(blobPath, this.owningClient.flushLatency.time()); @@ -522,7 +519,7 @@ BlobMetadata buildAndUpload(String blobPath, List>> blobData blob.blobStats.setBuildDurationMs(buildContext); return upload( - this.storageManager.getStorage(blobData), + this.storageManager.getStorage(blobData.get(0).get(0).getChannelContext()), blobPath, blob.blobBytes, blob.chunksMetadataList, @@ -630,11 +627,6 @@ void invalidateAllChannelsInBlob( })); } - /** Get the server generated unique prefix for this client */ - String getClientPrefix() { - return this.storageManager.getClientPrefix(); - } - /** * Throttle if the number of queued buildAndUpload tasks is bigger than the total number of * available processors diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java index c7d74cc52..709622b13 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -4,14 +4,11 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE; -import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Calendar; -import java.util.List; import java.util.TimeZone; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -23,7 +20,7 @@ /** Class to manage single Snowflake internal stage */ class InternalStageManager implements StorageManager { // Target stage for the client - private final StreamingIngestStorage targetStage; + private final StreamingIngestStorage targetStage; // Increasing counter to generate a unique blob name per client private final AtomicLong counter; @@ -31,28 +28,49 @@ class InternalStageManager implements StorageManager { // Whether the manager in test mode private final boolean isTestMode; + // Snowflake service client used for configure calls + private final SnowflakeServiceClient snowflakeServiceClient; + + // Client prefix generated by the Snowflake server + private final String clientPrefix; + /** * Constructor for InternalStageManager * * @param isTestMode whether the manager in test mode * @param client the owning client + * @param snowflakeServiceClient the Snowflake service client to use for configure calls */ - InternalStageManager(boolean isTestMode, SnowflakeStreamingIngestClientInternal client) { - ConfigureCallHandler configureCallHandler = - ConfigureCallHandler.builder( - client.getHttpClient(), - client.getRequestBuilder(), - STREAMING_CLIENT_CONFIGURE, - CLIENT_CONFIGURE_ENDPOINT) - .setRole(client.getRole()) - .setIsTestMode(isTestMode) - .build(); + InternalStageManager( + boolean isTestMode, + SnowflakeStreamingIngestClientInternal client, + SnowflakeServiceClient snowflakeServiceClient) { + this.snowflakeServiceClient = snowflakeServiceClient; this.isTestMode = isTestMode; this.counter = new AtomicLong(0); try { - targetStage = - new StreamingIngestStorage( - isTestMode, configureCallHandler, client.getName(), DEFAULT_MAX_UPLOAD_RETRIES); + if (!isTestMode) { + ConfigureRequest request = new ConfigureRequest(client.getRole()); + ConfigureResponse response = + this.snowflakeServiceClient.clientConfigure(new ConfigureRequest(client.getRole())); + this.clientPrefix = response.getClientPrefix(); + this.targetStage = + new StreamingIngestStorage<>( + this, + client.getName(), + response.getStageLocation(), + request, + DEFAULT_MAX_UPLOAD_RETRIES); + } else { + this.clientPrefix = "testPrefix"; + this.targetStage = + new StreamingIngestStorage<>( + this, + "testClient", + (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, + new ConfigureRequest(client.getRole()), + DEFAULT_MAX_UPLOAD_RETRIES); + } } catch (SnowflakeSQLException | IOException err) { throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE); } @@ -62,23 +80,38 @@ class InternalStageManager implements StorageManager { * Get the storage. In this case, the storage is always the target stage as there's only one stage * in non-iceberg mode. * - * @param blobData this parameter does not affect the method outcome + * @param channelFlushContext this parameter does not affect the method outcome * @return the target storage */ @Override @SuppressWarnings("unused") - public StreamingIngestStorage getStorage(List>> blobData) { + public StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext) { // There's always only one stage for the client in non-iceberg mode return targetStage; } /** - * Add storage to the manager. Should not be called in non-iceberg mode. + * Add storage to the manager. Do nothing as there's only one stage in non-Iceberg mode. + * + * @param dbName + * @param schemaName + * @param tableName + * @param fileLocationInfo + */ + @Override + public void addStorage( + String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {} + + /** + * Configure method for storage * - * @param openChannelResponse this parameter does not affect the instance + * @param request the configure request + * @return the configure response */ @Override - public void addStorage(OpenChannelResponse openChannelResponse) {} + public ConfigureResponse configure(ConfigureRequest request) { + return snowflakeServiceClient.clientConfigure(request); + } /** * Generate a blob path, which is: "YEAR/MONTH/DAY_OF_MONTH/HOUR_OF_DAY/MINUTE/ blobs; + + RegisterBlobRequest(String requestId, String role, List blobs) { + this.requestId = requestId; + this.role = role; + this.blobs = blobs; + } + + String getRequestId() { + return requestId; + } + + String getRole() { + return role; + } + + List getBlobs() { + return blobs; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java new file mode 100644 index 000000000..477abd17e --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB; +import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; +import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; + +import java.io.IOException; +import java.util.stream.Collectors; +import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.connection.ServiceResponseHandler; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.Logging; +import net.snowflake.ingest.utils.SFException; + +/** + * The SnowflakeServiceClient class is responsible for making API requests to the Snowflake service. + */ +class SnowflakeServiceClient { + private static final Logging logger = new Logging(SnowflakeServiceClient.class); + + // HTTP client used for making requests + private final CloseableHttpClient httpClient; + + // Request builder for building streaming API request + private final RequestBuilder requestBuilder; + + /** + * Default constructor + * + * @param httpClient the HTTP client used for making requests + * @param requestBuilder the request builder for building streaming API requests + */ + SnowflakeServiceClient(CloseableHttpClient httpClient, RequestBuilder requestBuilder) { + this.httpClient = httpClient; + this.requestBuilder = requestBuilder; + } + + /** + * Configures the client given a {@link ConfigureRequest}. + * + * @param request the client configuration request + * @return the response from the configuration request + */ + ConfigureResponse clientConfigure(ConfigureRequest request) { + ConfigureResponse response = + executeApiRequestWithRetries( + request, + ConfigureResponse.class, + CLIENT_CONFIGURE_ENDPOINT, + "client configure", + STREAMING_CLIENT_CONFIGURE, + ErrorCode.CONFIGURE_FAILURE); + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug("Client configure request failed, message={}", response.getMessage()); + throw new SFException(ErrorCode.CONFIGURE_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Configures the channel given a {@link ConfigureRequest}. + * + * @param request the channel configuration request + * @return the response from the configuration request + */ + ConfigureResponse channelConfigure(ConfigureRequest request) { + ConfigureResponse response = + executeApiRequestWithRetries( + request, + ConfigureResponse.class, + CHANNEL_CONFIGURE_ENDPOINT, + "channel configure", + STREAMING_CHANNEL_CONFIGURE, + ErrorCode.CONFIGURE_FAILURE); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Channel configure request failed, table={}, message={}", + request.getFullyQualifiedTableName(), + response.getMessage()); + throw new SFException(ErrorCode.CONFIGURE_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Opens a channel given a {@link OpenChannelRequestInternal}. + * + * @param request the open channel request + * @return the response from the open channel request + */ + OpenChannelResponse openChannel(OpenChannelRequestInternal request) { + OpenChannelResponse response = + executeApiRequestWithRetries( + request, + OpenChannelResponse.class, + OPEN_CHANNEL_ENDPOINT, + "open channel", + STREAMING_OPEN_CHANNEL, + ErrorCode.OPEN_CHANNEL_FAILURE); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Open channel request failed, table={}, message={}", + request.getFullyQualifiedTableName(), + response.getMessage()); + throw new SFException(ErrorCode.OPEN_CHANNEL_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Drops a channel given a {@link DropChannelRequestInternal}. + * + * @param request the drop channel request + * @return the response from the drop channel request + */ + DropChannelResponse dropChannel(DropChannelRequestInternal request) { + DropChannelResponse response = + executeApiRequestWithRetries( + request, + DropChannelResponse.class, + DROP_CHANNEL_ENDPOINT, + "drop channel", + STREAMING_DROP_CHANNEL, + ErrorCode.DROP_CHANNEL_FAILURE); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Drop channel request failed, table={}, message={}", + request.getFullyQualifiedTableName(), + response.getMessage()); + throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Gets the status of a channel given a {@link ChannelsStatusRequest}. + * + * @param request the channel status request + * @return the response from the channel status request + */ + ChannelsStatusResponse channelStatus(ChannelsStatusRequest request) { + ChannelsStatusResponse response = + executeApiRequestWithRetries( + request, + ChannelsStatusResponse.class, + CHANNEL_STATUS_ENDPOINT, + "channel status", + STREAMING_CHANNEL_STATUS, + ErrorCode.CHANNEL_STATUS_FAILURE); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug("Channel status request failed, message={}", response.getMessage()); + throw new SFException(ErrorCode.CHANNEL_STATUS_FAILURE, response.getMessage()); + } + return response; + } + + /** + * Registers a blob given a {@link RegisterBlobRequest}. + * + * @param request the register blob request + * @param executionCount the number of times the request has been executed, used for logging + * @return the response from the register blob request + */ + RegisterBlobResponse registerBlob(RegisterBlobRequest request, final int executionCount) { + RegisterBlobResponse response = + executeApiRequestWithRetries( + request, + RegisterBlobResponse.class, + REGISTER_BLOB_ENDPOINT, + "register blob", + STREAMING_REGISTER_BLOB, + ErrorCode.REGISTER_BLOB_FAILURE); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Register blob request failed for blob={}, message={}, executionCount={}", + request.getBlobs().stream().map(BlobMetadata::getPath).collect(Collectors.toList()), + response.getMessage(), + executionCount); + throw new SFException(ErrorCode.REGISTER_BLOB_FAILURE, response.getMessage()); + } + return response; + } + + private T executeApiRequestWithRetries( + StreamingIngestRequest request, + Class responseClass, + String endpoint, + String operation, + ServiceResponseHandler.ApiName apiName, + ErrorCode errorCode) { + try { + return executeWithRetries( + responseClass, + endpoint, + request, + operation, + apiName, + this.httpClient, + this.requestBuilder); + } catch (IngestResponseException | IOException e) { + throw new SFException(e, errorCode, e.getMessage()); + } + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 1831c5f36..996117158 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -4,20 +4,11 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB; -import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.sleepForRetry; -import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.COMMIT_MAX_RETRY_COUNT; import static net.snowflake.ingest.utils.Constants.COMMIT_RETRY_INTERVAL_IN_MS; -import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.ENABLE_TELEMETRY_TO_SF; import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY; -import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_ENQUEUE_TABLE_CHUNK_QUEUE_FULL; import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST; import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; @@ -39,7 +30,6 @@ import com.codahale.metrics.jvm.ThreadStatesGaugeSet; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.security.NoSuchAlgorithmException; @@ -64,7 +54,6 @@ import net.snowflake.client.core.SFSessionProperty; import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder; import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; -import net.snowflake.ingest.connection.IngestResponseException; import net.snowflake.ingest.connection.OAuthCredential; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.connection.TelemetryService; @@ -154,6 +143,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Background thread that uploads telemetry data periodically private ScheduledExecutorService telemetryWorker; + // Snowflake service client to make API calls + private SnowflakeServiceClient snowflakeServiceClient; + /** * Constructor * @@ -241,10 +233,12 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.setupMetricsForClient(); } + this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder); + this.storageManager = isIcebergMode - ? new ExternalVolumeManager<>(isTestMode, this) - : new InternalStageManager<>(isTestMode, this); + ? new ExternalVolumeManager<>(isTestMode, this, this.snowflakeServiceClient) + : new InternalStageManager<>(isTestMode, this, this.snowflakeServiceClient); try { this.flushService = @@ -295,6 +289,7 @@ public SnowflakeStreamingIngestClientInternal( @VisibleForTesting public void injectRequestBuilder(RequestBuilder requestBuilder) { this.requestBuilder = requestBuilder; + this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder); } /** @@ -340,81 +335,55 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request.getFullyQualifiedTableName(), getName()); - try { - Map payload = new HashMap<>(); - payload.put( - "request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); - payload.put("channel", request.getChannelName()); - payload.put("table", request.getTableName()); - payload.put("database", request.getDBName()); - payload.put("schema", request.getSchemaName()); - payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name()); - payload.put("role", this.role); - payload.put("is_iceberg", isIcebergMode); - if (request.isOffsetTokenProvided()) { - payload.put("offset_token", request.getOffsetToken()); - } - - OpenChannelResponse response = - executeWithRetries( - OpenChannelResponse.class, - OPEN_CHANNEL_ENDPOINT, - payload, - "open channel", - STREAMING_OPEN_CHANNEL, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - logger.logDebug( - "Open channel request failed, channel={}, table={}, client={}, message={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - getName(), - response.getMessage()); - throw new SFException(ErrorCode.OPEN_CHANNEL_FAILURE, response.getMessage()); - } - - logger.logInfo( - "Open channel request succeeded, channel={}, table={}, clientSequencer={}," - + " rowSequencer={}, client={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - response.getClientSequencer(), - response.getRowSequencer(), - getName()); - - // Channel is now registered, add it to the in-memory channel pool - SnowflakeStreamingIngestChannelInternal channel = - SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) - .setDBName(response.getDBName()) - .setSchemaName(response.getSchemaName()) - .setTableName(response.getTableName()) - .setOffsetToken(response.getOffsetToken()) - .setRowSequencer(response.getRowSequencer()) - .setChannelSequencer(response.getClientSequencer()) - .setOwningClient(this) - .setEncryptionKey(response.getEncryptionKey()) - .setEncryptionKeyId(response.getEncryptionKeyId()) - .setOnErrorOption(request.getOnErrorOption()) - .setDefaultTimezone(request.getDefaultTimezone()) - .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) - .build(); - - // Setup the row buffer schema - channel.setupSchema(response.getTableColumns()); - - // Add channel to the channel cache - this.channelCache.addChannel(channel); + OpenChannelRequestInternal openChannelRequest = + new OpenChannelRequestInternal( + this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), + this.role, + request, + Constants.WriteMode.CLOUD_STORAGE, + isIcebergMode); + OpenChannelResponse response = snowflakeServiceClient.openChannel(openChannelRequest); - // Add storage to the storage manager, only for external volume - this.storageManager.addStorage(response); + logger.logInfo( + "Open channel request succeeded, channel={}, table={}, clientSequencer={}," + + " rowSequencer={}, client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + response.getClientSequencer(), + response.getRowSequencer(), + getName()); - return channel; - } catch (IOException | IngestResponseException e) { - throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); - } + // Channel is now registered, add it to the in-memory channel pool + SnowflakeStreamingIngestChannelInternal channel = + SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) + .setDBName(response.getDBName()) + .setSchemaName(response.getSchemaName()) + .setTableName(response.getTableName()) + .setOffsetToken(response.getOffsetToken()) + .setRowSequencer(response.getRowSequencer()) + .setChannelSequencer(response.getClientSequencer()) + .setOwningClient(this) + .setEncryptionKey(response.getEncryptionKey()) + .setEncryptionKeyId(response.getEncryptionKeyId()) + .setOnErrorOption(request.getOnErrorOption()) + .setDefaultTimezone(request.getDefaultTimezone()) + .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) + .build(); + + // Setup the row buffer schema + channel.setupSchema(response.getTableColumns()); + + // Add channel to the channel cache + this.channelCache.addChannel(channel); + + // Add storage to the storage manager, only for external volume + this.storageManager.addStorage( + response.getDBName(), + response.getSchemaName(), + response.getTableName(), + response.getStageLocation()); + + return channel; } @Override @@ -429,55 +398,22 @@ public void dropChannel(DropChannelRequest request) { request.getFullyQualifiedTableName(), getName()); - try { - Map payload = new HashMap<>(); - payload.put( - "request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); - payload.put("channel", request.getChannelName()); - payload.put("table", request.getTableName()); - payload.put("database", request.getDBName()); - payload.put("schema", request.getSchemaName()); - payload.put("role", this.role); - payload.put("is_iceberg", isIcebergMode); - Long clientSequencer = null; - if (request instanceof DropChannelVersionRequest) { - clientSequencer = ((DropChannelVersionRequest) request).getClientSequencer(); - if (clientSequencer != null) { - payload.put("client_sequencer", clientSequencer); - } - } + DropChannelRequestInternal dropChannelRequest = + new DropChannelRequestInternal( + this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), + this.role, + request, + isIcebergMode); + snowflakeServiceClient.dropChannel(dropChannelRequest); - DropChannelResponse response = - executeWithRetries( - DropChannelResponse.class, - DROP_CHANNEL_ENDPOINT, - payload, - "drop channel", - STREAMING_DROP_CHANNEL, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - logger.logDebug( - "Drop channel request failed, channel={}, table={}, client={}, message={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - getName(), - response.getMessage()); - throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage()); - } - - logger.logInfo( - "Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - clientSequencer, - getName()); - - } catch (IOException | IngestResponseException e) { - throw new SFException(e, ErrorCode.DROP_CHANNEL_FAILURE, e.getMessage()); - } + logger.logInfo( + "Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + request instanceof DropChannelVersionRequest + ? ((DropChannelVersionRequest) request).getClientSequencer() + : null, + getName()); } /** @@ -511,56 +447,36 @@ public Map getLatestCommittedOffsetTokens( */ ChannelsStatusResponse getChannelsStatus( List> channels) { - try { - ChannelsStatusRequest request = new ChannelsStatusRequest(); - List requestDTOs = - channels.stream() - .map(ChannelsStatusRequest.ChannelStatusRequestDTO::new) - .collect(Collectors.toList()); - request.setChannels(requestDTOs); - request.setRole(this.role); - request.setRequestId(this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); - - String payload = objectMapper.writeValueAsString(request); - - ChannelsStatusResponse response = - executeWithRetries( - ChannelsStatusResponse.class, - CHANNEL_STATUS_ENDPOINT, - payload, - "channel status", - STREAMING_CHANNEL_STATUS, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - throw new SFException(ErrorCode.CHANNEL_STATUS_FAILURE, response.getMessage()); - } + ChannelsStatusRequest request = new ChannelsStatusRequest(); + List requestDTOs = + channels.stream() + .map(ChannelsStatusRequest.ChannelStatusRequestDTO::new) + .collect(Collectors.toList()); + request.setChannels(requestDTOs); + request.setRole(this.role); + request.setRequestId(this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement()); - for (int idx = 0; idx < channels.size(); idx++) { - SnowflakeStreamingIngestChannelInternal channel = channels.get(idx); - ChannelsStatusResponse.ChannelStatusResponseDTO channelStatus = - response.getChannels().get(idx); - if (channelStatus.getStatusCode() != RESPONSE_SUCCESS) { - String errorMessage = - String.format( - "Channel has failure status_code, name=%s, channel_sequencer=%d, status_code=%d", - channel.getFullyQualifiedName(), - channel.getChannelSequencer(), - channelStatus.getStatusCode()); - logger.logWarn(errorMessage); - if (getTelemetryService() != null) { - getTelemetryService() - .reportClientFailure(this.getClass().getSimpleName(), errorMessage); - } + ChannelsStatusResponse response = snowflakeServiceClient.channelStatus(request); + + for (int idx = 0; idx < channels.size(); idx++) { + SnowflakeStreamingIngestChannelInternal channel = channels.get(idx); + ChannelsStatusResponse.ChannelStatusResponseDTO channelStatus = + response.getChannels().get(idx); + if (channelStatus.getStatusCode() != RESPONSE_SUCCESS) { + String errorMessage = + String.format( + "Channel has failure status_code, name=%s, channel_sequencer=%d, status_code=%d", + channel.getFullyQualifiedName(), + channel.getChannelSequencer(), + channelStatus.getStatusCode()); + logger.logWarn(errorMessage); + if (getTelemetryService() != null) { + getTelemetryService().reportClientFailure(this.getClass().getSimpleName(), errorMessage); } } - - return response; - } catch (IOException | IngestResponseException e) { - throw new SFException(e, ErrorCode.CHANNEL_STATUS_FAILURE, e.getMessage()); } + + return response; } /** @@ -631,37 +547,13 @@ void registerBlobs(List blobs, final int executionCount) { this.name, executionCount); - RegisterBlobResponse response = null; - try { - Map payload = new HashMap<>(); - payload.put( - "request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); - payload.put("blobs", blobs); - payload.put("role", this.role); - - response = - executeWithRetries( - RegisterBlobResponse.class, - REGISTER_BLOB_ENDPOINT, - payload, - "register blob", - STREAMING_REGISTER_BLOB, - httpClient, - requestBuilder); - - // Check for Snowflake specific response code - if (response.getStatusCode() != RESPONSE_SUCCESS) { - logger.logDebug( - "Register blob request failed for blob={}, client={}, message={}, executionCount={}", - blobs.stream().map(BlobMetadata::getPath).collect(Collectors.toList()), - this.name, - response.getMessage(), - executionCount); - throw new SFException(ErrorCode.REGISTER_BLOB_FAILURE, response.getMessage()); - } - } catch (IOException | IngestResponseException e) { - throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage()); - } + RegisterBlobRequest request = + new RegisterBlobRequest( + this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), + this.role, + blobs); + + RegisterBlobResponse response = snowflakeServiceClient.registerBlob(request, executionCount); logger.logInfo( "Register blob request returned for blob={}, client={}, executionCount={}", diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java index aca34e72a..bf2aceac0 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java @@ -4,10 +4,6 @@ package net.snowflake.ingest.streaming.internal; -import com.google.common.annotations.VisibleForTesting; -import java.util.Calendar; -import java.util.List; - /** Interface to manage {@link StreamingIngestStorage} for {@link FlushService} */ interface StorageManager { // Default max upload retries for streaming ingest storage @@ -16,34 +12,39 @@ interface StorageManager { /** * Given a blob, return the target storage * - * @param blobData the blob to upload + * @param channelFlushContext the blob to upload * @return target stage */ - StreamingIngestStorage getStorage(List>> blobData); + StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext); /** * Add a storage to the manager * - * @param openChannelResponse response from open channel + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + * @param fileLocationInfo file location info from configure response */ - void addStorage(OpenChannelResponse openChannelResponse); + void addStorage( + String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo); /** - * Generate a unique blob path for the blob + * Configure method for storage * - * @return the blob path + * @param request the configure request + * @return the configure response */ - String generateBlobPath(); + ConfigureResponse configure(ConfigureRequest request); /** - * Get the blob path given time and client prefix, used for testing only + * Generate a unique blob path and increment the blob sequencer * - * @param calendar the time - * @param clientPrefix the client prefix * @return the blob path */ - @VisibleForTesting - String getBlobPath(Calendar calendar, String clientPrefix); + String generateBlobPath(); + + /** Decrement the blob sequencer */ + void decrementBlobSequencer(); /** * Get the unique client prefix generated by the Snowflake server diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java new file mode 100644 index 000000000..66daee1d9 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +/** + * The StreamingIngestRequest interface is a marker interface used for type safety in the {@link + * SnowflakeServiceClient} for streaming ingest API request. + */ +public interface StreamingIngestRequest {} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestResponse.java index 1ec01fceb..6c4df8c6d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestResponse.java @@ -1,9 +1,16 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; +/** + * The StreamingIngestResponse class is an abstract class that represents a response from the + * Snowflake streaming ingest API. This class provides a common structure for all types of responses + * that can be received from the {@link SnowflakeServiceClient}. + */ abstract class StreamingIngestResponse { abstract Long getStatusCode(); + + abstract String getMessage(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java index 13bac5386..78e5d9efc 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java @@ -33,7 +33,7 @@ import net.snowflake.ingest.utils.Utils; /** Handles uploading files to the Snowflake Streaming Ingest Storage */ -class StreamingIngestStorage { +class StreamingIngestStorage { private static final ObjectMapper mapper = new ObjectMapper(); // Object mapper for parsing the client/configure response to Jackson version the same as @@ -77,9 +77,9 @@ state to record unknown age. } private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; - private final ConfigureCallHandler configureCallHandler; + private final StorageManager owningManager; + private final ConfigureRequest configureRequest; private final String clientName; - private String clientPrefix; private final int maxUploadRetries; @@ -87,70 +87,51 @@ state to record unknown age. private final Properties proxyProperties; /** - * Default constructor for internal stage + * Default constructor * - * @param isTestMode Whether it's in test mode - * @param configureCallHandler The ConfigureCallHandler to use - * @param clientName The client name - * @param maxUploadRetries The maximum number of retries to attempt - * @throws SnowflakeSQLException - * @throws IOException - */ - StreamingIngestStorage( - boolean isTestMode, - ConfigureCallHandler configureCallHandler, - String clientName, - int maxUploadRetries) - throws SnowflakeSQLException, IOException { - this.configureCallHandler = configureCallHandler; - this.clientName = clientName; - this.proxyProperties = generateProxyPropertiesForJDBC(); - this.maxUploadRetries = maxUploadRetries; - if (!isTestMode) { - refreshSnowflakeMetadata(); - } - } - - /** - * Default constructor for external volume - * - * @param configureCallHandler The ConfigureCallHandler to use + * @param owningManager the storage manager owning this storage * @param clientName The client name * @param fileLocationInfo The file location information from open channel response + * @param configureRequest The configure request for configure call * @param maxUploadRetries The maximum number of retries to attempt */ StreamingIngestStorage( - ConfigureCallHandler configureCallHandler, + StorageManager owningManager, String clientName, FileLocationInfo fileLocationInfo, + ConfigureRequest configureRequest, int maxUploadRetries) throws SnowflakeSQLException, IOException { - this.configureCallHandler = configureCallHandler; - this.clientName = clientName; - this.proxyProperties = generateProxyPropertiesForJDBC(); - this.maxUploadRetries = maxUploadRetries; + this( + owningManager, + clientName, + (SnowflakeFileTransferMetadataWithAge) null, + configureRequest, + maxUploadRetries); createFileTransferMetadataWithAge(fileLocationInfo); } /** * Constructor for TESTING that takes SnowflakeFileTransferMetadataWithAge as input * - * @param isTestMode must be true - * @param configureCallHandler the ConfigureCallHandler to use + * @param owningManager the storage manager owning this storage * @param clientName the client name * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with + * @param configureRequest the configure request for configure call + * @param maxUploadRetries the maximum number of retries to attempt */ StreamingIngestStorage( - boolean isTestMode, - ConfigureCallHandler configureCallHandler, + StorageManager owningManager, String clientName, SnowflakeFileTransferMetadataWithAge testMetadata, - int maxRetryCount) + ConfigureRequest configureRequest, + int maxUploadRetries) throws SnowflakeSQLException, IOException { - this(isTestMode, configureCallHandler, clientName, maxRetryCount); - if (!isTestMode) { - throw new SFException(ErrorCode.INTERNAL_ERROR); - } + this.owningManager = owningManager; + this.clientName = clientName; + this.maxUploadRetries = maxUploadRetries; + this.proxyProperties = generateProxyPropertiesForJDBC(); + this.configureRequest = configureRequest; this.fileTransferMetadataWithAge = testMetadata; } @@ -204,7 +185,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) .setUploadStream(inStream) .setRequireCompress(false) .setOcspMode(OCSPMode.FAIL_OPEN) - .setStreamingIngestClientKey(this.clientPrefix) + .setStreamingIngestClientKey(this.owningManager.getClientPrefix()) .setStreamingIngestClientName(this.clientName) .setProxyProperties(this.proxyProperties) .setDestFileName(fullFilePath) @@ -262,11 +243,7 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole return fileTransferMetadataWithAge; } - ConfigureResponse response = this.configureCallHandler.makeConfigureCall(); - // Do not change the prefix everytime we have to refresh credentials - if (Utils.isNullOrEmpty(this.clientPrefix)) { - this.clientPrefix = createClientPrefix(response); - } + ConfigureResponse response = this.owningManager.configure(this.configureRequest); return createFileTransferMetadataWithAge(response.getStageLocation()); } @@ -275,7 +252,7 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( throws JsonProcessingException, net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException, SnowflakeSQLException { - Utils.assertStringNotNullOrEmpty("client prefix", this.clientPrefix); + Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix()); if (fileLocationInfo .getLocationType() @@ -327,7 +304,8 @@ private String createClientPrefix(final ConfigureResponse response) { SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) throws SnowflakeSQLException, IOException { - ConfigureResponse response = this.configureCallHandler.makeConfigureCall(fileName); + this.configureRequest.setFileName(fileName); + ConfigureResponse response = this.owningManager.configure(this.configureRequest); SnowflakeFileTransferMetadataV1 metadata = (SnowflakeFileTransferMetadataV1) @@ -405,9 +383,4 @@ void putLocal(String fullFilePath, byte[] data) { throw new SFException(ex, ErrorCode.BLOB_UPLOAD_FAILURE); } } - - /** Get the server generated unique prefix for this client */ - String getClientPrefix() { - return this.clientPrefix; - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java index 56e960064..ebfffa234 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY; @@ -6,7 +10,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.util.Map; import java.util.function.Function; import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse; import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpUriRequest; @@ -77,7 +80,7 @@ public static void sleepForRetry(int executionCount) { static T executeWithRetries( Class targetClass, String endpoint, - Map payload, + StreamingIngestRequest payload, String message, ServiceResponseHandler.ApiName apiName, CloseableHttpClient httpClient, diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index a06df4027..5220625da 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.utils; @@ -384,4 +384,31 @@ public static String getStackTrace(Throwable e) { } return stackTrace.toString(); } + + /** + * Get the fully qualified table name + * + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + * @return the fully qualified table name + */ + public static String getFullyQualifiedTableName( + String dbName, String schemaName, String tableName) { + return String.format("%s.%s.%s", dbName, schemaName, tableName); + } + + /** + * Get the fully qualified channel name + * + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + * @param channelName the channel name + * @return the fully qualified channel name + */ + public static String getFullyQualifiedChannelName( + String dbName, String schemaName, String tableName, String channelName) { + return String.format("%s.%s.%s.%s", dbName, schemaName, tableName, channelName); + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 8778977fe..a1ff4d12f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -103,14 +103,14 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { storage = Mockito.mock(StreamingIngestStorage.class); - Mockito.when(storage.getClientPrefix()).thenReturn("client_prefix"); parameterProvider = new ParameterProvider(isIcebergMode); internalParameterProvider = new InternalParameterProvider(isIcebergMode); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); - storageManager = Mockito.spy(new InternalStageManager<>(true, client)); + storageManager = Mockito.spy(new InternalStageManager<>(true, client, null)); Mockito.when(storageManager.getStorage(ArgumentMatchers.any())).thenReturn(storage); + Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix"); channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); registerService = Mockito.spy(new RegisterService(client, client.isTestMode())); @@ -414,32 +414,38 @@ public void testGetFilePath() { StorageManager storageManager = testContext.storageManager; Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; - String outputString = storageManager.getBlobPath(calendar, clientPrefix); - Path outputPath = Paths.get(outputString); - Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); - Assert.assertTrue( - calendar.get(Calendar.MINUTE) - - Integer.parseInt(outputPath.getParent().getFileName().toString()) - <= 1); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)), - outputPath.getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)), - outputPath.getParent().getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.MONTH) + 1), - outputPath.getParent().getParent().getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.YEAR)), - outputPath - .getParent() - .getParent() - .getParent() - .getParent() - .getParent() - .getFileName() - .toString()); + if (isIcebergMode) { + // TODO: SNOW-1502887 Blob path generation for iceberg table + String outputString = storageManager.generateBlobPath(); + } else { + String outputString = + ((InternalStageManager) storageManager).getBlobPath(calendar, clientPrefix); + Path outputPath = Paths.get(outputString); + Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); + Assert.assertTrue( + calendar.get(Calendar.MINUTE) + - Integer.parseInt(outputPath.getParent().getFileName().toString()) + <= 1); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)), + outputPath.getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)), + outputPath.getParent().getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.MONTH) + 1), + outputPath.getParent().getParent().getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.YEAR)), + outputPath + .getParent() + .getParent() + .getParent() + .getParent() + .getParent() + .getFileName() + .toString()); + } } @Test @@ -939,10 +945,8 @@ public void testInvalidateChannels() { innerData.add(channel1Data); innerData.add(channel2Data); - StreamingIngestStorage stage = Mockito.mock(StreamingIngestStorage.class); - Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); StorageManager storageManager = - Mockito.spy(new InternalStageManager<>(true, client)); + Mockito.spy(new InternalStageManager<>(true, client, null)); FlushService flushService = new FlushService<>(client, channelCache, storageManager, false); flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 5beb0662f..999c7e3c7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -451,6 +455,10 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { @Test public void testOpenChannelSuccessResponse() throws Exception { + // TODO: SNOW-1490151 Iceberg testing gaps + if (isIcebergMode) { + return; + } String name = "CHANNEL"; String dbName = "STREAMINGINGEST_TEST"; String schemaName = "PUBLIC"; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 4e66d8a15..6325b3144 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -382,7 +386,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); ChannelsStatusRequest request = new ChannelsStatusRequest(); - request.setRequestId("null_0"); + request.setRequestId("testPrefix_0"); request.setChannels(Collections.singletonList(dto)); ChannelsStatusResponse result = client.getChannelsStatus(Collections.singletonList(channel)); Assert.assertEquals(response.getMessage(), result.getMessage()); @@ -1458,7 +1462,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); ChannelsStatusRequest request = new ChannelsStatusRequest(); - request.setRequestId("null_0"); + request.setRequestId("testPrefix_0"); request.setChannels(Collections.singletonList(dto)); Map result = client.getLatestCommittedOffsetTokens(Collections.singletonList(channel)); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java index b018025b4..3e922f66c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java @@ -5,7 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; +import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_PASSWORD; import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_USER; import static net.snowflake.ingest.utils.HttpUtil.NON_PROXY_HOSTS; @@ -38,6 +38,7 @@ import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.client.jdbc.cloud.storage.StageInfo; import net.snowflake.client.jdbc.internal.amazonaws.util.IOUtils; +import net.snowflake.client.jdbc.internal.apache.http.HttpEntity; import net.snowflake.client.jdbc.internal.apache.http.StatusLine; import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse; import net.snowflake.client.jdbc.internal.apache.http.entity.BasicHttpEntity; @@ -118,13 +119,16 @@ public void testPutRemote() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStorage stage = + StorageManager storageManager = Mockito.mock(StorageManager.class); + Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); + + StreamingIngestStorage stage = new StreamingIngestStorage( - true, - null, + storageManager, "clientName", new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), + null, 1); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); @@ -158,14 +162,14 @@ public void testPutLocal() throws Exception { String fullFilePath = "testOutput"; String fileName = "putLocalOutput"; - StreamingIngestStorage stage = + StreamingIngestStorage stage = Mockito.spy( new StreamingIngestStorage( - true, null, "clientName", new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( fullFilePath, Optional.of(System.currentTimeMillis())), + null, 1)); Mockito.doReturn(true).when(stage).isLocalFS(); @@ -186,13 +190,16 @@ public void doTestPutRemoteRefreshes() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStorage stage = + StorageManager storageManager = Mockito.mock(StorageManager.class); + Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); + + StreamingIngestStorage stage = new StreamingIngestStorage( - true, - null, + storageManager, "clientName", new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), + null, maxUploadRetryCount); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeSQLException e = @@ -238,14 +245,17 @@ public void testPutRemoteGCS() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StreamingIngestStorage stage = + StorageManager storageManager = Mockito.mock(StorageManager.class); + Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); + + StreamingIngestStorage stage = Mockito.spy( new StreamingIngestStorage( - true, - null, + storageManager, "clientName", new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), + null, 1)); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeFileTransferMetadataV1 metaMock = Mockito.mock(SnowflakeFileTransferMetadataV1.class); @@ -261,25 +271,29 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); - ConfigureCallHandler configureCallHandler = - ConfigureCallHandler.builder( - mockClient, mockBuilder, STREAMING_CLIENT_CONFIGURE, "endpoint") - .setRole("role") - .build(); + SnowflakeStreamingIngestClientInternal mockClientInternal = + Mockito.mock(SnowflakeStreamingIngestClientInternal.class); + Mockito.when(mockClientInternal.getRole()).thenReturn("role"); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); - BasicHttpEntity entity = new BasicHttpEntity(); - entity.setContent( - new ByteArrayInputStream(exampleRemoteMetaResponse.getBytes(StandardCharsets.UTF_8))); - Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); - Mockito.when(mockResponse.getEntity()).thenReturn(entity); + Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); + SnowflakeServiceClient snowflakeServiceClient = + new SnowflakeServiceClient(mockClient, mockBuilder); + StorageManager storageManager = + new InternalStageManager(true, mockClientInternal, snowflakeServiceClient); + ParameterProvider parameterProvider = new ParameterProvider(false); - StreamingIngestStorage stage = - new StreamingIngestStorage(true, configureCallHandler, "clientName", 1); + StreamingIngestStorage stage = + new StreamingIngestStorage( + storageManager, + "clientName", + (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, + null, + 1); StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge metadataWithAge = stage.refreshSnowflakeMetadata(true); @@ -289,7 +303,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Mockito.verify(mockBuilder) .generateStreamingIngestPostRequest( stringCaptor.capture(), endpointCaptor.capture(), Mockito.any()); - Assert.assertEquals("endpoint", endpointCaptor.getValue()); + Assert.assertEquals(CLIENT_CONFIGURE_ENDPOINT, endpointCaptor.getValue()); Assert.assertTrue(metadataWithAge.timestamp.isPresent()); Assert.assertEquals( StageInfo.StageType.S3, metadataWithAge.fileTransferMetadata.getStageInfo().getStageType()); @@ -300,7 +314,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Assert.assertEquals( Paths.get("placeholder").toAbsolutePath(), Paths.get(metadataWithAge.fileTransferMetadata.getPresignedUrlFileName()).toAbsolutePath()); - Assert.assertEquals(prefix + "_" + deploymentId, stage.getClientPrefix()); + Assert.assertEquals("testPrefix", storageManager.getClientPrefix()); } @Test @@ -308,24 +322,27 @@ public void testFetchSignedURL() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); - ConfigureCallHandler configureCallHandler = - ConfigureCallHandler.builder( - mockClient, mockBuilder, STREAMING_CLIENT_CONFIGURE, "endpoint") - .setRole("role") - .build(); + SnowflakeStreamingIngestClientInternal mockClientInternal = + Mockito.mock(SnowflakeStreamingIngestClientInternal.class); + Mockito.when(mockClientInternal.getRole()).thenReturn("role"); + SnowflakeServiceClient snowflakeServiceClient = + new SnowflakeServiceClient(mockClient, mockBuilder); + StorageManager storageManager = + new InternalStageManager(true, mockClientInternal, snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); - BasicHttpEntity entity = new BasicHttpEntity(); - entity.setContent( - new ByteArrayInputStream(exampleRemoteMetaResponse.getBytes(StandardCharsets.UTF_8))); - Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); - Mockito.when(mockResponse.getEntity()).thenReturn(entity); + Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - StreamingIngestStorage stage = - new StreamingIngestStorage(true, configureCallHandler, "clientName", 1); + StreamingIngestStorage stage = + new StreamingIngestStorage( + storageManager, + "clientName", + (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, + new ConfigureRequest(mockClientInternal.getRole()), + 1); SnowflakeFileTransferMetadataV1 metadata = stage.fetchSignedURL("path/fileName"); @@ -334,7 +351,7 @@ public void testFetchSignedURL() throws Exception { Mockito.verify(mockBuilder) .generateStreamingIngestPostRequest( stringCaptor.capture(), endpointCaptor.capture(), Mockito.any()); - Assert.assertEquals("endpoint", endpointCaptor.getValue()); + Assert.assertEquals(CLIENT_CONFIGURE_ENDPOINT, endpointCaptor.getValue()); Assert.assertEquals(StageInfo.StageType.S3, metadata.getStageInfo().getStageType()); Assert.assertEquals("foo/streaming_ingest/", metadata.getStageInfo().getLocation()); Assert.assertEquals("path/fileName", metadata.getPresignedUrlFileName()); @@ -351,29 +368,26 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { RequestBuilder mockBuilder = Mockito.mock(RequestBuilder.class); CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); - ConfigureCallHandler configureCallHandler = - ConfigureCallHandler.builder( - mockClient, mockBuilder, STREAMING_CLIENT_CONFIGURE, "endpoint") - .setRole("role") - .build(); + SnowflakeStreamingIngestClientInternal mockClientInternal = + Mockito.mock(SnowflakeStreamingIngestClientInternal.class); + Mockito.when(mockClientInternal.getRole()).thenReturn("role"); + SnowflakeServiceClient snowflakeServiceClient = + new SnowflakeServiceClient(mockClient, mockBuilder); + StorageManager storageManager = + new InternalStageManager(true, mockClientInternal, snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); - BasicHttpEntity entity = new BasicHttpEntity(); - entity.setContent( - new ByteArrayInputStream(exampleRemoteMetaResponse.getBytes(StandardCharsets.UTF_8))); - Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); - Mockito.when(mockResponse.getEntity()).thenReturn(entity); + Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); StreamingIngestStorage stage = new StreamingIngestStorage( - true, - configureCallHandler, + storageManager, "clientName", - new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( - originalMetadata, Optional.of(0L)), + (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, + null, 1); ThreadFactory buildUploadThreadFactory = @@ -502,13 +516,16 @@ public void testRefreshMetadataOnFirstPutException() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); + StorageManager storageManager = Mockito.mock(StorageManager.class); + Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); + StreamingIngestStorage stage = new StreamingIngestStorage( - true, - null, + storageManager, "clientName", new StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), + null, maxUploadRetryCount); PowerMockito.mockStatic(SnowflakeFileTransferAgent.class); SnowflakeSQLException e = @@ -553,4 +570,10 @@ public Object answer(org.mockito.invocation.InvocationOnMock invocation) InputStream capturedInput = capturedConfig.getUploadStream(); Assert.assertEquals("Hello Upload", IOUtils.toString(capturedInput)); } + + private HttpEntity createHttpEntity(String content) { + BasicHttpEntity entity = new BasicHttpEntity(); + entity.setContent(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8))); + return entity; + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java index 4e054c209..09f806fcb 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; @@ -5,9 +9,6 @@ import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.HashMap; -import java.util.Map; import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.connection.IngestResponseException; @@ -53,11 +54,11 @@ public void testJWTRetries() throws Exception { "testJWTRetries")); // build payload - Map payload = new HashMap<>(); - if (!TestUtils.getRole().isEmpty() && !TestUtils.getRole().equals("DEFAULT_ROLE")) { - payload.put("role", TestUtils.getRole()); - } - ObjectMapper mapper = new ObjectMapper(); + ConfigureRequest request = + new ConfigureRequest( + !TestUtils.getRole().isEmpty() && !TestUtils.getRole().equals("DEFAULT_ROLE") + ? TestUtils.getRole() + : null); // request wih invalid token, should get 401 3 times PowerMockito.doReturn("invalid_token").when(spyManager).getToken(); @@ -66,7 +67,7 @@ public void testJWTRetries() throws Exception { executeWithRetries( ChannelsStatusResponse.class, CLIENT_CONFIGURE_ENDPOINT, - mapper.writeValueAsString(payload), + request, "client configure", STREAMING_CLIENT_CONFIGURE, httpClient, @@ -84,7 +85,7 @@ public void testJWTRetries() throws Exception { executeWithRetries( ChannelsStatusResponse.class, CLIENT_CONFIGURE_ENDPOINT, - mapper.writeValueAsString(payload), + request, "client configure", STREAMING_CLIENT_CONFIGURE, httpClient, @@ -101,7 +102,7 @@ public void testJWTRetries() throws Exception { executeWithRetries( ChannelsStatusResponse.class, CLIENT_CONFIGURE_ENDPOINT, - mapper.writeValueAsString(payload), + request, "client configure", STREAMING_CLIENT_CONFIGURE, httpClient,