From ceee1a2637eb1a9af285bd7bb185b095cbd6d1bb Mon Sep 17 00:00:00 2001 From: Hitesh Madan Date: Fri, 20 Sep 2024 22:30:08 +0000 Subject: [PATCH] add extvol and extvolmanager support fix unit tests change non_default to non_null --- .../connection/ServiceResponseHandler.java | 2 +- .../internal/ChannelConfigureRequest.java | 53 --- .../internal/ChannelConfigureResponse.java | 46 -- .../streaming/internal/ChunkMetadata.java | 4 +- .../streaming/internal/ColumnMetadata.java | 2 +- .../streaming/internal/ExternalVolume.java | 402 +++++++++++++++++- .../internal/ExternalVolumeManager.java | 34 +- .../GeneratePresignedUrlsRequest.java | 93 ++++ .../GeneratePresignedUrlsResponse.java | 47 ++ .../internal/OpenChannelResponse.java | 10 +- .../internal/SnowflakeServiceClient.java | 27 +- ...nowflakeStreamingIngestClientInternal.java | 3 +- .../net/snowflake/ingest/utils/Constants.java | 3 +- .../net/snowflake/ingest/utils/ErrorCode.java | 2 +- .../ingest/ingest_error_messages.properties | 2 +- .../internal/ExternalVolumeManagerTest.java | 122 ++++++ .../internal/MockSnowflakeServiceClient.java | 28 +- .../internal/SnowflakeServiceClientTest.java | 14 +- .../SnowflakeStreamingIngestClientTest.java | 5 +- 19 files changed, 740 insertions(+), 159 deletions(-) delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsRequest.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsResponse.java create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManagerTest.java diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java index 034b4a6f0..822c969a1 100644 --- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java +++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java @@ -43,7 +43,7 @@ public enum ApiName { STREAMING_CHANNEL_STATUS("POST"), STREAMING_REGISTER_BLOB("POST"), STREAMING_CLIENT_CONFIGURE("POST"), - STREAMING_CHANNEL_CONFIGURE("POST"); + GENERATE_PRESIGNED_URLS("POST"); private final String httpMethod; private ApiName(String httpMethod) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java deleted file mode 100644 index f6ea570ec..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import com.fasterxml.jackson.annotation.JsonProperty; - -/** Class used to serialize the channel configure request. */ -class ChannelConfigureRequest extends ClientConfigureRequest { - @JsonProperty("database") - private String database; - - @JsonProperty("schema") - private String schema; - - @JsonProperty("table") - private String table; - - /** - * Constructor for channel configure request - * - * @param role Role to be used for the request. - * @param database Database name. - * @param schema Schema name. - * @param table Table name. - */ - ChannelConfigureRequest(String role, String database, String schema, String table) { - super(role); - this.database = database; - this.schema = schema; - this.table = table; - } - - String getDatabase() { - return database; - } - - String getSchema() { - return schema; - } - - String getTable() { - return table; - } - - @Override - public String getStringForLogging() { - return String.format( - "ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)", - getRole(), database, schema, table, getFileName()); - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java deleted file mode 100644 index da65960b4..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** Class used to deserialize responses from channel configure endpoint */ -@JsonIgnoreProperties(ignoreUnknown = true) -class ChannelConfigureResponse extends StreamingIngestResponse { - @JsonProperty("status_code") - private Long statusCode; - - @JsonProperty("message") - private String message; - - @JsonProperty("stage_location") - private FileLocationInfo stageLocation; - - @Override - Long getStatusCode() { - return statusCode; - } - - void setStatusCode(Long statusCode) { - this.statusCode = statusCode; - } - - String getMessage() { - return message; - } - - void setMessage(String message) { - this.message = message; - } - - FileLocationInfo getStageLocation() { - return stageLocation; - } - - void setStageLocation(FileLocationInfo stageLocation) { - this.stageLocation = stageLocation; - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java index c0cb218ac..059cc58fd 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java @@ -219,13 +219,13 @@ Long getLastInsertTimeInMs() { // Snowflake service had a bug that did not allow the client to add new json fields in some // contracts; thus these new fields have a NON_DEFAULT attribute. @JsonProperty("major_vers") - @JsonInclude(JsonInclude.Include.NON_DEFAULT) + @JsonInclude(JsonInclude.Include.NON_NULL) Integer getMajorVersion() { return this.parquetMajorVersion; } @JsonProperty("minor_vers") - @JsonInclude(JsonInclude.Include.NON_DEFAULT) + @JsonInclude(JsonInclude.Include.NON_NULL) Integer getMinorVersion() { return this.parquetMinorVersion; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java index 1231247b5..0f19922fe 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java @@ -160,7 +160,7 @@ public String toString() { map.put("byte_length", this.byteLength); map.put("length", this.length); map.put("nullable", this.nullable); - map.put("source_iceberg_datatype", this.sourceIcebergDataType); + map.put("source_iceberg_data_type", this.sourceIcebergDataType); return map.toString(); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java index 0f1c1a934..6ae71bcf7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java @@ -1,9 +1,409 @@ package net.snowflake.ingest.streaming.internal; +import static net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse.PresignedUrlInfo; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import net.snowflake.client.core.ExecTimeTelemetryData; +import net.snowflake.client.core.HttpClientSettingsKey; +import net.snowflake.client.core.OCSPMode; +import net.snowflake.client.jdbc.RestRequest; +import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.client.jdbc.SnowflakeUtil; +import net.snowflake.client.jdbc.cloud.storage.SnowflakeStorageClient; +import net.snowflake.client.jdbc.cloud.storage.StageInfo; +import net.snowflake.client.jdbc.cloud.storage.StorageClientFactory; +import net.snowflake.client.jdbc.internal.apache.http.client.HttpResponseException; +import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse; +import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPut; +import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder; +import net.snowflake.client.jdbc.internal.apache.http.entity.ByteArrayEntity; +import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; +import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils; +import net.snowflake.client.jdbc.internal.google.api.client.http.HttpStatusCodes; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.HttpUtil; +import net.snowflake.ingest.utils.Logging; +import net.snowflake.ingest.utils.SFException; + /** Handles uploading files to the Iceberg Table's external volume's table data path */ class ExternalVolume implements IStorage { + // TODO everywhere: static final should be named in all capitals + private static final Logging logger = new Logging(ExternalVolume.class); + private static final int DEFAULT_PRESIGNED_URL_COUNT = 10; + private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900; + + // Allowing concurrent generate URL requests is a weak form of adapting to high throughput + // clients. + // The low watermark ideally should be adaptive too for such clients,will wait for perf runs to + // show its necessary. + private static final int MAX_CONCURRENT_GENERATE_URLS_REQUESTS = 10; + private static final int LOW_WATERMARK_FOR_EARLY_REFRESH = 5; + + private final String clientName; + private final String clientPrefix; + private final Long deploymentId; + private final String role; + + // The db name, schema name and table name for this storage location + private final TableRef tableRef; + + // The RPC client for snowflake cloud service + private final SnowflakeServiceClient serviceClient; + + // semaphore to limit how many RPCs go out for one location concurrently + private final Semaphore generateUrlsSemaphore; + + // thread-safe queue of unused URLs, to be disbursed whenever flush codepath is cutting the next + // file + private final ConcurrentLinkedQueue presignedUrlInfos; + + // sometimes-stale counter of how many URLs are remaining, to avoid calling presignedUrls.size() + // and increasing + // lock contention / volatile reads on the internal data structures inside ConcurrentLinkedQueue + private final AtomicInteger numUrlsInQueue; + + private final FileLocationInfo locationInfo; + private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata; + + ExternalVolume( + String clientName, + String clientPrefix, + Long deploymentId, + String role, + TableRef tableRef, + FileLocationInfo locationInfo, + SnowflakeServiceClient serviceClient) { + this.clientName = clientName; + this.clientPrefix = clientPrefix; + this.deploymentId = deploymentId; + this.role = role; + this.tableRef = tableRef; + this.serviceClient = serviceClient; + this.locationInfo = locationInfo; + this.presignedUrlInfos = new ConcurrentLinkedQueue<>(); + this.numUrlsInQueue = new AtomicInteger(0); + this.generateUrlsSemaphore = new Semaphore(MAX_CONCURRENT_GENERATE_URLS_REQUESTS); + + if (this.locationInfo.getIsClientSideEncrypted()) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + "Cannot ingest into an external volume that requests client side encryption"); + } + if ("S3".equalsIgnoreCase(this.locationInfo.getLocationType())) { + // add dummy values so that JDBC's S3 client creation doesn't barf on initialization. + this.locationInfo.getCredentials().put("AWS_KEY_ID", "key"); + this.locationInfo.getCredentials().put("AWS_SECRET_KEY", "secret"); + } + + try { + this.fileTransferMetadata = + InternalStage.createFileTransferMetadataWithAge(this.locationInfo); + } catch (JsonProcessingException + | SnowflakeSQLException + | net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException e) { + throw new SFException(e, ErrorCode.INTERNAL_ERROR); + } + + generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH); + } + + // TODO : Add timing ; add logging ; add retries ; add http exception handling better than + // client.handleEx? @Override public void put(BlobPath blobPath, byte[] blob) { - throw new RuntimeException("not implemented"); + if (this.fileTransferMetadata.isLocalFS) { + InternalStage.putLocal(this.fileTransferMetadata.localLocation, blobPath.fileName, blob); + return; + } + + try { + putRemote(blobPath.blobPath, blob); + } catch (Throwable e) { + throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE); + } + } + + private void putRemote(String blobPath, byte[] blob) + throws SnowflakeSQLException, URISyntaxException, IOException { + // TODO: Add a backlog item for somehow doing multipart upload with presigned URLs (each part + // has its own URL) + // for large files + + // already verified that client side encryption is disabled, in the ctor's call to generateUrls + final Properties proxyProperties = HttpUtil.generateProxyPropertiesForJDBC(); + final HttpClientSettingsKey key = + SnowflakeUtil.convertProxyPropertiesToHttpClientKey(OCSPMode.FAIL_OPEN, proxyProperties); + + StageInfo stageInfo = fileTransferMetadata.fileTransferMetadata.getStageInfo(); + SnowflakeStorageClient client = + StorageClientFactory.getFactory().createClient(stageInfo, 1, null, null); + + URIBuilder uriBuilder = new URIBuilder(blobPath); + HttpPut httpRequest = new HttpPut(uriBuilder.build()); + httpRequest.setEntity(new ByteArrayEntity(blob)); + + addHeadersToHttpRequest(httpRequest, blob, stageInfo, client); + + if (stageInfo.getStageType().equals(StageInfo.StageType.AZURE)) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, "Azure based external volumes are not yet supported."); + + /* commenting out unverified code, will fix this when server changes are in preprod / some test deplo + URI storageEndpoint = + new URI( + "https", + stageInfo.getStorageAccount() + "." + stageInfo.getEndPoint() + "/", + null, + null); + String sasToken = blobPath.substring(blobPath.indexOf("?")); + StorageCredentials azCreds = new StorageCredentialsSharedAccessSignature(sasToken); + CloudBlobClient azClient = new CloudBlobClient(storageEndpoint, azCreds); + + CloudBlobContainer container = azClient.getContainerReference(stageInfo.getLocation().substring(0, stageInfo.getLocation().indexOf("/"))); + CloudBlockBlob azBlob = container.getBlockBlobReference(); + azBlob.setMetadata((HashMap) meta.getUserMetadata()); + + OperationContext opContext = new OperationContext(); + net.snowflake.client.core.HttpUtil.setSessionlessProxyForAzure(proxyProperties, opContext); + + BlobRequestOptions options = new BlobRequestOptions(); + + try { + azBlob.uploadFromByteArray(blob, 0, blob.length, null, options, opContext); + } catch (Exception ex) { + ((SnowflakeAzureClient) client).handleStorageException(ex, 0, "upload", null, null, null); + } + */ + } + + CloseableHttpClient httpClient = net.snowflake.client.core.HttpUtil.getHttpClient(key); + CloseableHttpResponse response; + try { + response = + RestRequest.execute( + httpClient, + httpRequest, + 0, // retry timeout + 0, // auth timeout + (int) + net.snowflake.client.core.HttpUtil.getSocketTimeout() + .toMillis(), // socket timeout in ms + 1, // max retries + 0, // no socket timeout injection + null, // no canceling signaler, TODO: wire up thread interrupt with setting this + // AtomicBoolean to avoid retries/sleeps + false, // no cookie + false, // no url retry query parameters + false, // no request_guid + true, // retry on HTTP 403 + true, // no retry + new ExecTimeTelemetryData()); + } catch (Throwable e) { + throw e; + } + + int statusCode = response.getStatusLine().getStatusCode(); + if (!HttpStatusCodes.isSuccess(statusCode)) { + Exception ex = + new HttpResponseException( + response.getStatusLine().getStatusCode(), + String.format( + "%s, body: %s", + response.getStatusLine().getReasonPhrase(), + EntityUtils.toString(response.getEntity()))); + + client.handleStorageException(ex, 0, "upload", null, null, null); + } + } + + private void addHeadersToHttpRequest( + HttpPut httpRequest, byte[] blob, StageInfo stageInfo, SnowflakeStorageClient client) { + // no need to set this as it causes a Content-length header is already set error in apache's + // http client. + // httpRequest.setHeader("Content-Length", "" + blob.length); + + // TODO: These custom headers need to be a part of the presigned URL HMAC computation in S3, + // we'll disable them for now until we can do presigned URL generation AFTER we have the digest. + + /* + final String clientKey = this.clientPrefix; + final String clientName = this.clientName; + + final byte[] digestBytes; + try { + digestBytes = MessageDigest.getInstance("SHA-256").digest(blob); + } catch (NoSuchAlgorithmException e) { + throw new SFException(e, ErrorCode.INTERNAL_ERROR); + } + + final String digest = Base64.getEncoder().encodeToString(digestBytes); + + StorageObjectMetadata meta = StorageClientFactory.getFactory().createStorageMetadataObj(stageInfo.getStageType()); + + client.addDigestMetadata(meta, digest); + client.addStreamingIngestMetadata(meta, clientName, clientKey); + + switch (stageInfo.getStageType()) { + case S3: + httpRequest.setHeader("x-amz-server-side-encryption", ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + httpRequest.setHeader("x-amz-checksum-sha256", digest); // TODO why does JDBC use a custom x–amz-meta-sfc-digest header for this + for (Map.Entry entry : meta.getUserMetadata().entrySet()) { + httpRequest.setHeader("x-amz-meta-" + entry.getKey(), entry.getValue()); + } + + break; + + case AZURE: + for (Map.Entry entry : meta.getUserMetadata().entrySet()) { + httpRequest.setHeader("x-ms-meta-" + entry.getKey(), entry.getValue()); + } + break; + + case GCS: + for (Map.Entry entry : meta.getUserMetadata().entrySet()) { + httpRequest.setHeader("x-goog-meta-" + entry.getKey(), entry.getValue()); + } + break; + } + */ + } + + PresignedUrlInfo dequeueUrlInfo() { + PresignedUrlInfo info = this.presignedUrlInfos.poll(); + boolean generate = false; + if (info == null) { + generate = true; + } else { + // Since the queue had a non-null entry, there is no way numUrlsInQueue is <=0 + int remainingUrlsInQueue = this.numUrlsInQueue.decrementAndGet(); + if (remainingUrlsInQueue <= LOW_WATERMARK_FOR_EARLY_REFRESH) { + generate = true; + // assert remaininUrlsInQueue >= 0 + } + } + if (generate) { + // TODO: do this generation on a background thread to allow the current thread to make + // progress ? + // Will wait for perf runs to know this is an issue that needs addressal. + generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH); + } + return info; + } + + // NOTE : We are intentionally NOT re-enqueuing unused URLs here as that can cause correctness + // issues by + // accidentally enqueuing a URL that was actually used to write data out. Its okay to allow an + // unused URL to go waste + // as we'll just go out and generate new URLs. Do NOT add an enqueueUrl() method for this reason. + + private void generateUrls(int minCountToSkipGeneration) { + int numAcquireAttempts = 0; + boolean acquired = false; + + while (!acquired && numAcquireAttempts++ < 300) { + // Use an aggressive timeout value as its possible that the other requests finished and added + // enough + // URLs to the queue. If we use a higher timeout value, this calling thread's flush is going + // to + // unnecessarily be blocked when URLs have already been added to the queue. + try { + acquired = this.generateUrlsSemaphore.tryAcquire(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // if the thread was interrupted there's nothing we can do about it, definitely shouldn't + // continue processing. + String message = + String.format( + "Semaphore acquisition in ExternalVolume.generateUrls was interrupted, " + + "likely because the process is shutting down. TableRef=%s", + tableRef); + logger.logError(message); + throw new SFException(ErrorCode.INTERNAL_ERROR, message); + } + + // In case Acquire took time because no permits were available, it implies we already had N + // other threads + // fetching more URLs. In that case we should be content with what's in the buffer instead of + // doing another RPC + // potentially unnecessarily. + if (this.numUrlsInQueue.get() >= minCountToSkipGeneration) { + // release the semaphore before early-exiting to avoid a leak in semaphore permits. + if (acquired) { + this.generateUrlsSemaphore.release(); + } + + return; + } + } + + // if we're here without acquiring, that implies the numAcquireAttempts went over 300. We're at + // an impasse + // and so there's nothing more to be done except error out, as that gives the client a chance to + // restart. + if (!acquired) { + String message = + String.format("Could not acquire semaphore to generate URLs. TableRef=%s", tableRef); + logger.logError(message); + throw new SFException(ErrorCode.INTERNAL_ERROR, message); + } + + // we have acquired a semaphore permit at this point, must release before returning + + try { + GeneratePresignedUrlsResponse response = doGenerateUrls(); + List urlInfos = response.getPresignedUrlInfos(); + urlInfos = + urlInfos.stream() + .filter( + info -> { + if (info == null + || info.url == null + || info.fileName == null + || info.url.isEmpty()) { + logger.logError( + "Received unexpected null or empty URL in externalVolume.generateUrls" + + " tableRef=%s", + this.tableRef); + return false; + } + + return true; + }) + .collect(Collectors.toList()); + + // these are both thread-safe operations individually, and there is no need to do them inside + // a lock. + // For an infinitesimal time the numUrlsInQueue will under represent the number of entries in + // the queue. + this.presignedUrlInfos.addAll(urlInfos); + this.numUrlsInQueue.addAndGet(urlInfos.size()); + } finally { + this.generateUrlsSemaphore.release(); + } + } + + private GeneratePresignedUrlsResponse doGenerateUrls() { + try { + return this.serviceClient.generatePresignedUrls( + new GeneratePresignedUrlsRequest( + tableRef, + role, + DEFAULT_PRESIGNED_URL_COUNT, + DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS, + deploymentId, + true)); + + } catch (IngestResponseException | IOException e) { + throw new SFException(e, ErrorCode.GENERATE_PRESIGNED_URLS_FAILURE, e.getMessage()); + } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java index 3c6bf3f9d..556d02b9b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java @@ -4,6 +4,8 @@ package net.snowflake.ingest.streaming.internal; +import static net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse.PresignedUrlInfo; + import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -16,7 +18,6 @@ class ExternalVolumeManager implements IStorageManager { // TODO: Rename all logger members to LOGGER and checkin code formatting rules private static final Logging logger = new Logging(ExternalVolumeManager.class); - // Reference to the external volume per table private final Map externalVolumeMap; @@ -31,6 +32,9 @@ class ExternalVolumeManager implements IStorageManager { // Client prefix generated by the Snowflake server private final String clientPrefix; + // Deployment ID returned by the Snowflake server + private final Long deploymentId; + // concurrency control to avoid creating multiple ExternalVolume objects for the same table (if // openChannel is called // multiple times concurrently) @@ -54,16 +58,13 @@ class ExternalVolumeManager implements IStorageManager { this.serviceClient = snowflakeServiceClient; this.externalVolumeMap = new ConcurrentHashMap<>(); try { - this.clientPrefix = - isTestMode - ? "testPrefix" - : this.serviceClient - .clientConfigure(new ClientConfigureRequest(role)) - .getClientPrefix(); + ClientConfigureResponse response = + this.serviceClient.clientConfigure(new ClientConfigureRequest(role)); + this.clientPrefix = isTestMode ? "testPrefix" : response.getClientPrefix(); + this.deploymentId = response.getDeploymentId(); } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); } - logger.logDebug( "Created ExternalVolumeManager with clientName=%s and clientPrefix=%s", clientName, clientPrefix); @@ -76,7 +77,7 @@ class ExternalVolumeManager implements IStorageManager { * @return target storage */ @Override - public IStorage getStorage(String fullyQualifiedTableName) { + public ExternalVolume getStorage(String fullyQualifiedTableName) { // Only one chunk per blob in Iceberg mode. return getVolumeSafe(fullyQualifiedTableName); } @@ -103,7 +104,15 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) { } try { - ExternalVolume externalVolume = new ExternalVolume(); + ExternalVolume externalVolume = + new ExternalVolume( + clientName, + getClientPrefix(), + deploymentId, + role, + tableRef, + locationInfo, + serviceClient); this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume); } catch (SFException ex) { logger.logError( @@ -113,7 +122,6 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) { } catch (Exception err) { logger.logError( "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err); - throw new SFException( err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, @@ -124,7 +132,9 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) { @Override public BlobPath generateBlobPath(String fullyQualifiedTableName) { - throw new RuntimeException("not implemented"); + ExternalVolume volume = getVolumeSafe(fullyQualifiedTableName); + PresignedUrlInfo urlInfo = volume.dequeueUrlInfo(); + return BlobPath.presignedUrlWithToken(urlInfo.fileName, urlInfo.url); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsRequest.java new file mode 100644 index 000000000..05a085ed6 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsRequest.java @@ -0,0 +1,93 @@ +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; + +class GeneratePresignedUrlsRequest implements IStreamingIngestRequest { + @JsonProperty("database") + private String dbName; + + @JsonProperty("schema") + private String schemaName; + + @JsonProperty("table") + private String tableName; + + @JsonProperty("role") + private String role; + + @JsonProperty("count") + private Integer count; + + @JsonProperty("timeout_in_seconds") + private Integer timeoutInSeconds; + + @JsonProperty("deployment_global_id") + private Long deploymentGlobalId; + + @JsonProperty("is_iceberg") + private boolean isIceberg; + + public GeneratePresignedUrlsRequest( + TableRef tableRef, + String role, + int count, + int timeoutInSeconds, + Long deploymentGlobalId, + boolean isIceberg) { + this.dbName = tableRef.dbName; + this.schemaName = tableRef.schemaName; + this.tableName = tableRef.tableName; + this.count = count; + this.role = role; + this.timeoutInSeconds = timeoutInSeconds; + this.deploymentGlobalId = deploymentGlobalId; + this.isIceberg = isIceberg; + } + + String getDBName() { + return this.dbName; + } + + String getSchemaName() { + return this.schemaName; + } + + String getTableName() { + return this.tableName; + } + + String getRole() { + return this.role; + } + + Integer getCount() { + return this.count; + } + + Long getDeploymentGlobalId() { + return this.deploymentGlobalId; + } + + Integer getTimeoutInSeconds() { + return this.timeoutInSeconds; + } + + boolean getIsIceberg() { + return this.isIceberg; + } + + @Override + public String getStringForLogging() { + return String.format( + "GetPresignedUrlsRequest(db=%s, schema=%s, table=%s, count=%s, timeoutInSeconds=%s" + + " deploymentGlobalId=%s role=%s, isIceberg=%s)", + dbName, + schemaName, + tableName, + count, + timeoutInSeconds, + deploymentGlobalId, + role, + isIceberg); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsResponse.java new file mode 100644 index 000000000..32bf24104 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/GeneratePresignedUrlsResponse.java @@ -0,0 +1,47 @@ +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +@JsonIgnoreProperties(ignoreUnknown = true) +class GeneratePresignedUrlsResponse extends StreamingIngestResponse { + @JsonIgnoreProperties(ignoreUnknown = true) + public static class PresignedUrlInfo { + @JsonProperty("file_name") + public String fileName; + + @JsonProperty("url") + public String url; + + // default constructor for jackson deserialization + public PresignedUrlInfo() {} + + public PresignedUrlInfo(String fileName, String url) { + this.fileName = fileName; + this.url = url; + } + } + + @JsonProperty("status_code") + private Long statusCode; + + @JsonProperty("message") + private String message; + + @JsonProperty("presigned_url_infos") + private List presignedUrlInfos; + + @Override + Long getStatusCode() { + return this.statusCode; + } + + String getMessage() { + return this.message; + } + + List getPresignedUrlInfos() { + return this.presignedUrlInfos; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java index 89bf3d70d..92f7ea8c5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java @@ -21,7 +21,7 @@ class OpenChannelResponse extends StreamingIngestResponse { private List tableColumns; private String encryptionKey; private Long encryptionKeyId; - private FileLocationInfo externalVolumeLocation; + private FileLocationInfo icebergLocationInfo; @JsonProperty("status_code") void setStatusCode(Long statusCode) { @@ -133,11 +133,11 @@ Long getEncryptionKeyId() { } @JsonProperty("iceberg_location") - void setExternalVolumeLocation(FileLocationInfo externalVolumeLocation) { - this.externalVolumeLocation = externalVolumeLocation; + void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) { + this.icebergLocationInfo = icebergLocationInfo; } - FileLocationInfo getExternalVolumeLocation() { - return this.externalVolumeLocation; + FileLocationInfo getIcebergLocationInfo() { + return this.icebergLocationInfo; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java index a511407ea..7d4b52ef9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java @@ -4,17 +4,17 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.GENERATE_PRESIGNED_URLS; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB; import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; -import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.GENERATE_PRESIGNED_URLS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; @@ -76,28 +76,23 @@ ClientConfigureResponse clientConfigure(ClientConfigureRequest request) return response; } - /** - * Configures a channel's storage info given a {@link ChannelConfigureRequest}. - * - * @param request the channel configuration request - * @return the response from the configuration request - */ - ChannelConfigureResponse channelConfigure(ChannelConfigureRequest request) + /** Generates a batch of presigned URLs for a table */ + GeneratePresignedUrlsResponse generatePresignedUrls(GeneratePresignedUrlsRequest request) throws IngestResponseException, IOException { - ChannelConfigureResponse response = + GeneratePresignedUrlsResponse response = executeApiRequestWithRetries( - ChannelConfigureResponse.class, + GeneratePresignedUrlsResponse.class, request, - CHANNEL_CONFIGURE_ENDPOINT, - "channel configure", - STREAMING_CHANNEL_CONFIGURE); + GENERATE_PRESIGNED_URLS_ENDPOINT, + "generate presigned urls", + GENERATE_PRESIGNED_URLS); if (response.getStatusCode() != RESPONSE_SUCCESS) { logger.logDebug( - "Channel configure request failed, request={}, response={}", + "GeneratePresignedUrls request failed, request={}, response={}", request.getStringForLogging(), response.getMessage()); - throw new SFException(ErrorCode.CHANNEL_CONFIGURE_FAILURE, response.getMessage()); + throw new SFException(ErrorCode.GENERATE_PRESIGNED_URLS_FAILURE, response.getMessage()); } return response; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index b15053fae..5e122587b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -393,9 +393,10 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest // Add channel to the channel cache this.channelCache.addChannel(channel); + this.storageManager.registerTable( new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()), - response.getExternalVolumeLocation()); + response.getIcebergLocationInfo()); return channel; } diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 4504c1c01..369a6546a 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -53,7 +53,8 @@ public class Constants { public static final String BLOB_EXTENSION_TYPE = "bdec"; public static final int MAX_THREAD_COUNT = Integer.MAX_VALUE; public static final String CLIENT_CONFIGURE_ENDPOINT = "/v1/streaming/client/configure/"; - public static final String CHANNEL_CONFIGURE_ENDPOINT = "/v1/streaming/channels/configure/"; + public static final String GENERATE_PRESIGNED_URLS_ENDPOINT = + "/v1/streaming/presignedurls/generate/"; public static final int COMMIT_MAX_RETRY_COUNT = 60; public static final int COMMIT_RETRY_INTERVAL_IN_MS = 1000; public static final String ENCRYPTION_ALGORITHM = "AES/CTR/NoPadding"; diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index 478189016..dc7d1c631 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -43,7 +43,7 @@ public enum ErrorCode { CRYPTO_PROVIDER_ERROR("0035"), DROP_CHANNEL_FAILURE("0036"), CLIENT_DEPLOYMENT_ID_MISMATCH("0037"), - CHANNEL_CONFIGURE_FAILURE("0038"); + GENERATE_PRESIGNED_URLS_FAILURE("0038"); public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages"; diff --git a/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties b/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties index 7b4bc08ee..193ae0105 100644 --- a/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties +++ b/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties @@ -44,4 +44,4 @@ 0035=Failed to load {0}. If you use FIPS, import BouncyCastleFipsProvider in the application: {1} 0036=Failed to drop channel: {0} 0037=Deployment ID mismatch, Client was created on: {0}, Got upload location for: {1}. Please restart client: {2}. -0038=Channel configure request failed: {0}. \ No newline at end of file +0038=Generate presigned URLs request failed: {0}. \ No newline at end of file diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManagerTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManagerTest.java new file mode 100644 index 000000000..4ebec800f --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManagerTest.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import static org.junit.Assert.*; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import net.snowflake.ingest.utils.SFException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ExternalVolumeManagerTest { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private ExternalVolumeManager manager; + private FileLocationInfo fileLocationInfo; + + @Before + public void setup() throws JsonProcessingException { + this.manager = + new ExternalVolumeManager( + false /* isTestMode */, "role", "clientName", MockSnowflakeServiceClient.create()); + + Map fileLocationInfoMap = MockSnowflakeServiceClient.getStageLocationMap(); + fileLocationInfoMap.put("isClientSideEncrypted", false); + String fileLocationInfoStr = objectMapper.writeValueAsString(fileLocationInfoMap); + this.fileLocationInfo = objectMapper.readValue(fileLocationInfoStr, FileLocationInfo.class); + } + + @After + public void teardown() {} + + @Test + public void testRegister() { + Exception ex = null; + try { + this.manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); + } catch (Exception e) { + ex = e; + } + + assertNull(ex); + } + + @Test + public void testConcurrentRegisterTable() throws Exception { + int numThreads = 50; + ExecutorService executorService = + new ThreadPoolExecutor( + 50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + List> tasks = new ArrayList<>(); + final CyclicBarrier startBarrier = new CyclicBarrier(numThreads); + final CyclicBarrier endBarrier = new CyclicBarrier(numThreads); + for (int i = 0; i < numThreads; i++) { + tasks.add( + () -> { + startBarrier.await(30, TimeUnit.SECONDS); + manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); + endBarrier.await(); + return manager.getStorage("db.schema.table"); + }); + } + + List> allResults = executorService.invokeAll(tasks); + allResults.get(0).get(30, TimeUnit.SECONDS); + + ExternalVolume extvol = manager.getStorage("db.schema.table"); + assertNotNull(extvol); + for (int i = 0; i < numThreads; i++) { + assertSame("" + i, extvol, allResults.get(i).get(30, TimeUnit.SECONDS)); + } + } + + @Test + public void testGetStorage() { + this.manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); + ExternalVolume extvol = this.manager.getStorage("db.schema.table"); + assertNotNull(extvol); + } + + @Test + public void testGetStorageWithoutRegister() { + SFException ex = null; + try { + manager.getStorage("db.schema.table"); + } catch (SFException e) { + ex = e; + } + + assertNotNull(ex); + assertTrue(ex.getVendorCode().equals("0001")); + assertTrue(ex.getMessage().contains("No external volume found for tableRef=db.schema.table")); + } + + @Test + public void testGenerateBlobPath() { + manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); + BlobPath blobPath = manager.generateBlobPath("db.schema.table"); + assertNotNull(blobPath); + assertTrue(blobPath.hasToken); + assertEquals(blobPath.fileName, "f1"); + assertEquals(blobPath.blobPath, "http://f1.com?token=t1"); + } + + @Test + public void testGetClientPrefix() { + assertEquals(manager.getClientPrefix(), "test_prefix_123"); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java b/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java index 5f8243299..0c2017d01 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java @@ -1,9 +1,9 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.GENERATE_PRESIGNED_URLS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; @@ -11,6 +11,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -106,8 +107,8 @@ public static CloseableHttpClient createHttpClient(ApiOverride apiOverride) { LOGGER.debug( request.toString() + IOUtils.toString( - ((HttpPost) request).getEntity().getContent(), - StandardCharsets.UTF_8)); + ((HttpPost) request).getEntity().getContent(), + StandardCharsets.UTF_8)); } String path = request.getURI().getPath(); @@ -129,14 +130,21 @@ public static CloseableHttpClient createHttpClient(ApiOverride apiOverride) { clientConfigresponseMap.put("deployment_id", 123L); return buildStreamingIngestResponse( HttpStatus.SC_OK, clientConfigresponseMap); - case CHANNEL_CONFIGURE_ENDPOINT: - Map channelConfigResponseMap = new HashMap<>(); - channelConfigResponseMap.put("status_code", 0L); - channelConfigResponseMap.put("message", "OK"); - channelConfigResponseMap.put("stage_location", getStageLocationMap()); + case GENERATE_PRESIGNED_URLS_ENDPOINT: + Map generateUrlsResponseMap = new HashMap<>(); + generateUrlsResponseMap.put("status_code", 0L); + generateUrlsResponseMap.put("message", "OK"); + generateUrlsResponseMap.put( + "presigned_url_infos", + Arrays.asList( + new GeneratePresignedUrlsResponse.PresignedUrlInfo( + "f1", "http://f1.com?token=t1"), + new GeneratePresignedUrlsResponse.PresignedUrlInfo( + "f2", "http://f2.com?token=t2"), + new GeneratePresignedUrlsResponse.PresignedUrlInfo( + "f3", "http://f3.com?token=t3"))); return buildStreamingIngestResponse( - HttpStatus.SC_OK, channelConfigResponseMap); - case OPEN_CHANNEL_ENDPOINT: + HttpStatus.SC_OK, generateUrlsResponseMap); case OPEN_CHANNEL_ENDPOINT: List> tableColumnsLists = new ArrayList<>(); Map tableColumnMap = new HashMap<>(); tableColumnMap.put("byteLength", 123L); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java index 8fa0399e4..253dee01c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java @@ -30,13 +30,13 @@ public void testClientConfigure() throws IngestResponseException, IOException { } @Test - public void testChannelConfigure() throws IngestResponseException, IOException { - ChannelConfigureRequest channelConfigureRequest = - new ChannelConfigureRequest("test_channel", "test_db", "test_schema", "test_table"); - ChannelConfigureResponse channelConfigureResponse = - snowflakeServiceClient.channelConfigure(channelConfigureRequest); - assert channelConfigureResponse.getStatusCode() == 0L; - assert channelConfigureResponse.getMessage().equals("OK"); + public void testGeneratePresignedUrls() throws IngestResponseException, IOException { + GeneratePresignedUrlsRequest request = + new GeneratePresignedUrlsRequest( + new TableRef("test_db", "test_schema", "test_table"), "role", 10, 600, 1031L, true); + GeneratePresignedUrlsResponse response = snowflakeServiceClient.generatePresignedUrls(request); + assert response.getStatusCode() == 0L; + assert response.getMessage().equals("OK"); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index a24ceb2a9..0dbeeebee 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -807,7 +807,10 @@ public void testRegisterBlobsRetries() throws Exception { client.getChannelCache().addChannel(channel3); client.getChannelCache().addChannel(channel4); client.registerBlobs(blobs); - Mockito.verify(requestBuilder, Mockito.times(MAX_STREAMING_INGEST_API_CHANNEL_RETRY + 1)) + Mockito.verify( + requestBuilder, + // isIcebergMode results in a clientconfigure call from ExtVol ctor, thus the extra +1 + Mockito.times(MAX_STREAMING_INGEST_API_CHANNEL_RETRY + 1 + (isIcebergMode ? 1 : 0))) .generateStreamingIngestPostRequest(Mockito.anyString(), Mockito.any(), Mockito.any()); Assert.assertFalse(channel1.isValid()); Assert.assertFalse(channel2.isValid());