diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java index 822c969a1..c9968bcf3 100644 --- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java +++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java @@ -43,7 +43,9 @@ public enum ApiName { STREAMING_CHANNEL_STATUS("POST"), STREAMING_REGISTER_BLOB("POST"), STREAMING_CLIENT_CONFIGURE("POST"), - GENERATE_PRESIGNED_URLS("POST"); + GENERATE_PRESIGNED_URLS("POST"), + REFRESH_TABLE_INFORMATION("POST"); + private final String httpMethod; private ApiName(String httpMethod) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java index 9a173365b..34c1c9102 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java @@ -4,6 +4,8 @@ package net.snowflake.ingest.streaming.internal; +import java.util.Optional; + /** * Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link * FlushService} */ @@ -24,7 +26,7 @@ interface IStorageManager { IStorage getStorage(String fullyQualifiedTableName); /** Informs the storage manager about a new table that's being ingested into by the client. */ - void registerTable(TableRef tableRef, FileLocationInfo locationInfo); + void registerTable(TableRef tableRef); /** * Generate a unique blob path and increment the blob sequencer * @@ -42,4 +44,14 @@ interface IStorageManager { * @return the client prefix */ String getClientPrefix(); + + /** + * Get the updated subscoped tokens and location info for this table + * + * @param tableRef The table for which to get the location + * @param fileName Legacy; was used by deprecated GCS codepaths that did not support subscoped + * tokens. Not in use.
+ * @return the refreshed file location info */ + FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java index adf7a0256..1dc65cd09 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java @@ -35,7 +35,7 @@ import net.snowflake.ingest.utils.Utils; /** Handles uploading files to the Snowflake Streaming Ingest Storage */ -class InternalStage<T> implements IStorage { +class InternalStage implements IStorage { private static final ObjectMapper mapper = new ObjectMapper(); /** @@ -59,32 +59,45 @@ class InternalStage<T> implements IStorage { private static final Logging logger = new Logging(InternalStage.class); - private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; - private final InternalStageManager owningManager; + private final IStorageManager owningManager; private final String clientName; - + private final String clientPrefix; + private final TableRef tableRef; private final int maxUploadRetries; // Proxy parameters that we set while calling the Snowflake JDBC to upload the streams private final Properties proxyProperties; + private FileLocationInfo fileLocationInfo; + private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; + /** * Default constructor * * @param owningManager the storage manager owning this storage * @param clientName The client name + * @param clientPrefix client prefix + * @param tableRef the table this stage uploads to + * @param fileLocationInfo The file location information from open channel response * @param maxUploadRetries The maximum number of retries to attempt */ InternalStage( - InternalStageManager owningManager, + IStorageManager owningManager, String clientName, + String clientPrefix, + TableRef tableRef, FileLocationInfo fileLocationInfo, int maxUploadRetries) throws SnowflakeSQLException, IOException { - this(owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, maxUploadRetries); - Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix()); - this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo); + this( + owningManager, + clientName, + clientPrefix, + tableRef, + (SnowflakeFileTransferMetadataWithAge) null, + maxUploadRetries); + Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix); + setFileLocationInfo(fileLocationInfo); } /** @@ -92,17 +105,23 @@ class InternalStage<T> implements IStorage { * * @param owningManager the storage manager owning this storage * @param clientName the client name + * @param clientPrefix client prefix + * @param tableRef the table this stage uploads to * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with * @param maxUploadRetries the maximum number of retries to attempt */ InternalStage( - InternalStageManager owningManager, + IStorageManager owningManager, String clientName, + String clientPrefix, + TableRef tableRef, SnowflakeFileTransferMetadataWithAge testMetadata, int maxUploadRetries) throws SnowflakeSQLException, IOException { this.owningManager = owningManager; this.clientName = clientName; + this.clientPrefix = clientPrefix; + this.tableRef = tableRef; this.maxUploadRetries = maxUploadRetries; this.proxyProperties = generateProxyPropertiesForJDBC(); this.fileTransferMetadataWithAge = testMetadata; @@ -153,7 +172,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
.setUploadStream(inStream) .setRequireCompress(false) .setOcspMode(OCSPMode.FAIL_OPEN) - .setStreamingIngestClientKey(this.owningManager.getClientPrefix()) + .setStreamingIngestClientKey(this.clientPrefix) .setStreamingIngestClientName(this.clientName) .setProxyProperties(this.proxyProperties) .setDestFileName(fullFilePath) @@ -194,8 +213,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) */ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boolean force) throws SnowflakeSQLException, IOException { - logger.logInfo("Refresh Snowflake metadata, client={} force={}", clientName, force); - if (!force && fileTransferMetadataWithAge != null && fileTransferMetadataWithAge.timestamp.isPresent() @@ -204,10 +221,19 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole return fileTransferMetadataWithAge; } - FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.empty()); - SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location); - this.fileTransferMetadataWithAge = metadata; - return metadata; + logger.logInfo( + "Refresh Snowflake metadata, client={} force={} tableRef={}", clientName, force, tableRef); + + FileLocationInfo location = + this.owningManager.getRefreshedLocation(this.tableRef, Optional.empty()); + setFileLocationInfo(location); + return this.fileTransferMetadataWithAge; + } + + private synchronized void setFileLocationInfo(FileLocationInfo fileLocationInfo) + throws SnowflakeSQLException, IOException { + this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo); + this.fileLocationInfo = fileLocationInfo; } static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( @@ -265,7 +291,8 @@ static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) throws SnowflakeSQLException, IOException { - FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.of(fileName)); + FileLocationInfo location = + this.owningManager.getRefreshedLocation(this.tableRef, Optional.of(fileName)); SnowflakeFileTransferMetadataV1 metadata = (SnowflakeFileTransferMetadataV1) @@ -336,4 +363,8 @@ static void putLocal(String stageLocation, String fullFilePath, byte[] data) { throw new SFException(ex, ErrorCode.BLOB_UPLOAD_FAILURE); } } + + FileLocationInfo getFileLocationInfo() { + return this.fileLocationInfo; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java index aa770c92f..b1a5e7665 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -20,9 +20,10 @@ import net.snowflake.ingest.utils.Utils; /** Class to manage single Snowflake internal stage */ -class InternalStageManager implements IStorageManager { +class InternalStageManager implements IStorageManager { + public static final TableRef NO_TABLE_REF = new TableRef("$NO_DB$", "$NO_SCH$", "$NO_TABLE$"); /** Target stage for the client */ - private final InternalStage targetStage; + private final InternalStage targetStage; /** Increasing counter to generate a unique blob name per client */ private final AtomicLong counter; @@ -70,15 +71,22 @@ class InternalStageManager implements IStorageManager { this.clientPrefix = 
response.getClientPrefix(); this.deploymentId = response.getDeploymentId(); this.targetStage = - new InternalStage( - this, clientName, response.getStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES); + new InternalStage( + this, + clientName, + clientPrefix, + NO_TABLE_REF, + response.getStageLocation(), + DEFAULT_MAX_UPLOAD_RETRIES); } else { this.clientPrefix = null; this.deploymentId = null; this.targetStage = - new InternalStage( + new InternalStage( this, "testClient", + null /* clientPrefix */, + NO_TABLE_REF, (SnowflakeFileTransferMetadataWithAge) null, DEFAULT_MAX_UPLOAD_RETRIES); } @@ -98,7 +106,7 @@ class InternalStageManager implements IStorageManager { */ @Override @SuppressWarnings("unused") - public InternalStage getStorage(String fullyQualifiedTableName) { + public InternalStage getStorage(String fullyQualifiedTableName) { // There's always only one stage for the client in non-iceberg mode return targetStage; } @@ -108,7 +116,7 @@ public InternalStage getStorage(String fullyQualifiedTableName) { * nothing as there's no per-table state yet for FDN tables (that use internal stages). */ @Override - public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {} + public void registerTable(TableRef tableRef) {} /** * Gets the latest file location info (with a renewed short-lived access token) for the specified @@ -117,7 +125,15 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {} * @param fileName optional filename for single-file signed URL fetch from server * @return the new location information */ - FileLocationInfo getRefreshedLocation(Optional fileName) { + @Override + public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional fileName) { + if (!tableRef.equals(NO_TABLE_REF)) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format( + "getRefreshedLocation received TableRef=%s and expected=%s", tableRef, NO_TABLE_REF)); + } + try { ClientConfigureRequest request = new ClientConfigureRequest(this.role); fileName.ifPresent(request::setFileName); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java deleted file mode 100644 index ed8e2891f..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java +++ /dev/null @@ -1,442 +0,0 @@ -package net.snowflake.ingest.streaming.internal; - -import static net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse.PresignedUrlInfo; - -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import net.snowflake.client.core.ExecTimeTelemetryData; -import net.snowflake.client.core.HttpClientSettingsKey; -import net.snowflake.client.core.OCSPMode; -import net.snowflake.client.jdbc.RestRequest; -import net.snowflake.client.jdbc.SnowflakeSQLException; -import net.snowflake.client.jdbc.SnowflakeUtil; -import net.snowflake.client.jdbc.cloud.storage.SnowflakeStorageClient; -import net.snowflake.client.jdbc.cloud.storage.StageInfo; -import net.snowflake.client.jdbc.cloud.storage.StorageClientFactory; -import 
net.snowflake.client.jdbc.internal.apache.http.client.HttpResponseException; -import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse; -import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPut; -import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder; -import net.snowflake.client.jdbc.internal.apache.http.entity.ByteArrayEntity; -import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; -import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils; -import net.snowflake.client.jdbc.internal.google.api.client.http.HttpStatusCodes; -import net.snowflake.ingest.connection.IngestResponseException; -import net.snowflake.ingest.utils.ErrorCode; -import net.snowflake.ingest.utils.HttpUtil; -import net.snowflake.ingest.utils.Logging; -import net.snowflake.ingest.utils.SFException; - -/** Handles uploading files to the Iceberg Table's external volume's table data path */ -class PresignedUrlExternalVolume implements IStorage { - // TODO everywhere: static final should be named in all capitals - private static final Logging logger = new Logging(PresignedUrlExternalVolume.class); - private static final int DEFAULT_PRESIGNED_URL_COUNT = 10; - private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900; - - // Allowing concurrent generate URL requests is a weak form of adapting to high throughput - // clients. The low watermark ideally should be adaptive too for such clients,will wait for perf - // runs to show its necessary. - private static final int MAX_CONCURRENT_GENERATE_URLS_REQUESTS = 10; - private static final int LOW_WATERMARK_FOR_EARLY_REFRESH = 5; - - private final String clientName; - private final String clientPrefix; - private final Long deploymentId; - private final String role; - - // The db name, schema name and table name for this storage location - private final TableRef tableRef; - - // The RPC client for snowflake cloud service - private final SnowflakeServiceClient serviceClient; - - // semaphore to limit how many RPCs go out for one location concurrently - private final Semaphore generateUrlsSemaphore; - - // thread-safe queue of unused URLs, to be disbursed whenever flush codepath is cutting the next - // file - private final ConcurrentLinkedQueue presignedUrlInfos; - - // sometimes-stale counter of how many URLs are remaining, to avoid calling presignedUrls.size() - // and increasing lock contention / volatile reads on the internal data structures inside - // ConcurrentLinkedQueue - private final AtomicInteger numUrlsInQueue; - - private final FileLocationInfo locationInfo; - private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata; - - PresignedUrlExternalVolume( - String clientName, - String clientPrefix, - Long deploymentId, - String role, - TableRef tableRef, - FileLocationInfo locationInfo, - SnowflakeServiceClient serviceClient) { - this.clientName = clientName; - this.clientPrefix = clientPrefix; - this.deploymentId = deploymentId; - this.role = role; - this.tableRef = tableRef; - this.serviceClient = serviceClient; - this.locationInfo = locationInfo; - this.presignedUrlInfos = new ConcurrentLinkedQueue<>(); - this.numUrlsInQueue = new AtomicInteger(0); - this.generateUrlsSemaphore = new Semaphore(MAX_CONCURRENT_GENERATE_URLS_REQUESTS); - - if (this.locationInfo.getIsClientSideEncrypted()) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, - "Cannot ingest into an external volume that requests client side encryption"); - } - if 
("S3".equalsIgnoreCase(this.locationInfo.getLocationType())) { - // add dummy values so that JDBC's S3 client creation doesn't barf on initialization. - this.locationInfo.getCredentials().put("AWS_KEY_ID", "key"); - this.locationInfo.getCredentials().put("AWS_SECRET_KEY", "secret"); - } - - try { - this.fileTransferMetadata = - InternalStage.createFileTransferMetadataWithAge(this.locationInfo); - } catch (JsonProcessingException - | SnowflakeSQLException - | net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException e) { - throw new SFException(e, ErrorCode.INTERNAL_ERROR); - } - - // the caller is just setting up this object and not expecting a URL to be returned (unlike in - // dequeueUrlInfo), thus waitUntilAcquired=false. - generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, false /* waitUntilAcquired */); - } - - // TODO : Add timing ; add logging ; add retries ; add http exception handling better than - // client.handleEx? - @Override - public void put(BlobPath blobPath, byte[] blob) { - if (this.fileTransferMetadata.isLocalFS) { - InternalStage.putLocal( - this.fileTransferMetadata.localLocation, blobPath.fileRegistrationPath, blob); - return; - } - - try { - putRemote(blobPath.uploadPath, blob); - } catch (Throwable e) { - throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE); - } - } - - private void putRemote(String blobPath, byte[] blob) - throws SnowflakeSQLException, URISyntaxException, IOException { - // TODO: Add a backlog item for somehow doing multipart upload with presigned URLs (each part - // has its own URL) for large files - - // already verified that client side encryption is disabled, in the ctor's call to generateUrls - final Properties proxyProperties = HttpUtil.generateProxyPropertiesForJDBC(); - final HttpClientSettingsKey key = - SnowflakeUtil.convertProxyPropertiesToHttpClientKey(OCSPMode.FAIL_OPEN, proxyProperties); - - StageInfo stageInfo = fileTransferMetadata.fileTransferMetadata.getStageInfo(); - SnowflakeStorageClient client = - StorageClientFactory.getFactory().createClient(stageInfo, 1, null, null); - - URIBuilder uriBuilder = new URIBuilder(blobPath); - HttpPut httpRequest = new HttpPut(uriBuilder.build()); - httpRequest.setEntity(new ByteArrayEntity(blob)); - - addHeadersToHttpRequest(httpRequest, blob, stageInfo, client); - - if (stageInfo.getStageType().equals(StageInfo.StageType.AZURE)) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, "Azure based external volumes are not yet supported."); - - /* commenting out unverified code, will fix this when server changes are in preprod / some test deplo - URI storageEndpoint = - new URI( - "https", - stageInfo.getStorageAccount() + "." 
+ stageInfo.getEndPoint() + "/", - null, - null); - String sasToken = blobPath.substring(blobPath.indexOf("?")); - StorageCredentials azCreds = new StorageCredentialsSharedAccessSignature(sasToken); - CloudBlobClient azClient = new CloudBlobClient(storageEndpoint, azCreds); - - CloudBlobContainer container = azClient.getContainerReference(stageInfo.getLocation().substring(0, stageInfo.getLocation().indexOf("/"))); - CloudBlockBlob azBlob = container.getBlockBlobReference(); - azBlob.setMetadata((HashMap) meta.getUserMetadata()); - - OperationContext opContext = new OperationContext(); - net.snowflake.client.core.HttpUtil.setSessionlessProxyForAzure(proxyProperties, opContext); - - BlobRequestOptions options = new BlobRequestOptions(); - - try { - azBlob.uploadFromByteArray(blob, 0, blob.length, null, options, opContext); - } catch (Exception ex) { - ((SnowflakeAzureClient) client).handleStorageException(ex, 0, "upload", null, null, null); - } - */ - } - - CloseableHttpClient httpClient = net.snowflake.client.core.HttpUtil.getHttpClient(key); - CloseableHttpResponse response = - RestRequest.execute( - httpClient, - httpRequest, - 0, // retry timeout - 0, // auth timeout - (int) - net.snowflake.client.core.HttpUtil.getSocketTimeout() - .toMillis(), // socket timeout in ms - 1, // max retries - 0, // no socket timeout injection - null, // no canceling signaler, TODO: wire up thread interrupt with setting this - // AtomicBoolean to avoid retries/sleeps - false, // no cookie - false, // no url retry query parameters - false, // no request_guid - true, // retry on HTTP 403 - true, // no retry - new ExecTimeTelemetryData()); - - int statusCode = response.getStatusLine().getStatusCode(); - if (!HttpStatusCodes.isSuccess(statusCode)) { - Exception ex = - new HttpResponseException( - response.getStatusLine().getStatusCode(), - String.format( - "%s, body: %s", - response.getStatusLine().getReasonPhrase(), - EntityUtils.toString(response.getEntity()))); - - client.handleStorageException(ex, 0, "upload", null, null, null); - } - } - - private void addHeadersToHttpRequest( - HttpPut httpRequest, byte[] blob, StageInfo stageInfo, SnowflakeStorageClient client) { - // no need to set this as it causes a Content-length header is already set error in apache's - // http client. - // httpRequest.setHeader("Content-Length", "" + blob.length); - - // TODO: These custom headers need to be a part of the presigned URL HMAC computation in S3, - // we'll disable them for now until we can do presigned URL generation AFTER we have the digest. 
- - /* - final String clientKey = this.clientPrefix; - final String clientName = this.clientName; - - final byte[] digestBytes; - try { - digestBytes = MessageDigest.getInstance("SHA-256").digest(blob); - } catch (NoSuchAlgorithmException e) { - throw new SFException(e, ErrorCode.INTERNAL_ERROR); - } - - final String digest = Base64.getEncoder().encodeToString(digestBytes); - - StorageObjectMetadata meta = StorageClientFactory.getFactory().createStorageMetadataObj(stageInfo.getStageType()); - - client.addDigestMetadata(meta, digest); - client.addStreamingIngestMetadata(meta, clientName, clientKey); - - switch (stageInfo.getStageType()) { - case S3: - httpRequest.setHeader("x-amz-server-side-encryption", ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); - httpRequest.setHeader("x-amz-checksum-sha256", digest); // TODO why does JDBC use a custom x–amz-meta-sfc-digest header for this - for (Map.Entry entry : meta.getUserMetadata().entrySet()) { - httpRequest.setHeader("x-amz-meta-" + entry.getKey(), entry.getValue()); - } - - break; - - case AZURE: - for (Map.Entry entry : meta.getUserMetadata().entrySet()) { - httpRequest.setHeader("x-ms-meta-" + entry.getKey(), entry.getValue()); - } - break; - - case GCS: - for (Map.Entry entry : meta.getUserMetadata().entrySet()) { - httpRequest.setHeader("x-goog-meta-" + entry.getKey(), entry.getValue()); - } - break; - } - */ - } - - PresignedUrlInfo dequeueUrlInfo() { - // use a 60 second buffer in case the executor service is backed up / serialization takes time / - // upload runs slow / etc. - long validUntilAtleastTimestamp = System.currentTimeMillis() + 60 * 1000; - - // TODO: Wire in a checkStop to get out of this infinite loop. - while (true) { - PresignedUrlInfo info = this.presignedUrlInfos.poll(); - if (info == null) { - // if the queue is empty, trigger a url refresh AND wait for it to complete. - // loop around when done to try and ready from the queue again. - generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, true /* waitUntilAcquired */); - continue; - } - - // we dequeued a url, do the appropriate bookkeeping. - int remainingUrlsInQueue = this.numUrlsInQueue.decrementAndGet(); - - if (info.validUntilTimestamp < validUntilAtleastTimestamp) { - // This url can expire by the time it gets used, loop around and dequeue another URL. - continue; - } - - // if we're nearing url exhaustion, go fetch another batch. Don't wait for the response as we - // already have a valid URL to be used by the current caller of dequeueUrlInfo. - if (remainingUrlsInQueue <= LOW_WATERMARK_FOR_EARLY_REFRESH) { - // TODO: do this generation on a background thread to allow the current thread to make - // progress ? Will wait for perf runs to know this is an issue that needs addressal. - generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, false /* waitUntilAcquired */); - } - - return info; - } - } - - // NOTE : We are intentionally NOT re-enqueuing unused URLs here as that can cause correctness - // issues by accidentally enqueuing a URL that was actually used to write data out. Its okay to - // allow an unused URL to go waste as we'll just go out and generate new URLs. - // Do NOT add an enqueueUrl() method for this reason. - - /** - * Fetches new presigned URLs from snowflake. 
- * - * @param minCountToSkipGeneration Skip the RPC if we already have this many URLs in the queue - * @param waitUntilAcquired when true, make the current thread block on having enough URLs in the - * queue - */ - private void generateUrls(int minCountToSkipGeneration, boolean waitUntilAcquired) { - int numAcquireAttempts = 0; - boolean acquired = false; - - while (!acquired && numAcquireAttempts++ < 300) { - // Use an aggressive timeout value as its possible that the other requests finished and added - // enough URLs to the queue. If we use a higher timeout value, this calling thread's flush is - // going to unnecessarily be blocked when URLs have already been added to the queue. - try { - // If waitUntilAcquired is false, the caller is not interested in waiting for the results - // The semaphore being already "full" implies there are many another requests in flight - // and we can just early exit to the caller. - int timeoutInSeconds = waitUntilAcquired ? 1 : 0; - acquired = this.generateUrlsSemaphore.tryAcquire(timeoutInSeconds, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // if the thread was interrupted there's nothing we can do about it, definitely shouldn't - // continue processing. - - // reset the interrupted flag on the thread in case someone in the callstack wants to - // gracefully continue processing. - boolean interrupted = Thread.interrupted(); - - String message = - String.format( - "Semaphore acquisition in ExternalVolume.generateUrls was interrupted, likely" - + " because the process is shutting down. TableRef=%s Thread.interrupted=%s", - tableRef, interrupted); - logger.logError(message); - throw new SFException(ErrorCode.INTERNAL_ERROR, message); - } - - // In case Acquire took time because no permits were available, it implies we already had N - // other threads fetching more URLs. In that case we should be content with what's in the - // buffer instead of doing another RPC potentially unnecessarily. - if (this.numUrlsInQueue.get() >= minCountToSkipGeneration) { - // release the semaphore before early-exiting to avoid a leak in semaphore permits. - if (acquired) { - this.generateUrlsSemaphore.release(); - } - - return; - } - - // If we couldn't acquire the semaphore, and the caller was doing an optimistic generateUrls - // but does NOT want to wait around for a successful generatePresignedUrlsResponse, then - // early exit and allow the caller to move on. - if (!acquired && !waitUntilAcquired) { - logger.logDebug( - "Skipping generateUrls because semaphore acquisition failed AND waitUntilAcquired ==" - + " false."); - return; - } - } - - // if we're here without acquiring, that implies the numAcquireAttempts went over 300. We're at - // an impasse and so there's nothing more to be done except error out, as that gives the client - // a chance to - // restart. - if (!acquired) { - String message = - String.format("Could not acquire semaphore to generate URLs. 
TableRef=%s", tableRef); - logger.logError(message); - throw new SFException(ErrorCode.INTERNAL_ERROR, message); - } - - // we have acquired a semaphore permit at this point, must release before returning - - try { - long currentTimestamp = System.currentTimeMillis(); - long validUntilTimestamp = - currentTimestamp + (DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS * 1000); - GeneratePresignedUrlsResponse response = - doGenerateUrls(DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS); - List urlInfos = response.getPresignedUrlInfos(); - urlInfos = - urlInfos.stream() - .map( - info -> { - info.validUntilTimestamp = validUntilTimestamp; - return info; - }) - .filter( - info -> { - if (info == null - || info.url == null - || info.fileName == null - || info.url.isEmpty()) { - logger.logError( - "Received unexpected null or empty URL in externalVolume.generateUrls" - + " tableRef=%s", - this.tableRef); - return false; - } - - return true; - }) - .collect(Collectors.toList()); - - // these are both thread-safe operations individually, and there is no need to do them inside - // a lock. For an infinitesimal time the numUrlsInQueue will under represent the number of - // entries in the queue. - this.presignedUrlInfos.addAll(urlInfos); - this.numUrlsInQueue.addAndGet(urlInfos.size()); - } finally { - this.generateUrlsSemaphore.release(); - } - } - - private GeneratePresignedUrlsResponse doGenerateUrls(int timeoutInSeconds) { - try { - return this.serviceClient.generatePresignedUrls( - new GeneratePresignedUrlsRequest( - tableRef, role, DEFAULT_PRESIGNED_URL_COUNT, timeoutInSeconds, deploymentId, true)); - - } catch (IngestResponseException | IOException e) { - throw new SFException(e, ErrorCode.GENERATE_PRESIGNED_URLS_FAILURE, e.getMessage()); - } - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java deleted file mode 100644 index 4147430bc..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
- */ - -package net.snowflake.ingest.streaming.internal; - -import static net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse.PresignedUrlInfo; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import net.snowflake.ingest.connection.IngestResponseException; -import net.snowflake.ingest.utils.ErrorCode; -import net.snowflake.ingest.utils.Logging; -import net.snowflake.ingest.utils.SFException; - -/** Class to manage multiple external volumes */ -class PresignedUrlExternalVolumeManager implements IStorageManager { - // TODO: Rename all logger members to LOGGER and checkin code formatting rules - private static final Logging logger = new Logging(PresignedUrlExternalVolumeManager.class); - // Reference to the external volume per table - private final Map externalVolumeMap; - - // name of the owning client - private final String clientName; - - private final String role; - - // Reference to the Snowflake service client used for configure calls - private final SnowflakeServiceClient serviceClient; - - // Client prefix generated by the Snowflake server - private final String clientPrefix; - - // Deployment ID returned by the Snowflake server - private final Long deploymentId; - - // concurrency control to avoid creating multiple ExternalVolume objects for the same table (if - // openChannel is called - // multiple times concurrently) - private final Object registerTableLock = new Object(); - - /** - * Constructor for ExternalVolumeManager - * - * @param isTestMode whether the manager in test mode - * @param role the role of the client - * @param clientName the name of the client - * @param snowflakeServiceClient the Snowflake service client used for configure calls - */ - PresignedUrlExternalVolumeManager( - boolean isTestMode, - String role, - String clientName, - SnowflakeServiceClient snowflakeServiceClient) { - this.clientName = clientName; - this.role = role; - this.serviceClient = snowflakeServiceClient; - this.externalVolumeMap = new ConcurrentHashMap<>(); - try { - ClientConfigureResponse response = - this.serviceClient.clientConfigure(new ClientConfigureRequest(role)); - this.clientPrefix = isTestMode ? "testPrefix" : response.getClientPrefix(); - this.deploymentId = response.getDeploymentId(); - } catch (IngestResponseException | IOException e) { - throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); - } - logger.logDebug( - "Created PresignedUrlExternalVolumeManager with clientName=%s and clientPrefix=%s", - clientName, clientPrefix); - } - - /** - * Given a fully qualified table name, return the target storage by looking up the table name - * - * @param fullyQualifiedTableName the target fully qualified table name - * @return target storage - */ - @Override - public PresignedUrlExternalVolume getStorage(String fullyQualifiedTableName) { - // Only one chunk per blob in Iceberg mode. - return getVolumeSafe(fullyQualifiedTableName); - } - - /** Informs the storage manager about a new table that's being ingested into by the client. */ - @Override - public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) { - if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) { - logger.logInfo( - "Skip registering table since its already been registered with the VolumeManager." 
- + " tableRef=%s", - tableRef); - return; - } - - // future enhancement - per table locks instead of per-client lock - synchronized (registerTableLock) { - if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) { - logger.logInfo( - "Skip registering table since its already been registered with the VolumeManager." - + " tableRef=%s", - tableRef); - return; - } - - try { - PresignedUrlExternalVolume externalVolume = - new PresignedUrlExternalVolume( - clientName, - getClientPrefix(), - deploymentId, - role, - tableRef, - locationInfo, - serviceClient); - this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume); - } catch (SFException ex) { - logger.logError( - "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex); - // allow external volume ctor's SFExceptions to bubble up directly - throw ex; - } catch (Exception err) { - logger.logError( - "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err); - throw new SFException( - err, - ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, - String.format("fullyQualifiedTableName=%s", tableRef)); - } - } - } - - @Override - public BlobPath generateBlobPath(String fullyQualifiedTableName) { - PresignedUrlExternalVolume volume = getVolumeSafe(fullyQualifiedTableName); - PresignedUrlInfo urlInfo = volume.dequeueUrlInfo(); - return new BlobPath(urlInfo.url /* uploadPath */, urlInfo.fileName /* fileRegistrationPath */); - } - - /** - * Get the client prefix from first external volume in the map - * - * @return the client prefix - */ - @Override - public String getClientPrefix() { - return this.clientPrefix; - } - - private PresignedUrlExternalVolume getVolumeSafe(String fullyQualifiedTableName) { - PresignedUrlExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName); - - if (volume == null) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, - String.format("No external volume found for tableRef=%s", fullyQualifiedTableName)); - } - - return volume; - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationRequest.java new file mode 100644 index 000000000..97e6e29f5 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationRequest.java @@ -0,0 +1,55 @@ +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RefreshTableInformationRequest implements IStreamingIngestRequest { + @JsonProperty("database") + private String dbName; + + @JsonProperty("schema") + private String schemaName; + + @JsonProperty("table") + private String tableName; + + @JsonProperty("role") + private String role; + + @JsonProperty("is_iceberg") + private boolean isIceberg; + + public RefreshTableInformationRequest(TableRef tableRef, String role, boolean isIceberg) { + this.dbName = tableRef.dbName; + this.schemaName = tableRef.schemaName; + this.tableName = tableRef.tableName; + this.role = role; + this.isIceberg = isIceberg; + } + + String getDBName() { + return this.dbName; + } + + String getSchemaName() { + return this.schemaName; + } + + String getTableName() { + return this.tableName; + } + + String getRole() { + return this.role; + } + + boolean getIsIceberg() { + return this.isIceberg; + } + + @Override + public String getStringForLogging() { + return String.format( + "RefreshTableInformation(db=%s, schema=%s, table=%s, role=%s, isIceberg=%s)", + 
dbName, schemaName, tableName, role, isIceberg); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationResponse.java new file mode 100644 index 000000000..d4a88a818 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationResponse.java @@ -0,0 +1,27 @@ +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RefreshTableInformationResponse extends StreamingIngestResponse { + @JsonProperty("status_code") + private Long statusCode; + + @JsonProperty("message") + private String message; + + @JsonProperty("iceberg_location") + private FileLocationInfo icebergLocationInfo; + + @Override + Long getStatusCode() { + return this.statusCode; + } + + String getMessage() { + return this.message; + } + + FileLocationInfo getIcebergLocationInfo() { + return this.icebergLocationInfo; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java index 7d4b52ef9..918c0a7f4 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.GENERATE_PRESIGNED_URLS; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.REFRESH_TABLE_INFORMATION; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; @@ -16,6 +17,7 @@ import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.GENERATE_PRESIGNED_URLS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT; import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; @@ -97,6 +99,27 @@ GeneratePresignedUrlsResponse generatePresignedUrls(GeneratePresignedUrlsRequest return response; } + /** Fetches the latest sub-scoped tokens from the server for the requested table in the request */ + RefreshTableInformationResponse refreshTableInformation(RefreshTableInformationRequest request) + throws IngestResponseException, IOException { + RefreshTableInformationResponse response = + executeApiRequestWithRetries( + RefreshTableInformationResponse.class, + request, + REFRESH_TABLE_INFORMATION_ENDPOINT, + "refresh table information", + REFRESH_TABLE_INFORMATION); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "RefreshTableInformation request failed, request={}, response={}", + request.getStringForLogging(), + response.getMessage()); + throw new SFException(ErrorCode.REFRESH_TABLE_INFORMATION_FAILURE, response.getMessage()); + } + return response; + } + /** * Opens a channel given a {@link OpenChannelRequestInternal}. 
* diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index f553fb7ff..e9282c3b4 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -238,9 +238,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.storageManager = isIcebergMode - ? new PresignedUrlExternalVolumeManager( - isTestMode, this.role, this.name, this.snowflakeServiceClient) - : new InternalStageManager( + ? new SubscopedTokenExternalVolumeManager( + this.role, this.name, this.snowflakeServiceClient) + : new InternalStageManager( isTestMode, this.role, this.name, this.snowflakeServiceClient); try { @@ -408,8 +408,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest this.channelCache.addChannel(channel); this.storageManager.registerTable( - new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()), - response.getIcebergLocationInfo()); + new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName())); return channel; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java new file mode 100644 index 000000000..8f606b3dd --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.Logging; +import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.Utils; + +/** Class to manage multiple external volumes */ +class SubscopedTokenExternalVolumeManager implements IStorageManager { + private static final Logging logger = new Logging(SubscopedTokenExternalVolumeManager.class); + // Reference to the external volume per table + private final ConcurrentHashMap<String, InternalStage> externalVolumeMap; + + /** Increasing counter to generate a unique blob name */ + private final AtomicLong counter; + + // name of the owning client + private final String clientName; + + private final String role; + + // Reference to the Snowflake service client used for configure calls + private final SnowflakeServiceClient serviceClient; + + // Client prefix generated by the Snowflake server + private final String clientPrefix; + + /** + * Constructor for SubscopedTokenExternalVolumeManager + * + * @param role the role of the client + * @param clientName the name of the client + * @param snowflakeServiceClient the Snowflake service client used for configure calls + */ + SubscopedTokenExternalVolumeManager( + String role, String clientName, SnowflakeServiceClient snowflakeServiceClient) { + this.clientName = clientName; + this.role = role; + this.counter = new AtomicLong(0); + this.serviceClient = snowflakeServiceClient; + this.externalVolumeMap = new ConcurrentHashMap<>(); + try { + ClientConfigureResponse response = + this.serviceClient.clientConfigure(new ClientConfigureRequest(role)); + this.clientPrefix = response.getClientPrefix(); + } catch (IngestResponseException | IOException e) { + throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); + } + logger.logDebug( + "Created SubscopedTokenExternalVolumeManager with clientName=%s and clientPrefix=%s", + clientName, clientPrefix); + } + + /** + * Given a fully qualified table name, return the target storage by looking up the table name + * + * @param fullyQualifiedTableName the target fully qualified table name + * @return target storage + */ + @Override + public InternalStage getStorage(String fullyQualifiedTableName) { + // Only one chunk per blob in Iceberg mode. + return getVolumeSafe(fullyQualifiedTableName); + } + + /** Informs the storage manager about a new table that's being ingested into by the client. */ + @Override + public void registerTable(TableRef tableRef) { + this.externalVolumeMap.computeIfAbsent( + tableRef.fullyQualifiedName, fqn -> createStageForTable(tableRef)); + } + + private InternalStage createStageForTable(TableRef tableRef) { + // Fetch the locationInfo only on the first register call for a given table, to avoid + // putting unnecessary load on token generation when a client opens up a hundred channels + // at the same time.
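+ // Note that ConcurrentHashMap.computeIfAbsent runs createStageForTable at most once per table + // key, so concurrent openChannel calls racing on the same table collapse into a single + // refreshTableInformation RPC; later registerTable calls reuse the cached InternalStage.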
+ FileLocationInfo locationInfo = getRefreshedLocation(tableRef, Optional.empty()); + + try { + return new InternalStage( + this, clientName, getClientPrefix(), tableRef, locationInfo, DEFAULT_MAX_UPLOAD_RETRIES); + } catch (SFException ex) { + logger.logError( + "ExtVolManager.registerTable for tableRef=%s failed with exception=%s", tableRef, ex); + // allow external volume ctor's SFExceptions to bubble up directly + throw ex; + } catch (Exception err) { + logger.logError( + "ExtVolManager.registerTable for tableRef=%s failed with exception=%s", tableRef, err); + throw new SFException( + err, + ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, + String.format("fullyQualifiedTableName=%s", tableRef)); + } + } + + @Override + public BlobPath generateBlobPath(String fullyQualifiedTableName) { + InternalStage volume = getVolumeSafe(fullyQualifiedTableName); + + // {nullableTableBasePath}/data/streaming_ingest/{figsId}/snow_{volumeHash}_{figsId}_{workerRank}_1_ + return generateBlobPathFromLocationInfoPath( + fullyQualifiedTableName, + volume.getFileLocationInfo().getPath(), + Utils.getTwoHexChars(), + this.counter.getAndIncrement()); + } + + @VisibleForTesting + static BlobPath generateBlobPathFromLocationInfoPath( + String fullyQualifiedTableName, + String filePathRelativeToVolume, + String twoHexChars, + long counterValue) { + String[] parts = filePathRelativeToVolume.split("/"); + if (parts.length < 5) { + logger.logError( + "Invalid file path returned by server. Table=%s FilePathRelativeToVolume=%s", + fullyQualifiedTableName, filePathRelativeToVolume); + throw new SFException(ErrorCode.INTERNAL_ERROR, "File path returned by server is invalid"); + } + + // add twoHexChars as a prefix to the fileName (the last part of fileLocationInfo.getPath) + String fileNameRelativeToCredentialedPath = parts[parts.length - 1]; + fileNameRelativeToCredentialedPath = + String.join("/", twoHexChars, fileNameRelativeToCredentialedPath); + + // set this new fileName (with the prefix) back on the parts array so the full path can be + // reconstructed + parts[parts.length - 1] = fileNameRelativeToCredentialedPath; + filePathRelativeToVolume = String.join("/", parts); + + // add a monotonically increasing counter at the end and the file extension + String suffix = counterValue + ".parquet"; + + return new BlobPath( + fileNameRelativeToCredentialedPath + suffix /* uploadPath */, + filePathRelativeToVolume + suffix /* fileRegistrationPath */); + } + + /** + * Get the client prefix generated by the Snowflake server for this client + * + * @return the client prefix + */ + @Override + public String getClientPrefix() { + return this.clientPrefix; + } + + @Override + public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName) { + try { + RefreshTableInformationResponse response = + this.serviceClient.refreshTableInformation( + new RefreshTableInformationRequest(tableRef, this.role, true)); + logger.logDebug("Refreshed tokens for table=%s", tableRef); + if (response.getIcebergLocationInfo() == null) { + logger.logError( + "Did not receive location info, this will cause ingestion to grind to a halt." + + " TableRef=%s", + tableRef); + } else { + Map<String, String> creds = response.getIcebergLocationInfo().getCredentials(); + if (creds == null || creds.isEmpty()) { + logger.logError( + "Did not receive creds in location info, this will cause ingestion to grind to a" + + " halt. TableRef=%s", + tableRef); + } + } + + return response.getIcebergLocationInfo(); + } catch (IngestResponseException | IOException e) { + throw new SFException(e, ErrorCode.REFRESH_TABLE_INFORMATION_FAILURE, e.getMessage()); + } + } + + private InternalStage getVolumeSafe(String fullyQualifiedTableName) { + InternalStage volume = this.externalVolumeMap.get(fullyQualifiedTableName); + + if (volume == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("No external volume found for tableRef=%s", fullyQualifiedTableName)); + } + + return volume; + } +} diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index a5e04e21e..754c81cff 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -55,6 +55,8 @@ public class Constants { public static final String CLIENT_CONFIGURE_ENDPOINT = "/v1/streaming/client/configure/"; public static final String GENERATE_PRESIGNED_URLS_ENDPOINT = "/v1/streaming/presignedurls/generate/"; + public static final String REFRESH_TABLE_INFORMATION_ENDPOINT = + "/v1/streaming/tableinformation/refresh/"; public static final int COMMIT_MAX_RETRY_COUNT = 60; public static final int COMMIT_RETRY_INTERVAL_IN_MS = 1000; public static final String ENCRYPTION_ALGORITHM = "AES/CTR/NoPadding"; diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index dc7d1c631..0bb75b174 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -43,7 +43,8 @@ public enum ErrorCode { CRYPTO_PROVIDER_ERROR("0035"), DROP_CHANNEL_FAILURE("0036"), CLIENT_DEPLOYMENT_ID_MISMATCH("0037"), - GENERATE_PRESIGNED_URLS_FAILURE("0038"); + GENERATE_PRESIGNED_URLS_FAILURE("0038"), + REFRESH_TABLE_INFORMATION_FAILURE("0039"); public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages"; diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index ca71a8233..002ce86a2 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ThreadLocalRandom; import net.snowflake.client.core.SFSessionProperty; import org.apache.commons.codec.binary.Base64; import org.apache.parquet.bytes.BytesUtils; @@ -460,4 +461,14 @@ public static long getParquetFooterSize(byte[] bytes) throws IOException { return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset); } + + public static String getTwoHexChars() { + String twoHexChars = + Integer.toHexString((ThreadLocalRandom.current().nextInt() & 0x7FFFFFFF) % 0x100); + if (twoHexChars.length() == 1) { + twoHexChars = "0" + twoHexChars; + } + + return twoHexChars; + } } diff --git a/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties b/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties index 193ae0105..a6e53a2cc 100644 --- a/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties +++ b/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties @@ -44,4 +44,5 @@ 0035=Failed to load {0}.
If you use FIPS, import BouncyCastleFipsProvider in the application: {1} 0036=Failed to drop channel: {0} 0037=Deployment ID mismatch, Client was created on: {0}, Got upload location for: {1}. Please restart client: {2}. -0038=Generate presigned URLs request failed: {0}. \ No newline at end of file +0038=Generate presigned URLs request failed: {0}. +0039=Refresh Table Information request failed: {0}. \ No newline at end of file diff --git a/src/test/java/net/snowflake/ingest/TestUtils.java b/src/test/java/net/snowflake/ingest/TestUtils.java index 349a372e2..407cba0bc 100644 --- a/src/test/java/net/snowflake/ingest/TestUtils.java +++ b/src/test/java/net/snowflake/ingest/TestUtils.java @@ -115,9 +115,9 @@ private static void init() throws NoSuchAlgorithmException, InvalidKeySpecExcept account = profile.get(ACCOUNT).asText(); port = profile.get(PORT).asInt(); ssl = profile.get(SSL).asText(); - database = profile.get(DATABASE).asText(); + database = profile.get(DATABASE) == null ? null : profile.get(DATABASE).asText(); connectString = profile.get(CONNECT_STRING).asText(); - schema = profile.get(SCHEMA).asText(); + schema = profile.get(SCHEMA) == null ? null : profile.get(SCHEMA).asText(); warehouse = profile.get(WAREHOUSE).asText(); host = profile.get(HOST).asText(); scheme = profile.get(SCHEME).asText(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 7b8ba7605..86350e03d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -100,7 +100,6 @@ private abstract static class TestContext implements AutoCloseable { FlushService flushService; IStorageManager storageManager; InternalStage storage; - PresignedUrlExternalVolume extVolume; ParameterProvider parameterProvider; RegisterService registerService; @@ -108,7 +107,6 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { storage = Mockito.mock(InternalStage.class); - extVolume = Mockito.mock(PresignedUrlExternalVolume.class); parameterProvider = new ParameterProvider(isIcebergMode); InternalParameterProvider internalParameterProvider = new InternalParameterProvider(isIcebergMode); @@ -118,12 +116,10 @@ private abstract static class TestContext implements AutoCloseable { storageManager = Mockito.spy( isIcebergMode - ? new PresignedUrlExternalVolumeManager( - true, "role", "client", MockSnowflakeServiceClient.create()) + ? new SubscopedTokenExternalVolumeManager( + "role", "client", MockSnowflakeServiceClient.create()) : new InternalStageManager(true, "role", "client", null)); - Mockito.doReturn(isIcebergMode ? 
extVolume : storage) - .when(storageManager) - .getStorage(ArgumentMatchers.any()); + Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any()); Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix"); Mockito.when(client.getParameterProvider()) .thenAnswer((Answer) (i) -> parameterProvider); @@ -445,7 +441,7 @@ public void testGetFilePath() { Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; String outputString = - ((InternalStageManager) storageManager).getNextFileName(calendar, clientPrefix); + ((InternalStageManager) storageManager).getNextFileName(calendar, clientPrefix); Path outputPath = Paths.get(outputString); Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); Assert.assertTrue( @@ -1116,7 +1112,7 @@ public void testInvalidateChannels() { innerData.add(channel2Data); IStorageManager storageManager = - Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); + Mockito.spy(new InternalStageManager(true, "role", "client", null)); FlushService flushService = new FlushService<>(client, channelCache, storageManager, false); flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java index 0cfadb92e..150ec566a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java @@ -134,13 +134,15 @@ public void testPutRemote() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); + InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - InternalStage stage = + InternalStage stage = new InternalStage( storageManager, "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), 1); @@ -178,11 +180,13 @@ public void testPutLocal() throws Exception { String fullFilePath = "testOutput"; String fileName = "putLocalOutput"; - InternalStage stage = + InternalStage stage = Mockito.spy( new InternalStage( null, "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( fullFilePath, Optional.of(System.currentTimeMillis())), 1)); @@ -209,10 +213,12 @@ public void doTestPutRemoteRefreshes() throws Exception { InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - InternalStage stage = - new InternalStage<>( + InternalStage stage = + new InternalStage( storageManager, "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), maxUploadRetryCount); @@ -265,11 +271,13 @@ public void testPutRemoteGCS() throws Exception { InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - InternalStage stage = + InternalStage stage = Mockito.spy( - new InternalStage<>( + new InternalStage( storageManager, "clientName", + 
"testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), 1)); @@ -302,11 +310,16 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); InternalStageManager storageManager = - new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); + new InternalStageManager(true, "role", "client", snowflakeServiceClient); - InternalStage stage = - new InternalStage<>( - storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1); + InternalStage stage = + new InternalStage( + storageManager, + "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, + (SnowflakeFileTransferMetadataWithAge) null, + 1); SnowflakeFileTransferMetadataWithAge metadataWithAge = stage.refreshSnowflakeMetadata(true); @@ -355,10 +368,10 @@ public void testRefreshSnowflakeMetadataDeploymentIdMismatch() throws Exception SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - InternalStageManager storageManager = - new InternalStageManager<>(true, "role", "clientName", snowflakeServiceClient); + InternalStageManager storageManager = + new InternalStageManager(true, "role", "clientName", snowflakeServiceClient); - InternalStage storage = storageManager.getStorage(""); + InternalStage storage = storageManager.getStorage(""); storage.refreshSnowflakeMetadata(true); Assert.assertEquals(prefix + "_" + deploymentId, storageManager.getClientPrefix()); @@ -387,8 +400,8 @@ public void testFetchSignedURL() throws Exception { Mockito.when(mockClientInternal.getRole()).thenReturn("role"); SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - InternalStageManager storageManager = - new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); + InternalStageManager storageManager = + new InternalStageManager(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -396,9 +409,14 @@ public void testFetchSignedURL() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - InternalStage stage = + InternalStage stage = new InternalStage( - storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1); + storageManager, + "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, + (SnowflakeFileTransferMetadataWithAge) null, + 1); SnowflakeFileTransferMetadataV1 metadata = stage.fetchSignedURL("path/fileName"); @@ -429,8 +447,8 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { Mockito.when(mockClientInternal.getRole()).thenReturn("role"); SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - InternalStageManager storageManager = - new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); + InternalStageManager storageManager = + new InternalStageManager(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -438,9 +456,14 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { 
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java b/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java
index e632c2ed0..13e26fbdd 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java
@@ -5,6 +5,7 @@
 import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT;
 import static net.snowflake.ingest.utils.Constants.GENERATE_PRESIGNED_URLS_ENDPOINT;
 import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT;
+import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT;
 import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -130,6 +131,16 @@ public static CloseableHttpClient createHttpClient(ApiOverride apiOverride) {
               clientConfigresponseMap.put("deployment_id", 123L);
               return buildStreamingIngestResponse(
                   HttpStatus.SC_OK, clientConfigresponseMap);
+
+            case REFRESH_TABLE_INFORMATION_ENDPOINT:
+              Thread.sleep(1);
+              Map<String, Object> refreshTableInformationMap = new HashMap<>();
+              refreshTableInformationMap.put("status_code", 0L);
+              refreshTableInformationMap.put("message", "OK");
+              refreshTableInformationMap.put("iceberg_location", getStageLocationMap());
+              return buildStreamingIngestResponse(
+                  HttpStatus.SC_OK, refreshTableInformationMap);
+
             case GENERATE_PRESIGNED_URLS_ENDPOINT:
               Thread.sleep(1);
               Map<String, Object> generateUrlsResponseMap = new HashMap<>();
@@ -172,7 +183,6 @@ public static CloseableHttpClient createHttpClient(ApiOverride apiOverride) {
               openChannelResponseMap.put("table_columns", tableColumnsLists);
               openChannelResponseMap.put("encryption_key", "test_encryption_key");
               openChannelResponseMap.put("encryption_key_id", 123L);
-              openChannelResponseMap.put("iceberg_location", getStageLocationMap());
               return buildStreamingIngestResponse(
                   HttpStatus.SC_OK, openChannelResponseMap);
             case DROP_CHANNEL_ENDPOINT:
@@ -258,8 +268,8 @@ public static Map getStageLocationMap() {
     Map<String, Object> stageLocationMap = new HashMap<>();
     stageLocationMap.put("locationType", "S3");
-    stageLocationMap.put("location", "test_location");
-    stageLocationMap.put("path", "test_path");
+    stageLocationMap.put("location", "container/vol/table/data/streaming_ingest/figsId");
+    stageLocationMap.put("path", "table/data/streaming_ingest/figsId/snow_volHash_figsId_1_1_");
     stageLocationMap.put("creds", credsMap);
     stageLocationMap.put("region", "test_region");
     stageLocationMap.put("endPoint", "test_endpoint");
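The new stage-location stub above feeds the blob-path logic exercised by SubscopedTokenExternalVolumeManagerTest below: the server returns a directory prefix plus a file-name prefix ("...figsId/snow_volHash_figsId_1_1_"), and the client appends a counter and places the file under a two-hex-char subfolder. A minimal sketch of that splitting rule, derived only from what the tests assert; BlobPathSketch and fromLocationInfoPath are illustrative names, not the SDK's implementation:

// Illustrative sketch of the path-splitting rule asserted by the tests below;
// not the SDK implementation.
final class BlobPathSketch {
  final String uploadPath; // client-side path, e.g. "0a/snow_1.parquet"
  final String fileRegistrationPath; // server-side path, e.g. "vol/.../figsId/0a/snow_1.parquet"

  BlobPathSketch(String uploadPath, String fileRegistrationPath) {
    this.uploadPath = uploadPath;
    this.fileRegistrationPath = fileRegistrationPath;
  }

  static BlobPathSketch fromLocationInfoPath(String serverPath, String twoHexChars, long counter) {
    int lastSlash = serverPath.lastIndexOf('/');
    // The tests require at least four directory segments before the file-name prefix.
    if (lastSlash < 0 || serverPath.substring(0, lastSlash).split("/").length < 4) {
      throw new IllegalArgumentException("File path returned by server is invalid.");
    }
    String dirPrefix = serverPath.substring(0, lastSlash); // "vol/table/data/streaming_ingest/figsId"
    String fileName = serverPath.substring(lastSlash + 1) + counter + ".parquet"; // "snow_1.parquet"
    String uploadPath = twoHexChars + "/" + fileName;
    return new BlobPathSketch(uploadPath, dirPrefix + "/" + uploadPath);
  }
}

Run against the mock's "path" value, this yields uploadPath "0a/snow_volHash_figsId_1_1_1.parquet" (for twoHexChars "0a" and counter 1) and a registration path that re-attaches the directory prefix, which is what testGenerateBlobPathInternals checks.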
stageLocationMap.put("path", "table/data/streaming_ingest/figsId/snow_volHash_figsId_1_1_"); stageLocationMap.put("creds", credsMap); stageLocationMap.put("region", "test_region"); stageLocationMap.put("endPoint", "test_endpoint"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java deleted file mode 100644 index afc647c54..000000000 --- a/src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import static org.junit.Assert.*; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import net.snowflake.ingest.utils.SFException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class PresignedUrlPresignedUrlExternalVolumeManagerTest { - private static final ObjectMapper objectMapper = new ObjectMapper(); - private PresignedUrlExternalVolumeManager manager; - private FileLocationInfo fileLocationInfo; - private ExecutorService executorService; - - @Before - public void setup() throws JsonProcessingException { - this.manager = - new PresignedUrlExternalVolumeManager( - false /* isTestMode */, "role", "clientName", MockSnowflakeServiceClient.create()); - - Map fileLocationInfoMap = MockSnowflakeServiceClient.getStageLocationMap(); - fileLocationInfoMap.put("isClientSideEncrypted", false); - String fileLocationInfoStr = objectMapper.writeValueAsString(fileLocationInfoMap); - this.fileLocationInfo = objectMapper.readValue(fileLocationInfoStr, FileLocationInfo.class); - } - - @After - public void teardown() { - if (executorService != null) { - executorService.shutdownNow(); - } - } - - @Test - public void testRegister() { - Exception ex = null; - - try { - this.manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); - } catch (Exception e) { - ex = e; - } - - assertNull(ex); - } - - @Test - public void testConcurrentRegisterTable() throws Exception { - int numThreads = 50; - int timeoutInSeconds = 30; - List> allResults = - doConcurrentTest( - numThreads, - timeoutInSeconds, - () -> manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo), - () -> manager.getStorage("db.schema.table")); - PresignedUrlExternalVolume extvol = manager.getStorage("db.schema.table"); - assertNotNull(extvol); - for (int i = 0; i < numThreads; i++) { - assertSame("" + i, extvol, allResults.get(i).get(timeoutInSeconds, TimeUnit.SECONDS)); - } - } - - @Test - public void testGetStorage() { - this.manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); - PresignedUrlExternalVolume extvol = this.manager.getStorage("db.schema.table"); - assertNotNull(extvol); - } - - @Test - public void testGetStorageWithoutRegister() { - SFException ex = null; - try { - 
manager.getStorage("db.schema.table"); - } catch (SFException e) { - ex = e; - } - - assertNotNull(ex); - assertTrue(ex.getVendorCode().equals("0001")); - assertTrue(ex.getMessage().contains("No external volume found for tableRef=db.schema.table")); - } - - @Test - public void testGenerateBlobPath() { - manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); - BlobPath blobPath = manager.generateBlobPath("db.schema.table"); - assertNotNull(blobPath); - assertEquals(blobPath.fileRegistrationPath, "f1"); - assertEquals(blobPath.uploadPath, "http://f1.com?token=t1"); - } - - @Test - public void testConcurrentGenerateBlobPath() throws Exception { - int numThreads = 50; - int timeoutInSeconds = 60; - manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); - - List> allResults = - doConcurrentTest( - numThreads, - timeoutInSeconds, - () -> { - for (int i = 0; i < 1000; i++) { - manager.generateBlobPath("db.schema.table"); - } - }, - () -> manager.generateBlobPath("db.schema.table")); - for (int i = 0; i < numThreads; i++) { - BlobPath blobPath = allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS); - assertNotNull(blobPath); - assertTrue(blobPath.uploadPath, blobPath.uploadPath.contains("http://f1.com?token=t")); - } - } - - private List> doConcurrentTest( - int numThreads, int timeoutInSeconds, Runnable action, Supplier getResult) - throws Exception { - assertNull(executorService); - - executorService = - new ThreadPoolExecutor( - numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); - List> tasks = new ArrayList<>(); - final CyclicBarrier startBarrier = new CyclicBarrier(numThreads); - final CyclicBarrier endBarrier = new CyclicBarrier(numThreads); - for (int i = 0; i < numThreads; i++) { - tasks.add( - () -> { - startBarrier.await(timeoutInSeconds, TimeUnit.SECONDS); - action.run(); - endBarrier.await(); - return getResult.get(); - }); - } - - List> allResults = executorService.invokeAll(tasks); - allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS); - return allResults; - } - - @Test - public void testGetClientPrefix() { - assertEquals(manager.getClientPrefix(), "test_prefix_123"); - } -} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManagerTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManagerTest.java new file mode 100644 index 000000000..898bf67e8 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManagerTest.java @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. 
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import static net.snowflake.ingest.streaming.internal.SubscopedTokenExternalVolumeManager.generateBlobPathFromLocationInfoPath;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import net.snowflake.ingest.utils.SFException;
+import net.snowflake.ingest.utils.Utils;
+import org.apache.commons.lang3.function.TriConsumer;
+import org.assertj.core.api.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SubscopedTokenExternalVolumeManagerTest {
+  private SubscopedTokenExternalVolumeManager manager;
+  private ExecutorService executorService;
+
+  @Before
+  public void setup() {
+    this.manager =
+        new SubscopedTokenExternalVolumeManager(
+            "role", "clientName", MockSnowflakeServiceClient.create());
+  }
+
+  @After
+  public void teardown() {
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testRegister() {
+    assertThatCode(() -> this.manager.registerTable(new TableRef("db", "schema", "table")))
+        .doesNotThrowAnyException();
+  }
+
+  @Test
+  public void testConcurrentRegisterTable() throws Exception {
+    int numThreads = 50;
+    int timeoutInSeconds = 30;
+    List<Future<InternalStage>> allResults =
+        doConcurrentTest(
+            numThreads,
+            timeoutInSeconds,
+            () -> manager.registerTable(new TableRef("db", "schema", "table")),
+            () -> manager.getStorage("db.schema.table"));
+    InternalStage extvol = manager.getStorage("db.schema.table");
+    assertThat(extvol).isNotNull();
+    for (int i = 0; i < numThreads; i++) {
+      assertThat(allResults.get(i).get(timeoutInSeconds, TimeUnit.SECONDS))
+          .as("getStorage from threadId=%d", i)
+          .isSameAs(extvol);
+    }
+  }
+
+  @Test
+  public void testGetStorage() {
+    this.manager.registerTable(new TableRef("db", "schema", "table"));
+    InternalStage extvol = this.manager.getStorage("db.schema.table");
+    assertThat(extvol).isNotNull();
+  }
+
+  @Test
+  public void testGetStorageWithoutRegister() {
+    assertThatExceptionOfType(SFException.class)
+        .isThrownBy(() -> manager.getStorage("db.schema.table"))
+        .withMessageEndingWith("No external volume found for tableRef=db.schema.table.")
+        .has(
+            new Condition<SFException>(
+                ex -> ex.getVendorCode().equals("0001"), "vendor code 0001"));
+  }
+
+  @Test
+  public void testGenerateBlobPathPublic() {
+    manager.registerTable(new TableRef("db", "schema", "table"));
+    BlobPath blobPath = manager.generateBlobPath("db.schema.table");
+    assertThat(blobPath).isNotNull();
+    assertThat(blobPath.uploadPath).isNotNull().endsWith("/snow_volHash_figsId_1_1_0.parquet");
+
+    assertThat(blobPath.fileRegistrationPath)
+        .isNotNull()
+        .isEqualTo("table/data/streaming_ingest/figsId/" + blobPath.uploadPath);
+  }
+
+  @Test
+  public void testGenerateBlobPathInternals() {
+    assertThatThrownBy(() -> generateBlobPathFromLocationInfoPath("db.sch.tbl1", "snow_", "0a", 1))
+        .hasMessageEndingWith("File path returned by server is invalid.");
+    assertThatThrownBy(
+            () -> generateBlobPathFromLocationInfoPath("db.sch.tbl1", "1/snow_", "0a", 1))
+        .hasMessageEndingWith("File path returned by server is invalid.");
+    assertThatThrownBy(
+            () -> generateBlobPathFromLocationInfoPath("db.sch.tbl1", "1/2/snow_", "0a", 1))
+        .hasMessageEndingWith("File path returned by server is invalid.");
+
+    AtomicInteger counter = new AtomicInteger(1);
+    TriConsumer<String, String, String> test =
+        (String testPath, String pathPrefix, String fileName) -> {
+          String twoHexChars = Utils.getTwoHexChars();
+          BlobPath blobPath =
+              generateBlobPathFromLocationInfoPath(
+                  "db.sch.tbl1", testPath, twoHexChars, counter.getAndIncrement());
+          assertThat(blobPath).isNotNull();
+          assertThat(blobPath.uploadPath)
+              .isNotNull()
+              .isEqualTo(String.format("%s/%s", twoHexChars, fileName));
+          assertThat(blobPath.fileRegistrationPath)
+              .isNotNull()
+              .isEqualTo(String.format("%s/%s/%s", pathPrefix, twoHexChars, fileName));
+        };
+
+    // happy path
+    test.accept(
+        "vol/table/data/streaming_ingest/figsId/snow_",
+        "vol/table/data/streaming_ingest/figsId",
+        "snow_1.parquet");
+
+    // vol has extra subfolders
+    test.accept(
+        "vol/vol2/vol3/table/data/streaming_ingest/figsId/snow_",
+        "vol/vol2/vol3/table/data/streaming_ingest/figsId",
+        "snow_2.parquet");
+
+    // table has extra subfolders
+    test.accept(
+        "vol/table/t2/t3/data/streaming_ingest/figsId/snow_",
+        "vol/table/t2/t3/data/streaming_ingest/figsId",
+        "snow_3.parquet");
+
+    // no table
+    test.accept(
+        "vol/data/streaming_ingest/figsId/snow_",
+        "vol/data/streaming_ingest/figsId",
+        "snow_4.parquet");
+  }
+
+  @Test
+  public void testConcurrentGenerateBlobPath() throws Exception {
+    int numThreads = 50;
+    int timeoutInSeconds = 60;
+    manager.registerTable(new TableRef("db", "schema", "table"));
+
+    List<Future<BlobPath>> allResults =
+        doConcurrentTest(
+            numThreads,
+            timeoutInSeconds,
+            () -> {
+              for (int i = 0; i < 1000; i++) {
+                manager.generateBlobPath("db.schema.table");
+              }
+            },
+            () -> manager.generateBlobPath("db.schema.table"));
+    for (int i = 0; i < numThreads; i++) {
+      BlobPath blobPath = allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS);
+      assertThat(blobPath).isNotNull();
+      assertThat(blobPath.uploadPath).contains("/snow_volHash_figsId_1_1_");
+    }
+  }
+
+  private <T> List<Future<T>> doConcurrentTest(
+      int numThreads, int timeoutInSeconds, Runnable action, Supplier<T> getResult)
+      throws Exception {
+    assertThat(executorService).isNull();
+
+    executorService =
+        new ThreadPoolExecutor(
+            numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+    List<Callable<T>> tasks = new ArrayList<>();
+    final CyclicBarrier startBarrier = new CyclicBarrier(numThreads);
+    final CyclicBarrier endBarrier = new CyclicBarrier(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      tasks.add(
+          () -> {
+            startBarrier.await(timeoutInSeconds, TimeUnit.SECONDS);
+            action.run();
+            endBarrier.await();
+            return getResult.get();
+          });
+    }
+
+    List<Future<T>> allResults = executorService.invokeAll(tasks);
+    allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS);
+    return allResults;
+  }
+
+  @Test
+  public void testGetClientPrefix() {
+    assertThat(manager.getClientPrefix()).isEqualTo("test_prefix_123");
+  }
+}
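Taken together, the new tests reduce to the lifecycle below. A condensed sketch assuming package-private access (same package as the classes in this diff) and the mock client above; the class name is hypothetical, but every call shown appears in the tests:

// Condensed from the tests above; all calls shown appear in this diff.
package net.snowflake.ingest.streaming.internal;

public class SubscopedTokenExternalVolumeManagerExample {
  public static void main(String[] args) {
    SubscopedTokenExternalVolumeManager manager =
        new SubscopedTokenExternalVolumeManager(
            "role", "clientName", MockSnowflakeServiceClient.create());

    // One registration per table: fetches subscoped tokens and location info up front.
    manager.registerTable(new TableRef("db", "schema", "table"));

    // Per-table storage is now an InternalStage rather than a presigned-URL volume.
    InternalStage stage = manager.getStorage("db.schema.table");

    // Each call yields a fresh counter-based path under a two-hex-char subfolder.
    BlobPath blobPath = manager.generateBlobPath("db.schema.table");
    // blobPath.uploadPath            e.g. "<two hex chars>/snow_volHash_figsId_1_1_0.parquet"
    // blobPath.fileRegistrationPath  "table/data/streaming_ingest/figsId/" + blobPath.uploadPath
  }
}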