diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
index cb5affd8e..2ee9f9df0 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
@@ -26,7 +26,7 @@ interface IStorageManager {
   IStorage getStorage(String fullyQualifiedTableName);
 
   /** Informs the storage manager about a new table that's being ingested into by the client. */
-  void registerTable(TableRef tableRef, FileLocationInfo locationInfo);
+  void registerTable(TableRef tableRef);
 
   /**
    * Generate a unique blob path and increment the blob sequencer
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
index e1192ba6c..1dc65cd09 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java
@@ -97,8 +97,7 @@ class InternalStage implements IStorage {
         (SnowflakeFileTransferMetadataWithAge) null,
         maxUploadRetries);
     Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix);
-    this.fileLocationInfo = fileLocationInfo;
-    this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
+    setFileLocationInfo(fileLocationInfo);
   }
 
   /**
@@ -214,8 +213,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
    */
   synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boolean force)
       throws SnowflakeSQLException, IOException {
-    logger.logInfo("Refresh Snowflake metadata, client={} force={}", clientName, force);
-
     if (!force
         && fileTransferMetadataWithAge != null
         && fileTransferMetadataWithAge.timestamp.isPresent()
@@ -224,12 +221,19 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole
       return fileTransferMetadataWithAge;
     }
 
+    logger.logInfo(
+        "Refresh Snowflake metadata, client={} force={} tableRef={}", clientName, force, tableRef);
+
     FileLocationInfo location =
         this.owningManager.getRefreshedLocation(this.tableRef, Optional.empty());
-    SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location);
-    this.fileLocationInfo = location;
-    this.fileTransferMetadataWithAge = metadata;
-    return metadata;
+    setFileLocationInfo(location);
+    return this.fileTransferMetadataWithAge;
+  }
+
+  private synchronized void setFileLocationInfo(FileLocationInfo fileLocationInfo)
+      throws SnowflakeSQLException, IOException {
+    this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
+    this.fileLocationInfo = fileLocationInfo;
   }
 
   static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
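The net effect of the InternalStage changes above: both the constructor and refreshSnowflakeMetadata() now funnel through one private setFileLocationInfo(), so fileLocationInfo and fileTransferMetadataWithAge are always updated together. A minimal sketch of this cache-and-refresh shape, using placeholder types (Metadata, fetchFromServer) rather than the SDK's real ones:

import java.io.IOException;

class CachedLocationMetadata {
  static class Metadata {} // placeholder for the real transfer metadata type

  private Metadata metadata;
  private long refreshedAtMillis;

  // Mirrors refreshSnowflakeMetadata(force): serve the cached value while it is
  // fresh, otherwise fetch and update both fields through the single setter.
  synchronized Metadata refresh(boolean force, long maxAgeMillis) throws IOException {
    if (!force
        && metadata != null
        && System.currentTimeMillis() - refreshedAtMillis < maxAgeMillis) {
      return metadata; // still fresh, no server round trip
    }
    set(fetchFromServer());
    return metadata;
  }

  private synchronized void set(Metadata m) {
    this.metadata = m; // single write point keeps the paired fields consistent
    this.refreshedAtMillis = System.currentTimeMillis();
  }

  private Metadata fetchFromServer() throws IOException {
    return new Metadata(); // stand-in for the getRefreshedLocation() RPC
  }
}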
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
index e7014274f..b1a5e7665 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
@@ -116,7 +116,7 @@ public InternalStage getStorage(String fullyQualifiedTableName) {
    * nothing as there's no per-table state yet for FDN tables (that use internal stages).
    */
   @Override
-  public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {}
+  public void registerTable(TableRef tableRef) {}
 
   /**
    * Gets the latest file location info (with a renewed short-lived access token) for the specified
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java
deleted file mode 100644
index ed8e2891f..000000000
--- a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java
+++ /dev/null
@@ -1,442 +0,0 @@
-package net.snowflake.ingest.streaming.internal;
-
-import static net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse.PresignedUrlInfo;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import net.snowflake.client.core.ExecTimeTelemetryData;
-import net.snowflake.client.core.HttpClientSettingsKey;
-import net.snowflake.client.core.OCSPMode;
-import net.snowflake.client.jdbc.RestRequest;
-import net.snowflake.client.jdbc.SnowflakeSQLException;
-import net.snowflake.client.jdbc.SnowflakeUtil;
-import net.snowflake.client.jdbc.cloud.storage.SnowflakeStorageClient;
-import net.snowflake.client.jdbc.cloud.storage.StageInfo;
-import net.snowflake.client.jdbc.cloud.storage.StorageClientFactory;
-import net.snowflake.client.jdbc.internal.apache.http.client.HttpResponseException;
-import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse;
-import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPut;
-import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
-import net.snowflake.client.jdbc.internal.apache.http.entity.ByteArrayEntity;
-import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
-import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils;
-import net.snowflake.client.jdbc.internal.google.api.client.http.HttpStatusCodes;
-import net.snowflake.ingest.connection.IngestResponseException;
-import net.snowflake.ingest.utils.ErrorCode;
-import net.snowflake.ingest.utils.HttpUtil;
-import net.snowflake.ingest.utils.Logging;
-import net.snowflake.ingest.utils.SFException;
-
-/** Handles uploading files to the Iceberg Table's external volume's table data path */
-class PresignedUrlExternalVolume implements IStorage {
-  // TODO everywhere: static final should be named in all capitals
-  private static final Logging logger = new Logging(PresignedUrlExternalVolume.class);
-  private static final int DEFAULT_PRESIGNED_URL_COUNT = 10;
-  private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900;
-
-  // Allowing concurrent generate URL requests is a weak form of adapting to high throughput
-  // clients. The low watermark ideally should be adaptive too for such clients,will wait for perf
-  // runs to show its necessary.
-  private static final int MAX_CONCURRENT_GENERATE_URLS_REQUESTS = 10;
-  private static final int LOW_WATERMARK_FOR_EARLY_REFRESH = 5;
-
-  private final String clientName;
-  private final String clientPrefix;
-  private final Long deploymentId;
-  private final String role;
-
-  // The db name, schema name and table name for this storage location
-  private final TableRef tableRef;
-
-  // The RPC client for snowflake cloud service
-  private final SnowflakeServiceClient serviceClient;
-
-  // semaphore to limit how many RPCs go out for one location concurrently
-  private final Semaphore generateUrlsSemaphore;
-
-  // thread-safe queue of unused URLs, to be disbursed whenever flush codepath is cutting the next
-  // file
-  private final ConcurrentLinkedQueue<PresignedUrlInfo> presignedUrlInfos;
-
-  // sometimes-stale counter of how many URLs are remaining, to avoid calling presignedUrls.size()
-  // and increasing lock contention / volatile reads on the internal data structures inside
-  // ConcurrentLinkedQueue
-  private final AtomicInteger numUrlsInQueue;
-
-  private final FileLocationInfo locationInfo;
-  private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata;
-
-  PresignedUrlExternalVolume(
-      String clientName,
-      String clientPrefix,
-      Long deploymentId,
-      String role,
-      TableRef tableRef,
-      FileLocationInfo locationInfo,
-      SnowflakeServiceClient serviceClient) {
-    this.clientName = clientName;
-    this.clientPrefix = clientPrefix;
-    this.deploymentId = deploymentId;
-    this.role = role;
-    this.tableRef = tableRef;
-    this.serviceClient = serviceClient;
-    this.locationInfo = locationInfo;
-    this.presignedUrlInfos = new ConcurrentLinkedQueue<>();
-    this.numUrlsInQueue = new AtomicInteger(0);
-    this.generateUrlsSemaphore = new Semaphore(MAX_CONCURRENT_GENERATE_URLS_REQUESTS);
-
-    if (this.locationInfo.getIsClientSideEncrypted()) {
-      throw new SFException(
-          ErrorCode.INTERNAL_ERROR,
-          "Cannot ingest into an external volume that requests client side encryption");
-    }
-    if ("S3".equalsIgnoreCase(this.locationInfo.getLocationType())) {
-      // add dummy values so that JDBC's S3 client creation doesn't barf on initialization.
-      this.locationInfo.getCredentials().put("AWS_KEY_ID", "key");
-      this.locationInfo.getCredentials().put("AWS_SECRET_KEY", "secret");
-    }
-
-    try {
-      this.fileTransferMetadata =
-          InternalStage.createFileTransferMetadataWithAge(this.locationInfo);
-    } catch (JsonProcessingException
-        | SnowflakeSQLException
-        | net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException e) {
-      throw new SFException(e, ErrorCode.INTERNAL_ERROR);
-    }
-
-    // the caller is just setting up this object and not expecting a URL to be returned (unlike in
-    // dequeueUrlInfo), thus waitUntilAcquired=false.
-    generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, false /* waitUntilAcquired */);
-  }
-
-  // TODO : Add timing ; add logging ; add retries ; add http exception handling better than
-  // client.handleEx?
-  @Override
-  public void put(BlobPath blobPath, byte[] blob) {
-    if (this.fileTransferMetadata.isLocalFS) {
-      InternalStage.putLocal(
-          this.fileTransferMetadata.localLocation, blobPath.fileRegistrationPath, blob);
-      return;
-    }
-
-    try {
-      putRemote(blobPath.uploadPath, blob);
-    } catch (Throwable e) {
-      throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
-    }
-  }
-
-  private void putRemote(String blobPath, byte[] blob)
-      throws SnowflakeSQLException, URISyntaxException, IOException {
-    // TODO: Add a backlog item for somehow doing multipart upload with presigned URLs (each part
-    // has its own URL) for large files
-
-    // already verified that client side encryption is disabled, in the ctor's call to generateUrls
-    final Properties proxyProperties = HttpUtil.generateProxyPropertiesForJDBC();
-    final HttpClientSettingsKey key =
-        SnowflakeUtil.convertProxyPropertiesToHttpClientKey(OCSPMode.FAIL_OPEN, proxyProperties);
-
-    StageInfo stageInfo = fileTransferMetadata.fileTransferMetadata.getStageInfo();
-    SnowflakeStorageClient client =
-        StorageClientFactory.getFactory().createClient(stageInfo, 1, null, null);
-
-    URIBuilder uriBuilder = new URIBuilder(blobPath);
-    HttpPut httpRequest = new HttpPut(uriBuilder.build());
-    httpRequest.setEntity(new ByteArrayEntity(blob));
-
-    addHeadersToHttpRequest(httpRequest, blob, stageInfo, client);
-
-    if (stageInfo.getStageType().equals(StageInfo.StageType.AZURE)) {
-      throw new SFException(
-          ErrorCode.INTERNAL_ERROR, "Azure based external volumes are not yet supported.");
-
-      /* commenting out unverified code, will fix this when server changes are in preprod / some test deplo
-      URI storageEndpoint =
-          new URI(
-              "https",
-              stageInfo.getStorageAccount() + "." + stageInfo.getEndPoint() + "/",
-              null,
-              null);
-      String sasToken = blobPath.substring(blobPath.indexOf("?"));
-      StorageCredentials azCreds = new StorageCredentialsSharedAccessSignature(sasToken);
-      CloudBlobClient azClient = new CloudBlobClient(storageEndpoint, azCreds);
-
-      CloudBlobContainer container = azClient.getContainerReference(stageInfo.getLocation().substring(0, stageInfo.getLocation().indexOf("/")));
-      CloudBlockBlob azBlob = container.getBlockBlobReference();
-      azBlob.setMetadata((HashMap<String, String>) meta.getUserMetadata());
-
-      OperationContext opContext = new OperationContext();
-      net.snowflake.client.core.HttpUtil.setSessionlessProxyForAzure(proxyProperties, opContext);
-
-      BlobRequestOptions options = new BlobRequestOptions();
-
-      try {
-        azBlob.uploadFromByteArray(blob, 0, blob.length, null, options, opContext);
-      } catch (Exception ex) {
-        ((SnowflakeAzureClient) client).handleStorageException(ex, 0, "upload", null, null, null);
-      }
-      */
-    }
-
-    CloseableHttpClient httpClient = net.snowflake.client.core.HttpUtil.getHttpClient(key);
-    CloseableHttpResponse response =
-        RestRequest.execute(
-            httpClient,
-            httpRequest,
-            0, // retry timeout
-            0, // auth timeout
-            (int)
-                net.snowflake.client.core.HttpUtil.getSocketTimeout()
-                    .toMillis(), // socket timeout in ms
-            1, // max retries
-            0, // no socket timeout injection
-            null, // no canceling signaler, TODO: wire up thread interrupt with setting this
-            // AtomicBoolean to avoid retries/sleeps
-            false, // no cookie
-            false, // no url retry query parameters
-            false, // no request_guid
-            true, // retry on HTTP 403
-            true, // no retry
-            new ExecTimeTelemetryData());
-
-    int statusCode = response.getStatusLine().getStatusCode();
-    if (!HttpStatusCodes.isSuccess(statusCode)) {
-      Exception ex =
-          new HttpResponseException(
-              response.getStatusLine().getStatusCode(),
-              String.format(
-                  "%s, body: %s",
-                  response.getStatusLine().getReasonPhrase(),
-                  EntityUtils.toString(response.getEntity())));
-
-      client.handleStorageException(ex, 0, "upload", null, null, null);
-    }
-  }
-
-  private void addHeadersToHttpRequest(
-      HttpPut httpRequest, byte[] blob, StageInfo stageInfo, SnowflakeStorageClient client) {
-    // no need to set this as it causes a Content-length header is already set error in apache's
-    // http client.
-    // httpRequest.setHeader("Content-Length", "" + blob.length);
-
-    // TODO: These custom headers need to be a part of the presigned URL HMAC computation in S3,
-    // we'll disable them for now until we can do presigned URL generation AFTER we have the digest.
-
-    /*
-    final String clientKey = this.clientPrefix;
-    final String clientName = this.clientName;
-
-    final byte[] digestBytes;
-    try {
-      digestBytes = MessageDigest.getInstance("SHA-256").digest(blob);
-    } catch (NoSuchAlgorithmException e) {
-      throw new SFException(e, ErrorCode.INTERNAL_ERROR);
-    }
-
-    final String digest = Base64.getEncoder().encodeToString(digestBytes);
-
-    StorageObjectMetadata meta = StorageClientFactory.getFactory().createStorageMetadataObj(stageInfo.getStageType());
-
-    client.addDigestMetadata(meta, digest);
-    client.addStreamingIngestMetadata(meta, clientName, clientKey);
-
-    switch (stageInfo.getStageType()) {
-      case S3:
-        httpRequest.setHeader("x-amz-server-side-encryption", ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
-        httpRequest.setHeader("x-amz-checksum-sha256", digest); // TODO why does JDBC use a custom x–amz-meta-sfc-digest header for this
-        for (Map.Entry<String, String> entry : meta.getUserMetadata().entrySet()) {
-          httpRequest.setHeader("x-amz-meta-" + entry.getKey(), entry.getValue());
-        }
-
-        break;
-
-      case AZURE:
-        for (Map.Entry<String, String> entry : meta.getUserMetadata().entrySet()) {
-          httpRequest.setHeader("x-ms-meta-" + entry.getKey(), entry.getValue());
-        }
-        break;
-
-      case GCS:
-        for (Map.Entry<String, String> entry : meta.getUserMetadata().entrySet()) {
-          httpRequest.setHeader("x-goog-meta-" + entry.getKey(), entry.getValue());
-        }
-        break;
-    }
-    */
-  }
-
-  PresignedUrlInfo dequeueUrlInfo() {
-    // use a 60 second buffer in case the executor service is backed up / serialization takes time /
-    // upload runs slow / etc.
-    long validUntilAtleastTimestamp = System.currentTimeMillis() + 60 * 1000;
-
-    // TODO: Wire in a checkStop to get out of this infinite loop.
-    while (true) {
-      PresignedUrlInfo info = this.presignedUrlInfos.poll();
-      if (info == null) {
-        // if the queue is empty, trigger a url refresh AND wait for it to complete.
-        // loop around when done to try and ready from the queue again.
-        generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, true /* waitUntilAcquired */);
-        continue;
-      }
-
-      // we dequeued a url, do the appropriate bookkeeping.
-      int remainingUrlsInQueue = this.numUrlsInQueue.decrementAndGet();
-
-      if (info.validUntilTimestamp < validUntilAtleastTimestamp) {
-        // This url can expire by the time it gets used, loop around and dequeue another URL.
-        continue;
-      }
-
-      // if we're nearing url exhaustion, go fetch another batch. Don't wait for the response as we
-      // already have a valid URL to be used by the current caller of dequeueUrlInfo.
-      if (remainingUrlsInQueue <= LOW_WATERMARK_FOR_EARLY_REFRESH) {
-        // TODO: do this generation on a background thread to allow the current thread to make
-        // progress ? Will wait for perf runs to know this is an issue that needs addressal.
-        generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, false /* waitUntilAcquired */);
-      }
-
-      return info;
-    }
-  }
-
-  // NOTE : We are intentionally NOT re-enqueuing unused URLs here as that can cause correctness
-  // issues by accidentally enqueuing a URL that was actually used to write data out. Its okay to
-  // allow an unused URL to go waste as we'll just go out and generate new URLs.
-  // Do NOT add an enqueueUrl() method for this reason.
-
-  /**
-   * Fetches new presigned URLs from snowflake.
-   *
-   * @param minCountToSkipGeneration Skip the RPC if we already have this many URLs in the queue
-   * @param waitUntilAcquired when true, make the current thread block on having enough URLs in the
-   *     queue
-   */
-  private void generateUrls(int minCountToSkipGeneration, boolean waitUntilAcquired) {
-    int numAcquireAttempts = 0;
-    boolean acquired = false;
-
-    while (!acquired && numAcquireAttempts++ < 300) {
-      // Use an aggressive timeout value as its possible that the other requests finished and added
-      // enough URLs to the queue. If we use a higher timeout value, this calling thread's flush is
-      // going to unnecessarily be blocked when URLs have already been added to the queue.
-      try {
-        // If waitUntilAcquired is false, the caller is not interested in waiting for the results
-        // The semaphore being already "full" implies there are many another requests in flight
-        // and we can just early exit to the caller.
-        int timeoutInSeconds = waitUntilAcquired ? 1 : 0;
-        acquired = this.generateUrlsSemaphore.tryAcquire(timeoutInSeconds, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        // if the thread was interrupted there's nothing we can do about it, definitely shouldn't
-        // continue processing.
-
-        // reset the interrupted flag on the thread in case someone in the callstack wants to
-        // gracefully continue processing.
-        boolean interrupted = Thread.interrupted();
-
-        String message =
-            String.format(
-                "Semaphore acquisition in ExternalVolume.generateUrls was interrupted, likely"
-                    + " because the process is shutting down. TableRef=%s Thread.interrupted=%s",
-                tableRef, interrupted);
-        logger.logError(message);
-        throw new SFException(ErrorCode.INTERNAL_ERROR, message);
-      }
-
-      // In case Acquire took time because no permits were available, it implies we already had N
-      // other threads fetching more URLs. In that case we should be content with what's in the
-      // buffer instead of doing another RPC potentially unnecessarily.
-      if (this.numUrlsInQueue.get() >= minCountToSkipGeneration) {
-        // release the semaphore before early-exiting to avoid a leak in semaphore permits.
-        if (acquired) {
-          this.generateUrlsSemaphore.release();
-        }
-
-        return;
-      }
-
-      // If we couldn't acquire the semaphore, and the caller was doing an optimistic generateUrls
-      // but does NOT want to wait around for a successful generatePresignedUrlsResponse, then
-      // early exit and allow the caller to move on.
-      if (!acquired && !waitUntilAcquired) {
-        logger.logDebug(
-            "Skipping generateUrls because semaphore acquisition failed AND waitUntilAcquired =="
-                + " false.");
-        return;
-      }
-    }
-
-    // if we're here without acquiring, that implies the numAcquireAttempts went over 300. We're at
-    // an impasse and so there's nothing more to be done except error out, as that gives the client
-    // a chance to
-    // restart.
-    if (!acquired) {
-      String message =
-          String.format("Could not acquire semaphore to generate URLs. TableRef=%s", tableRef);
-      logger.logError(message);
-      throw new SFException(ErrorCode.INTERNAL_ERROR, message);
-    }
-
-    // we have acquired a semaphore permit at this point, must release before returning
-
-    try {
-      long currentTimestamp = System.currentTimeMillis();
-      long validUntilTimestamp =
-          currentTimestamp + (DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS * 1000);
-      GeneratePresignedUrlsResponse response =
-          doGenerateUrls(DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS);
-      List<PresignedUrlInfo> urlInfos = response.getPresignedUrlInfos();
-      urlInfos =
-          urlInfos.stream()
-              .map(
-                  info -> {
-                    info.validUntilTimestamp = validUntilTimestamp;
-                    return info;
-                  })
-              .filter(
-                  info -> {
-                    if (info == null
-                        || info.url == null
-                        || info.fileName == null
-                        || info.url.isEmpty()) {
-                      logger.logError(
-                          "Received unexpected null or empty URL in externalVolume.generateUrls"
                              + " tableRef=%s",
-                          this.tableRef);
-                      return false;
-                    }
-
-                    return true;
-                  })
-              .collect(Collectors.toList());
-
-      // these are both thread-safe operations individually, and there is no need to do them inside
-      // a lock. For an infinitesimal time the numUrlsInQueue will under represent the number of
-      // entries in the queue.
-      this.presignedUrlInfos.addAll(urlInfos);
-      this.numUrlsInQueue.addAndGet(urlInfos.size());
-    } finally {
-      this.generateUrlsSemaphore.release();
-    }
-  }
-
-  private GeneratePresignedUrlsResponse doGenerateUrls(int timeoutInSeconds) {
-    try {
-      return this.serviceClient.generatePresignedUrls(
-          new GeneratePresignedUrlsRequest(
-              tableRef, role, DEFAULT_PRESIGNED_URL_COUNT, timeoutInSeconds, deploymentId, true));
-
-    } catch (IngestResponseException | IOException e) {
-      throw new SFException(e, ErrorCode.GENERATE_PRESIGNED_URLS_FAILURE, e.getMessage());
-    }
-  }
-}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java
deleted file mode 100644
index 6ce3cb555..000000000
--- a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
- */
-
-package net.snowflake.ingest.streaming.internal;
-
-import static net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse.PresignedUrlInfo;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import net.snowflake.ingest.connection.IngestResponseException;
-import net.snowflake.ingest.utils.ErrorCode;
-import net.snowflake.ingest.utils.Logging;
-import net.snowflake.ingest.utils.SFException;
-
-/** Class to manage multiple external volumes */
-class PresignedUrlExternalVolumeManager implements IStorageManager {
-  // TODO: Rename all logger members to LOGGER and checkin code formatting rules
-  private static final Logging logger = new Logging(PresignedUrlExternalVolumeManager.class);
-
-  // Reference to the external volume per table
-  private final Map<String, PresignedUrlExternalVolume> externalVolumeMap;
-
-  // name of the owning client
-  private final String clientName;
-
-  private final String role;
-
-  // Reference to the Snowflake service client used for configure calls
-  private final SnowflakeServiceClient serviceClient;
-
-  // Client prefix generated by the Snowflake server
-  private final String clientPrefix;
-
-  // Deployment ID returned by the Snowflake server
-  private final Long deploymentId;
-
-  // concurrency control to avoid creating multiple ExternalVolume objects for the same table (if
-  // openChannel is called
-  // multiple times concurrently)
-  private final Object registerTableLock = new Object();
-
-  /**
-   * Constructor for ExternalVolumeManager
-   *
-   * @param isTestMode whether the manager in test mode
-   * @param role the role of the client
-   * @param clientName the name of the client
-   * @param snowflakeServiceClient the Snowflake service client used for configure calls
-   */
-  PresignedUrlExternalVolumeManager(
-      boolean isTestMode,
-      String role,
-      String clientName,
-      SnowflakeServiceClient snowflakeServiceClient) {
-    this.clientName = clientName;
-    this.role = role;
-    this.serviceClient = snowflakeServiceClient;
-    this.externalVolumeMap = new ConcurrentHashMap<>();
-    try {
-      ClientConfigureResponse response =
-          this.serviceClient.clientConfigure(new ClientConfigureRequest(role));
-      this.clientPrefix = isTestMode ? "testPrefix" : response.getClientPrefix();
-      this.deploymentId = response.getDeploymentId();
-    } catch (IngestResponseException | IOException e) {
-      throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
-    }
-    logger.logDebug(
-        "Created PresignedUrlExternalVolumeManager with clientName=%s and clientPrefix=%s",
-        clientName, clientPrefix);
-  }
-
-  /**
-   * Given a fully qualified table name, return the target storage by looking up the table name
-   *
-   * @param fullyQualifiedTableName the target fully qualified table name
-   * @return target storage
-   */
-  @Override
-  public PresignedUrlExternalVolume getStorage(String fullyQualifiedTableName) {
-    // Only one chunk per blob in Iceberg mode.
-    return getVolumeSafe(fullyQualifiedTableName);
-  }
-
-  /** Informs the storage manager about a new table that's being ingested into by the client. */
-  @Override
-  public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
-    if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
-      logger.logInfo(
-          "Skip registering table since its already been registered with the VolumeManager."
-              + " tableRef=%s",
-          tableRef);
-      return;
-    }
-
-    // future enhancement - per table locks instead of per-client lock
-    synchronized (registerTableLock) {
-      if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
-        logger.logInfo(
-            "Skip registering table since its already been registered with the VolumeManager."
-                + " tableRef=%s",
-            tableRef);
-        return;
-      }
-
-      try {
-        PresignedUrlExternalVolume externalVolume =
-            new PresignedUrlExternalVolume(
-                clientName,
-                getClientPrefix(),
-                deploymentId,
-                role,
-                tableRef,
-                locationInfo,
-                serviceClient);
-        this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume);
-      } catch (SFException ex) {
-        logger.logError(
-            "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex);
-        // allow external volume ctor's SFExceptions to bubble up directly
-        throw ex;
-      } catch (Exception err) {
-        logger.logError(
-            "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err);
-        throw new SFException(
-            err,
-            ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
-            String.format("fullyQualifiedTableName=%s", tableRef));
-      }
-    }
-  }
-
-  @Override
-  public BlobPath generateBlobPath(String fullyQualifiedTableName) {
-    PresignedUrlExternalVolume volume = getVolumeSafe(fullyQualifiedTableName);
-    PresignedUrlInfo urlInfo = volume.dequeueUrlInfo();
-    return new BlobPath(urlInfo.url /* uploadPath */, urlInfo.fileName /* fileRegistrationPath */);
-  }
-
-  /**
-   * Get the client prefix from first external volume in the map
-   *
-   * @return the client prefix
-   */
-  @Override
-  public String getClientPrefix() {
-    return this.clientPrefix;
-  }
-
-  private PresignedUrlExternalVolume getVolumeSafe(String fullyQualifiedTableName) {
-    PresignedUrlExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName);
-
-    if (volume == null) {
-      throw new SFException(
-          ErrorCode.INTERNAL_ERROR,
-          String.format("No external volume found for tableRef=%s", fullyQualifiedTableName));
-    }
-
-    return volume;
-  }
-
-  @Override
-  public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName) {
-    throw new SFException(ErrorCode.INTERNAL_ERROR, "Unsupported");
-  }
-}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
index 5729b9baf..e9282c3b4 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
@@ -239,7 +239,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea
     this.storageManager =
         isIcebergMode
             ? new SubscopedTokenExternalVolumeManager(
-                isTestMode, this.role, this.name, this.snowflakeServiceClient)
+                this.role, this.name, this.snowflakeServiceClient)
             : new InternalStageManager(
                 isTestMode, this.role, this.name, this.snowflakeServiceClient);
 
@@ -408,8 +408,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest
     this.channelCache.addChannel(channel);
 
     this.storageManager.registerTable(
-        new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()),
-        response.getIcebergLocationInfo());
+        new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()));
 
     return channel;
   }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java
index 2317a59f8..5977403c0 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java
@@ -4,6 +4,7 @@
 
 package net.snowflake.ingest.streaming.internal;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
@@ -13,6 +14,7 @@
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.SFException;
+import net.snowflake.ingest.utils.Utils;
 
 /** Class to manage multiple external volumes */
 class SubscopedTokenExternalVolumeManager implements IStorageManager {
@@ -45,16 +47,12 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager {
   /**
    * Constructor for ExternalVolumeManager
    *
-   * @param isTestMode whether the manager in test mode
   * @param role the role of the client
   * @param clientName the name of the client
   * @param snowflakeServiceClient the Snowflake service client used for configure calls
   */
  SubscopedTokenExternalVolumeManager(
-      boolean isTestMode,
-      String role,
-      String clientName,
-      SnowflakeServiceClient snowflakeServiceClient) {
+      String role, String clientName, SnowflakeServiceClient snowflakeServiceClient) {
    this.clientName = clientName;
    this.role = role;
    this.counter = new AtomicLong(0);
@@ -63,7 +61,7 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager {
    try {
      ClientConfigureResponse response =
          this.serviceClient.clientConfigure(new ClientConfigureRequest(role));
-      this.clientPrefix = isTestMode ? "testPrefix" : response.getClientPrefix();
+      this.clientPrefix = response.getClientPrefix();
      this.deploymentId = response.getDeploymentId();
    } catch (IngestResponseException | IOException e) {
      throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
@@ -87,7 +85,7 @@ public InternalStage getStorage(String fullyQualifiedTableName) {
 
   /** Informs the storage manager about a new table that's being ingested into by the client. */
   @Override
-  public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
+  public void registerTable(TableRef tableRef) {
     if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
       logger.logInfo(
           "Skip registering table since its already been registered with the VolumeManager."
@@ -106,6 +104,11 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
       return;
     }
 
+      // get the locationInfo only when we know this is the first register call for a given table,
+      // to reduce unnecessary load on token generation if a client opens up a hundred channels
+      // at the same time.
+      FileLocationInfo locationInfo = getRefreshedLocation(tableRef, Optional.empty());
+
      try {
        InternalStage externalVolume =
            new InternalStage(
@@ -136,16 +139,41 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
   public BlobPath generateBlobPath(String fullyQualifiedTableName) {
     InternalStage volume = getVolumeSafe(fullyQualifiedTableName);
 
-    // {nullableTableBasePath}/data/streaming_ingest/{figsId}/{twoHexChars}/snow_{volumeHash}_{figsId}_{workerRank}_1_
-    String filePathRelativeToVolume = volume.getFileLocationInfo().getPath();
+    // {nullableTableBasePath}/data/streaming_ingest/{figsId}/snow_{volumeHash}_{figsId}_{workerRank}_1_
+    return generateBlobPathFromLocationInfoPath(
+        fullyQualifiedTableName,
+        volume.getFileLocationInfo().getPath(),
+        Utils.getTwoHexChars(),
+        this.counter.getAndIncrement());
+  }
+
+  @VisibleForTesting
+  static BlobPath generateBlobPathFromLocationInfoPath(
+      String fullyQualifiedTableName,
+      String filePathRelativeToVolume,
+      String twoHexChars,
+      long counterValue) {
     String[] parts = filePathRelativeToVolume.split("/");
-    if (parts.length < 6) {
-      throw new SFException(ErrorCode.INTERNAL_ERROR, "File path returned by server is invalid.");
+    if (parts.length < 5) {
+      logger.logError(
+          "Invalid file path returned by server. Table=%s FilePathRelativeToVolume=%s",
+          fullyQualifiedTableName, filePathRelativeToVolume);
+      throw new SFException(ErrorCode.INTERNAL_ERROR, "File path returned by server is invalid");
     }
 
-    String suffix = this.counter.getAndIncrement() + ".parquet";
-
+    // add twoHexChars as a prefix to the fileName (the last part of fileLocationInfo.getPath)
     String fileNameRelativeToCredentialedPath = parts[parts.length - 1];
+    fileNameRelativeToCredentialedPath =
+        String.join("/", twoHexChars, fileNameRelativeToCredentialedPath);
+
+    // set this new fileName (with the prefix) back on the parts array so the full path can be
+    // reconstructed
+    parts[parts.length - 1] = fileNameRelativeToCredentialedPath;
+    filePathRelativeToVolume = String.join("/", parts);
+
+    // add a monotonically increasing counter at the end and the file extension
+    String suffix = counterValue + ".parquet";
+
     return new BlobPath(
         fileNameRelativeToCredentialedPath + suffix /* uploadPath */,
         filePathRelativeToVolume + suffix /* fileRegistrationPath */);
@@ -167,6 +195,20 @@ public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName)
       RefreshTableInformationResponse response =
           this.serviceClient.refreshTableInformation(
               new RefreshTableInformationRequest(tableRef, this.role, true));
+      logger.logDebug("Refreshed tokens for table=%s", tableRef);
+      if (response.getIcebergLocationInfo() == null) {
+        logger.logError(
+            "Did not receive location info, this will cause ingestion to grind to a halt."
+                + " TableRef=%s",
+            tableRef);
+      } else {
+        Map<String, String> creds = response.getIcebergLocationInfo().getCredentials();
+        if (creds == null || creds.isEmpty()) {
+          logger.logError(
+              "Did not receive creds in location info, this will cause ingestion to grind to a"
+                  + " halt. TableRef=%s",
+              tableRef);
+        }
+      }
+
       return response.getIcebergLocationInfo();
     } catch (IngestResponseException | IOException e) {
       throw new SFException(e, ErrorCode.REFRESH_TABLE_INFORMATION_FAILURE, e.getMessage());
diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java
index ca71a8233..002ce86a2 100644
--- a/src/main/java/net/snowflake/ingest/utils/Utils.java
+++ b/src/main/java/net/snowflake/ingest/utils/Utils.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
 import net.snowflake.client.core.SFSessionProperty;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.parquet.bytes.BytesUtils;
@@ -460,4 +461,14 @@ public static long getParquetFooterSize(byte[] bytes) throws IOException {
 
     return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset);
   }
+
+  public static String getTwoHexChars() {
+    String twoHexChars =
+        Integer.toHexString((ThreadLocalRandom.current().nextInt() & 0x7FFFFFFF) % 0x100);
+    if (twoHexChars.length() == 1) {
+      twoHexChars = "0" + twoHexChars;
+    }
+
+    return twoHexChars;
+  }
 }
diff --git a/src/test/java/net/snowflake/ingest/TestUtils.java b/src/test/java/net/snowflake/ingest/TestUtils.java
index 349a372e2..407cba0bc 100644
--- a/src/test/java/net/snowflake/ingest/TestUtils.java
+++ b/src/test/java/net/snowflake/ingest/TestUtils.java
@@ -115,9 +115,9 @@ private static void init() throws NoSuchAlgorithmException, InvalidKeySpecExcept
     account = profile.get(ACCOUNT).asText();
     port = profile.get(PORT).asInt();
     ssl = profile.get(SSL).asText();
-    database = profile.get(DATABASE).asText();
+    database = profile.get(DATABASE) == null ? null : profile.get(DATABASE).asText();
     connectString = profile.get(CONNECT_STRING).asText();
-    schema = profile.get(SCHEMA).asText();
+    schema = profile.get(SCHEMA) == null ? null : profile.get(SCHEMA).asText();
     warehouse = profile.get(WAREHOUSE).asText();
     host = profile.get(HOST).asText();
     scheme = profile.get(SCHEME).asText();
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index 9df3f1b11..86350e03d 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -100,7 +100,6 @@ private abstract static class TestContext implements AutoCloseable {
     FlushService flushService;
     IStorageManager storageManager;
     InternalStage storage;
-    PresignedUrlExternalVolume extVolume;
     ParameterProvider parameterProvider;
     RegisterService registerService;
 
@@ -108,7 +107,6 @@ private abstract static class TestContext implements AutoCloseable {
 
     TestContext() {
       storage = Mockito.mock(InternalStage.class);
-      extVolume = Mockito.mock(PresignedUrlExternalVolume.class);
       parameterProvider = new ParameterProvider(isIcebergMode);
       InternalParameterProvider internalParameterProvider =
           new InternalParameterProvider(isIcebergMode);
@@ -118,12 +116,10 @@ private abstract static class TestContext implements AutoCloseable {
       storageManager =
          Mockito.spy(
              isIcebergMode
-                  ? new PresignedUrlExternalVolumeManager(
-                      true, "role", "client", MockSnowflakeServiceClient.create())
+                  ? new SubscopedTokenExternalVolumeManager(
+                      "role", "client", MockSnowflakeServiceClient.create())
                  : new InternalStageManager(true, "role", "client", null));
-      Mockito.doReturn(isIcebergMode ? extVolume : storage)
-          .when(storageManager)
-          .getStorage(ArgumentMatchers.any());
+      Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any());
       Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix");
       Mockito.when(client.getParameterProvider())
          .thenAnswer((Answer<ParameterProvider>) (i) -> parameterProvider);
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java b/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java
index 33ccc3824..13e26fbdd 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java
@@ -131,6 +131,7 @@ public static CloseableHttpClient createHttpClient(ApiOverride apiOverride) {
           clientConfigresponseMap.put("deployment_id", 123L);
           return buildStreamingIngestResponse(
               HttpStatus.SC_OK, clientConfigresponseMap);
+
         case REFRESH_TABLE_INFORMATION_ENDPOINT:
           Thread.sleep(1);
           Map<String, Object> refreshTableInformationMap = new HashMap<>();
@@ -182,7 +183,6 @@ public static CloseableHttpClient createHttpClient(ApiOverride apiOverride) {
           openChannelResponseMap.put("table_columns", tableColumnsLists);
           openChannelResponseMap.put("encryption_key", "test_encryption_key");
           openChannelResponseMap.put("encryption_key_id", 123L);
-          openChannelResponseMap.put("iceberg_location", getStageLocationMap());
           return buildStreamingIngestResponse(
               HttpStatus.SC_OK, openChannelResponseMap);
         case DROP_CHANNEL_ENDPOINT:
@@ -268,8 +268,8 @@ public static Map<String, Object> getStageLocationMap() {
 
     Map<String, Object> stageLocationMap = new HashMap<>();
     stageLocationMap.put("locationType", "S3");
-    stageLocationMap.put("location", "test_location");
-    stageLocationMap.put("path", "test_path");
+    stageLocationMap.put("location", "container/vol/table/data/streaming_ingest/figsId");
+    stageLocationMap.put("path", "table/data/streaming_ingest/figsId/snow_volHash_figsId_1_1_");
     stageLocationMap.put("creds", credsMap);
     stageLocationMap.put("region", "test_region");
     stageLocationMap.put("endPoint", "test_endpoint");
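A note on Utils.getTwoHexChars(), introduced in the Utils.java hunk above: it always returns a zero-padded value in "00".."ff". The (nextInt() & 0x7FFFFFFF) mask clears the sign bit so the modulo is non-negative, and since 2^31 is divisible by 256 the result stays uniformly distributed. An equivalent formulation for comparison (a sketch, not what this diff ships):

import java.util.concurrent.ThreadLocalRandom;

class TwoHexCharsSketch {
  // Same output space as Utils.getTwoHexChars(): "00" through "ff", zero-padded.
  static String getTwoHexChars() {
    return String.format("%02x", ThreadLocalRandom.current().nextInt(0x100));
  }
}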
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java
deleted file mode 100644
index afc647c54..000000000
--- a/src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
- */
-
-package net.snowflake.ingest.streaming.internal;
-
-import static org.junit.Assert.*;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import net.snowflake.ingest.utils.SFException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class PresignedUrlPresignedUrlExternalVolumeManagerTest {
-  private static final ObjectMapper objectMapper = new ObjectMapper();
-  private PresignedUrlExternalVolumeManager manager;
-  private FileLocationInfo fileLocationInfo;
-  private ExecutorService executorService;
-
-  @Before
-  public void setup() throws JsonProcessingException {
-    this.manager =
-        new PresignedUrlExternalVolumeManager(
-            false /* isTestMode */, "role", "clientName", MockSnowflakeServiceClient.create());
-
-    Map<String, Object> fileLocationInfoMap = MockSnowflakeServiceClient.getStageLocationMap();
-    fileLocationInfoMap.put("isClientSideEncrypted", false);
-    String fileLocationInfoStr = objectMapper.writeValueAsString(fileLocationInfoMap);
-    this.fileLocationInfo = objectMapper.readValue(fileLocationInfoStr, FileLocationInfo.class);
-  }
-
-  @After
-  public void teardown() {
-    if (executorService != null) {
-      executorService.shutdownNow();
-    }
-  }
-
-  @Test
-  public void testRegister() {
-    Exception ex = null;
-
-    try {
-      this.manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo);
-    } catch (Exception e) {
-      ex = e;
-    }
-
-    assertNull(ex);
-  }
-
-  @Test
-  public void testConcurrentRegisterTable() throws Exception {
-    int numThreads = 50;
-    int timeoutInSeconds = 30;
-    List<Future<PresignedUrlExternalVolume>> allResults =
-        doConcurrentTest(
-            numThreads,
-            timeoutInSeconds,
-            () -> manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo),
-            () -> manager.getStorage("db.schema.table"));
-    PresignedUrlExternalVolume extvol = manager.getStorage("db.schema.table");
-    assertNotNull(extvol);
-    for (int i = 0; i < numThreads; i++) {
-      assertSame("" + i, extvol, allResults.get(i).get(timeoutInSeconds, TimeUnit.SECONDS));
-    }
-  }
-
-  @Test
-  public void testGetStorage() {
-    this.manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo);
-    PresignedUrlExternalVolume extvol = this.manager.getStorage("db.schema.table");
-    assertNotNull(extvol);
-  }
-
-  @Test
-  public void testGetStorageWithoutRegister() {
-    SFException ex = null;
-    try {
-      manager.getStorage("db.schema.table");
-    } catch (SFException e) {
-      ex = e;
-    }
-
-    assertNotNull(ex);
-    assertTrue(ex.getVendorCode().equals("0001"));
-    assertTrue(ex.getMessage().contains("No external volume found for tableRef=db.schema.table"));
-  }
-
-  @Test
-  public void testGenerateBlobPath() {
-    manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo);
-    BlobPath blobPath = manager.generateBlobPath("db.schema.table");
-    assertNotNull(blobPath);
-    assertEquals(blobPath.fileRegistrationPath, "f1");
-    assertEquals(blobPath.uploadPath, "http://f1.com?token=t1");
-  }
-
-  @Test
-  public void testConcurrentGenerateBlobPath() throws Exception {
-    int numThreads = 50;
-    int timeoutInSeconds = 60;
-    manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo);
-
-    List<Future<BlobPath>> allResults =
-        doConcurrentTest(
-            numThreads,
-            timeoutInSeconds,
-            () -> {
-              for (int i = 0; i < 1000; i++) {
-                manager.generateBlobPath("db.schema.table");
-              }
-            },
-            () -> manager.generateBlobPath("db.schema.table"));
-    for (int i = 0; i < numThreads; i++) {
-      BlobPath blobPath = allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS);
-      assertNotNull(blobPath);
-      assertTrue(blobPath.uploadPath, blobPath.uploadPath.contains("http://f1.com?token=t"));
-    }
-  }
-
-  private <T> List<Future<T>> doConcurrentTest(
-      int numThreads, int timeoutInSeconds, Runnable action, Supplier<T> getResult)
-      throws Exception {
-    assertNull(executorService);
-
-    executorService =
-        new ThreadPoolExecutor(
-            numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
-    List<Callable<T>> tasks = new ArrayList<>();
-    final CyclicBarrier startBarrier = new CyclicBarrier(numThreads);
-    final CyclicBarrier endBarrier = new CyclicBarrier(numThreads);
-    for (int i = 0; i < numThreads; i++) {
-      tasks.add(
-          () -> {
-            startBarrier.await(timeoutInSeconds, TimeUnit.SECONDS);
-            action.run();
-            endBarrier.await();
-            return getResult.get();
-          });
-    }
-
-    List<Future<T>> allResults = executorService.invokeAll(tasks);
-    allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS);
-    return allResults;
-  }
-
-  @Test
-  public void testGetClientPrefix() {
-    assertEquals(manager.getClientPrefix(), "test_prefix_123");
-  }
-}
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManagerTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManagerTest.java
new file mode 100644
index 000000000..898bf67e8
--- /dev/null
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManagerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import static net.snowflake.ingest.streaming.internal.SubscopedTokenExternalVolumeManager.generateBlobPathFromLocationInfoPath;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import net.snowflake.ingest.utils.SFException;
+import net.snowflake.ingest.utils.Utils;
+import org.apache.commons.lang3.function.TriConsumer;
+import org.assertj.core.api.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SubscopedTokenExternalVolumeManagerTest {
+  private SubscopedTokenExternalVolumeManager manager;
+  private ExecutorService executorService;
+
+  @Before
+  public void setup() {
+    this.manager =
+        new SubscopedTokenExternalVolumeManager(
+            "role", "clientName", MockSnowflakeServiceClient.create());
+  }
+
+  @After
+  public void teardown() {
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testRegister() {
+    assertThatCode(() -> this.manager.registerTable(new TableRef("db", "schema", "table")))
+        .doesNotThrowAnyException();
+  }
+
+  @Test
+  public void testConcurrentRegisterTable() throws Exception {
+    int numThreads = 50;
+    int timeoutInSeconds = 30;
+    List<Future<InternalStage>> allResults =
+        doConcurrentTest(
+            numThreads,
+            timeoutInSeconds,
+            () -> manager.registerTable(new TableRef("db", "schema", "table")),
+            () -> manager.getStorage("db.schema.table"));
+    InternalStage extvol = manager.getStorage("db.schema.table");
+    assertThat(extvol).isNotNull();
+    for (int i = 0; i < numThreads; i++) {
+      assertThat(allResults.get(i).get(timeoutInSeconds, TimeUnit.SECONDS))
+          .as("getStorage from threadId=%d", i)
+          .isSameAs(extvol);
+    }
+  }
+
+  @Test
+  public void testGetStorage() {
+    this.manager.registerTable(new TableRef("db", "schema", "table"));
+    InternalStage extvol = this.manager.getStorage("db.schema.table");
+    assertThat(extvol).isNotNull();
+  }
+
+  @Test
+  public void testGetStorageWithoutRegister() {
+    assertThatExceptionOfType(SFException.class)
+        .isThrownBy(() -> manager.getStorage("db.schema.table"))
+        .withMessageEndingWith("No external volume found for tableRef=db.schema.table.")
+        .has(
+            new Condition<>(
+                ex -> ex.getVendorCode().equals("0001"), "vendor code 0001"));
+  }
+
+  @Test
+  public void testGenerateBlobPathPublic() {
+    manager.registerTable(new TableRef("db", "schema", "table"));
+    BlobPath blobPath = manager.generateBlobPath("db.schema.table");
+    assertThat(blobPath).isNotNull();
+    assertThat(blobPath.uploadPath).isNotNull().endsWith("/snow_volHash_figsId_1_1_0.parquet");
+
+    assertThat(blobPath.fileRegistrationPath)
+        .isNotNull()
+        .isEqualTo("table/data/streaming_ingest/figsId/" + blobPath.uploadPath);
+  }
+
+  @Test
+  public void testGenerateBlobPathInternals() {
+    assertThatThrownBy(() -> generateBlobPathFromLocationInfoPath("db.sch.tbl1", "snow_", "0a", 1))
+        .hasMessageEndingWith("File path returned by server is invalid.");
+    assertThatThrownBy(
+            () -> generateBlobPathFromLocationInfoPath("db.sch.tbl1", "1/snow_", "0a", 1))
+        .hasMessageEndingWith("File path returned by server is invalid.");
+    assertThatThrownBy(
+            () -> generateBlobPathFromLocationInfoPath("db.sch.tbl1", "1/2/snow_", "0a", 1))
+        .hasMessageEndingWith("File path returned by server is invalid.");
+
+    AtomicInteger counter = new AtomicInteger(1);
+    TriConsumer<String, String, String> test =
+        (String testPath, String pathPrefix, String fileName) -> {
+          String twoHexChars = Utils.getTwoHexChars();
+          BlobPath blobPath =
+              generateBlobPathFromLocationInfoPath(
+                  "db.sch.tbl1", testPath, twoHexChars, counter.getAndIncrement());
+          assertThat(blobPath).isNotNull();
+          assertThat(blobPath.uploadPath)
+              .isNotNull()
+              .isEqualTo(String.format("%s/%s", twoHexChars, fileName));
+          assertThat(blobPath.fileRegistrationPath)
+              .isNotNull()
+              .isEqualTo(String.format("%s/%s/%s", pathPrefix, twoHexChars, fileName));
+        };
+
+    // happy path
+    test.accept(
+        "vol/table/data/streaming_ingest/figsId/snow_",
+        "vol/table/data/streaming_ingest/figsId",
+        "snow_1.parquet");
+
+    // vol has extra subfolders
+    test.accept(
+        "vol/vol2/vol3/table/data/streaming_ingest/figsId/snow_",
+        "vol/vol2/vol3/table/data/streaming_ingest/figsId",
+        "snow_2.parquet");
+
+    // table has extra subfolders
+    test.accept(
+        "vol/table/t2/t3/data/streaming_ingest/figsId/snow_",
+        "vol/table/t2/t3/data/streaming_ingest/figsId",
+        "snow_3.parquet");
+
+    // no table
+    test.accept(
+        "vol/data/streaming_ingest/figsId/snow_",
+        "vol/data/streaming_ingest/figsId",
+        "snow_4.parquet");
+  }
+
+  @Test
+  public void testConcurrentGenerateBlobPath() throws Exception {
+    int numThreads = 50;
+    int timeoutInSeconds = 60;
+    manager.registerTable(new TableRef("db", "schema", "table"));
+
+    List<Future<BlobPath>> allResults =
+        doConcurrentTest(
+            numThreads,
+            timeoutInSeconds,
+            () -> {
+              for (int i = 0; i < 1000; i++) {
+                manager.generateBlobPath("db.schema.table");
+              }
+            },
+            () -> manager.generateBlobPath("db.schema.table"));
+    for (int i = 0; i < numThreads; i++) {
+      BlobPath blobPath = allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS);
+      assertThat(blobPath).isNotNull();
+      assertThat(blobPath.uploadPath).contains("/snow_volHash_figsId_1_1_");
+    }
+  }
+
+  private <T> List<Future<T>> doConcurrentTest(
+      int numThreads, int timeoutInSeconds, Runnable action, Supplier<T> getResult)
+      throws Exception {
+    assertThat(executorService).isNull();
+
+    executorService =
+        new ThreadPoolExecutor(
+            numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+    List<Callable<T>> tasks = new ArrayList<>();
+    final CyclicBarrier startBarrier = new CyclicBarrier(numThreads);
+    final CyclicBarrier endBarrier = new CyclicBarrier(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      tasks.add(
+          () -> {
+            startBarrier.await(timeoutInSeconds, TimeUnit.SECONDS);
+            action.run();
+            endBarrier.await();
+            return getResult.get();
+          });
+    }
+
+    List<Future<T>> allResults = executorService.invokeAll(tasks);
+    allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS);
+    return allResults;
+  }
+
+  @Test
+  public void testGetClientPrefix() {
+    assertThat(manager.getClientPrefix()).isEqualTo("test_prefix_123");
+  }
+}