From 20242c64c1c1d8f28b8e9d785ec71e7267251d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Hofman?= Date: Thu, 5 Sep 2024 20:21:06 +0200 Subject: [PATCH 1/4] SNOW-1651983 authenticated proxy on azure --- parent-pom.xml | 2 +- .../java/net/snowflake/client/core/HttpUtil.java | 14 ++++++++++---- .../client/core/CoreUtilsMiscellaneousTest.java | 2 ++ 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/parent-pom.xml b/parent-pom.xml index 28412d161..9fecf4ea3 100644 --- a/parent-pom.xml +++ b/parent-pom.xml @@ -24,7 +24,7 @@ 1.8.1 4.2.0 1.12.655 - 5.0.0 + 8.6.6 1.74 1.0.2.4 1.0.5 diff --git a/src/main/java/net/snowflake/client/core/HttpUtil.java b/src/main/java/net/snowflake/client/core/HttpUtil.java index 166bd7e0a..537c692c3 100644 --- a/src/main/java/net/snowflake/client/core/HttpUtil.java +++ b/src/main/java/net/snowflake/client/core/HttpUtil.java @@ -225,11 +225,17 @@ public static void setSessionlessProxyForAzure( */ public static void setProxyForAzure(HttpClientSettingsKey key, OperationContext opContext) { if (key != null && key.usesProxy()) { - Proxy azProxy = - new Proxy(Proxy.Type.HTTP, new InetSocketAddress(key.getProxyHost(), key.getProxyPort())); - logger.debug( - "Setting Azure proxy. Host: {}, port: {}", key.getProxyHost(), key.getProxyPort()); + Proxy azProxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(key.getProxyHost(), key.getProxyPort())); opContext.setProxy(azProxy); + boolean setProxyUser = !Strings.isNullOrEmpty(key.getProxyUser()) && !Strings.isNullOrEmpty(key.getProxyPassword()); + logger.debug("Setting Azure proxy {} user. Host: {}, port: {}", + setProxyUser ? "with" : "without", + key.getProxyHost(), + key.getProxyPort()); + if (setProxyUser) { + opContext.setProxyUsername(key.getProxyUser()); + opContext.setProxyPassword(key.getProxyPassword()); + } } else { logger.debug("Omitting Azure proxy setup"); } diff --git a/src/test/java/net/snowflake/client/core/CoreUtilsMiscellaneousTest.java b/src/test/java/net/snowflake/client/core/CoreUtilsMiscellaneousTest.java index f11614c8b..87df6c659 100644 --- a/src/test/java/net/snowflake/client/core/CoreUtilsMiscellaneousTest.java +++ b/src/test/java/net/snowflake/client/core/CoreUtilsMiscellaneousTest.java @@ -162,6 +162,8 @@ public void testSetProxyForAzure() { false); HttpUtil.setProxyForAzure(testKey, op); Proxy proxy = op.getProxy(); + assertEquals("testuser", op.getProxyUsername()); + assertEquals("pw", op.getProxyPassword()); assertEquals(Proxy.Type.HTTP, proxy.type()); assertEquals(new InetSocketAddress("snowflakecomputing.com", 443), proxy.address()); } From 0e8de1a1ae15bc65351e97fc9208d7aefe352e1e Mon Sep 17 00:00:00 2001 From: sfc-gh-astachowski Date: Fri, 6 Sep 2024 07:54:32 +0200 Subject: [PATCH 2/4] Formatting --- .../java/net/snowflake/client/core/HttpUtil.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main/java/net/snowflake/client/core/HttpUtil.java b/src/main/java/net/snowflake/client/core/HttpUtil.java index 537c692c3..fb3cf7552 100644 --- a/src/main/java/net/snowflake/client/core/HttpUtil.java +++ b/src/main/java/net/snowflake/client/core/HttpUtil.java @@ -225,13 +225,17 @@ public static void setSessionlessProxyForAzure( */ public static void setProxyForAzure(HttpClientSettingsKey key, OperationContext opContext) { if (key != null && key.usesProxy()) { - Proxy azProxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(key.getProxyHost(), key.getProxyPort())); + Proxy azProxy = + new Proxy(Proxy.Type.HTTP, new InetSocketAddress(key.getProxyHost(), key.getProxyPort())); opContext.setProxy(azProxy); - boolean setProxyUser = !Strings.isNullOrEmpty(key.getProxyUser()) && !Strings.isNullOrEmpty(key.getProxyPassword()); - logger.debug("Setting Azure proxy {} user. Host: {}, port: {}", - setProxyUser ? "with" : "without", - key.getProxyHost(), - key.getProxyPort()); + boolean setProxyUser = + !Strings.isNullOrEmpty(key.getProxyUser()) + && !Strings.isNullOrEmpty(key.getProxyPassword()); + logger.debug( + "Setting Azure proxy {} user. Host: {}, port: {}", + setProxyUser ? "with" : "without", + key.getProxyHost(), + key.getProxyPort()); if (setProxyUser) { opContext.setProxyUsername(key.getProxyUser()); opContext.setProxyPassword(key.getProxyPassword()); From 8656d349ff5c6111685f1970d4625d26fc5bb300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kubik?= Date: Mon, 16 Sep 2024 13:00:44 +0200 Subject: [PATCH 3/4] Initial azure-storage-blob sdk bump Migrates the azure storage interactions to the V12 api. Still WIP, requires additional error handling, a bit of refactoring and new unit tests --- parent-pom.xml | 12 +- pom.xml | 4 +- .../net/snowflake/client/core/HttpUtil.java | 116 +++++--- .../snowflake/client/jdbc/RestRequest.java | 10 +- .../jdbc/SnowflakeFileTransferAgent.java | 5 +- .../storage/AzureObjectSummariesIterator.java | 28 +- .../cloud/storage/SnowflakeAzureClient.java | 277 +++++++++--------- .../cloud/storage/StorageObjectSummary.java | 46 ++- .../StorageObjectSummaryCollection.java | 14 +- .../core/CoreUtilsMiscellaneousTest.java | 32 +- ...akeAzureClientHandleExceptionLatestIT.java | 73 ++++- .../storage/SnowflakeAzureClientLatestIT.java | 7 +- .../storage/SnowflakeAzureClientTest.java | 62 ++-- 13 files changed, 391 insertions(+), 295 deletions(-) diff --git a/parent-pom.xml b/parent-pom.xml index 9fecf4ea3..9a66aec9c 100644 --- a/parent-pom.xml +++ b/parent-pom.xml @@ -24,7 +24,7 @@ 1.8.1 4.2.0 1.12.655 - 8.6.6 + 12.26.1 1.74 1.0.2.4 1.0.5 @@ -200,9 +200,9 @@ ${google.guava.version} - com.microsoft.azure - azure-storage - ${azure.storage.version} + com.azure + azure-storage-blob + ${azure.storage.blob.version} com.nimbusds @@ -575,8 +575,8 @@ google-http-client - com.microsoft.azure - azure-storage + com.azure + azure-storage-blob com.nimbusds diff --git a/pom.xml b/pom.xml index 096641174..9a76edd03 100644 --- a/pom.xml +++ b/pom.xml @@ -808,8 +808,8 @@ ${shadeBase}.software.amazon.ion - com.microsoft.azure - ${shadeBase}.microsoft.azure + com.azure + ${shadeBase}.azure com.fasterxml diff --git a/src/main/java/net/snowflake/client/core/HttpUtil.java b/src/main/java/net/snowflake/client/core/HttpUtil.java index fb3cf7552..8b5f320e2 100644 --- a/src/main/java/net/snowflake/client/core/HttpUtil.java +++ b/src/main/java/net/snowflake/client/core/HttpUtil.java @@ -9,9 +9,10 @@ import static org.apache.http.client.config.CookieSpecs.IGNORE_COOKIES; import com.amazonaws.ClientConfiguration; +import com.azure.core.http.ProxyOptions; +import com.azure.core.util.HttpClientOptions; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.microsoft.azure.storage.OperationContext; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -183,63 +184,85 @@ public static void setSessionlessProxyForS3( * from the StageInfo * * @param proxyProperties proxy properties - * @param opContext the configuration needed by Azure to set the proxy + * @param httpClientOptions the configuration needed by Azure to set the proxy * @throws SnowflakeSQLException */ public static void setSessionlessProxyForAzure( - Properties proxyProperties, OperationContext opContext) throws SnowflakeSQLException { - if (proxyProperties != null - && proxyProperties.size() > 0 - && proxyProperties.getProperty(SFSessionProperty.USE_PROXY.getPropertyKey()) != null) { - Boolean useProxy = - Boolean.valueOf( - proxyProperties.getProperty(SFSessionProperty.USE_PROXY.getPropertyKey())); - if (useProxy) { - String proxyHost = - proxyProperties.getProperty(SFSessionProperty.PROXY_HOST.getPropertyKey()); - int proxyPort; - try { - proxyPort = - Integer.parseInt( - proxyProperties.getProperty(SFSessionProperty.PROXY_PORT.getPropertyKey())); - } catch (NumberFormatException | NullPointerException e) { - throw new SnowflakeSQLException( - ErrorCode.INVALID_PROXY_PROPERTIES, "Could not parse port number"); - } - Proxy azProxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); - logger.debug("Setting sessionless Azure proxy. Host: {}, port: {}", proxyHost, proxyPort); - opContext.setProxy(azProxy); - } else { - logger.debug("Omitting sessionless Azure proxy setup as proxy is disabled"); - } - } else { + Properties proxyProperties, HttpClientOptions httpClientOptions) throws SnowflakeSQLException { + if (proxyProperties == null + || proxyProperties.isEmpty() + || proxyProperties.getProperty(SFSessionProperty.USE_PROXY.getPropertyKey()) == null) { logger.debug("Omitting sessionless Azure proxy setup"); + return; + } + + boolean useProxy = + Boolean.parseBoolean(proxyProperties.getProperty(SFSessionProperty.USE_PROXY.getPropertyKey())); + if (!useProxy) { + logger.debug("Omitting sessionless Azure proxy setup as proxy is disabled"); } + + // TODO: extract common logic + String proxyHost = + proxyProperties.getProperty(SFSessionProperty.PROXY_HOST.getPropertyKey()); + int proxyPort; + try { + proxyPort = + Integer.parseInt( + proxyProperties.getProperty(SFSessionProperty.PROXY_PORT.getPropertyKey())); + } catch (NumberFormatException | NullPointerException e) { + throw new SnowflakeSQLException( + ErrorCode.INVALID_PROXY_PROPERTIES, "Could not parse port number"); + } + + String proxyUser = proxyProperties.getProperty(SFSessionProperty.PROXY_USER.getPropertyKey()); + String proxyPassword = proxyProperties.getProperty(SFSessionProperty.PROXY_PASSWORD.getPropertyKey()); + + boolean setProxyUser = + !Strings.isNullOrEmpty(proxyUser); +// Isn't blank password allowed? Also, the userid I guess. Source: RFC 2617 +// && !Strings.isNullOrEmpty(proxyPassword); + + ProxyOptions proxyOptions = new ProxyOptions(ProxyOptions.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); + if (setProxyUser) { + proxyOptions.setCredentials(proxyUser, proxyPassword); + } + + logger.info( + "Setting Azure proxy {} user. Host: {}, port: {}", + setProxyUser ? "with" : "without", + proxyHost, + proxyPort); + + httpClientOptions.setProxyOptions(proxyOptions); } /** * A static function to set Azure proxy params when there is a valid session * * @param key key to HttpClient map containing OCSP and proxy info - * @param opContext the configuration needed by Azure to set the proxy + * @param httpClientOptions the configuration needed by Azure to set the proxy */ - public static void setProxyForAzure(HttpClientSettingsKey key, OperationContext opContext) { + public static void setProxyForAzure(HttpClientSettingsKey key, HttpClientOptions httpClientOptions) { if (key != null && key.usesProxy()) { - Proxy azProxy = - new Proxy(Proxy.Type.HTTP, new InetSocketAddress(key.getProxyHost(), key.getProxyPort())); - opContext.setProxy(azProxy); + ProxyOptions proxyOptions = + new ProxyOptions(ProxyOptions.Type.HTTP, new InetSocketAddress(key.getProxyHost(), key.getProxyPort())); boolean setProxyUser = - !Strings.isNullOrEmpty(key.getProxyUser()) - && !Strings.isNullOrEmpty(key.getProxyPassword()); - logger.debug( + !Strings.isNullOrEmpty(key.getProxyUser()); +// Isn't blank password allowed? Also, the userid I guess. Source: RFC 2617 +// && !Strings.isNullOrEmpty(key.getProxyPassword()); + logger.info( "Setting Azure proxy {} user. Host: {}, port: {}", setProxyUser ? "with" : "without", key.getProxyHost(), key.getProxyPort()); if (setProxyUser) { - opContext.setProxyUsername(key.getProxyUser()); - opContext.setProxyPassword(key.getProxyPassword()); + proxyOptions.setCredentials(key.getProxyUser(), key.getProxyPassword()); + logger.info("Azure proxy user: {}, azure proxy password: {}", key.getProxyUser(), key.getProxyPassword()); } + + httpClientOptions.setProxyOptions(proxyOptions); + } else { logger.debug("Omitting Azure proxy setup"); } @@ -883,6 +906,23 @@ private static String executeRequestInternal( return theString; } + // TODO: do we need this? + public static void configureHttpClientTimeouts(HttpClientOptions httpClientOptions) { + // Set the connection timeout for a request to be sent. + httpClientOptions.setConnectTimeout(Duration.ofMillis(DEFAULT_HTTP_CLIENT_CONNECTION_TIMEOUT_IN_MS)); + // Set the duration of time before an idle connection. + httpClientOptions.setConnectionIdleTimeout(Duration.ofSeconds(DEFAULT_IDLE_CONNECTION_TIMEOUT)); +//// Set the maximum connection pool size used by the underlying HTTP client. +// httpClientOptions.setMaximumConnectionPoolSize(SystemUtil.convertSystemPropertyToIntValue( +// JDBC_MAX_CONNECTIONS_PROPERTY, DEFAULT_MAX_CONNECTIONS)); +//// Sets the read timeout duration used when reading the server response. +// httpClientOptions.setReadTimeout(Duration readTimeout); +//// Sets the response timeout duration used when waiting for a server to reply. +// httpClientOptions.setResponseTimeout(Duration responseTimeout); +//// Sets the writing timeout for a request to be sent. +// httpClientOptions.setWriteTimeout(Duration writeTimeout); + } + // This is a workaround for JDK-7036144. // // The GZIPInputStream prematurely closes its input if a) it finds diff --git a/src/main/java/net/snowflake/client/jdbc/RestRequest.java b/src/main/java/net/snowflake/client/jdbc/RestRequest.java index 5be46c5de..69c174e52 100644 --- a/src/main/java/net/snowflake/client/jdbc/RestRequest.java +++ b/src/main/java/net/snowflake/client/jdbc/RestRequest.java @@ -361,19 +361,19 @@ public static CloseableHttpResponse execute( break; } else { if (response != null) { - logger.debug( + logger.info( "{}HTTP response not ok: status code: {}, request: {}", requestIdStr, response.getStatusLine().getStatusCode(), requestInfoScrubbed); } else if (savedEx != null) { - logger.debug( + logger.info( "{}Null response for cause: {}, request: {}", requestIdStr, getRootCause(savedEx).getMessage(), requestInfoScrubbed); } else { - logger.debug("{}Null response for request: {}", requestIdStr, requestInfoScrubbed); + logger.info("{}Null response for request: {}", requestIdStr, requestInfoScrubbed); } // get the elapsed time for the last request @@ -490,14 +490,14 @@ public static CloseableHttpResponse execute( // sleep for backoff - elapsed amount of time if (backoffInMilli > elapsedMilliForLastCall) { try { - logger.debug( + logger.info( "{}Retry request {}: sleeping for {} ms", requestIdStr, requestInfoScrubbed, backoffInMilli); Thread.sleep(backoffInMilli); } catch (InterruptedException ex1) { - logger.debug("{}Backoff sleep before retrying login got interrupted", requestIdStr); + logger.info("{}Backoff sleep before retrying login got interrupted", requestIdStr); } elapsedMilliForTransientIssues += backoffInMilli; backoffInMilli = diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java index bd5a3945e..341c95714 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java @@ -1579,9 +1579,10 @@ public boolean execute() throws SQLException { private void uploadStream() throws SnowflakeSQLException { try { FileMetadata fileMetadata = fileMetadataMap.get(SRC_FILE_NAME_FOR_STREAM); + logger.info("Start uploading stream {}", SRC_FILE_NAME_FOR_STREAM); if (fileMetadata.resultStatus == ResultStatus.SKIPPED) { - logger.debug( + logger.info( "Skipping {}, status: {}, details: {}", SRC_FILE_NAME_FOR_STREAM, fileMetadata.resultStatus, @@ -2083,7 +2084,7 @@ private static void pushFileToRemoteStore( remoteLocation.path + (!remoteLocation.path.endsWith("/") ? "/" : "") + destFileName; } - logger.debug( + logger.info( "Upload object. Location: {}, key: {}, srcFile: {}, encryption: {}", remoteLocation.location, destFileName, diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/AzureObjectSummariesIterator.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/AzureObjectSummariesIterator.java index cce70a835..3f4b02729 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/AzureObjectSummariesIterator.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/AzureObjectSummariesIterator.java @@ -5,10 +5,11 @@ package net.snowflake.client.jdbc.cloud.storage; import com.amazonaws.services.kms.model.UnsupportedOperationException; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.ListBlobItem; +import com.azure.storage.blob.models.BlobItem; + import java.util.Iterator; import java.util.NoSuchElementException; + import net.snowflake.client.log.SFLogger; import net.snowflake.client.log.SFLoggerFactory; @@ -21,15 +22,17 @@ public class AzureObjectSummariesIterator implements Iterator { private static final SFLogger logger = SFLoggerFactory.getLogger(AzureObjectSummariesIterator.class); - Iterator itemIterator; + private final String storageLocation; + Iterator itemIterator; /* * Constructs a summaries iterator object from an iterable derived by a * lostBlobs method * @param azCloudBlobIterable an iterable set of ListBlobItems */ - public AzureObjectSummariesIterator(Iterable azCloudBlobIterable) { + public AzureObjectSummariesIterator(Iterable azCloudBlobIterable, String azStorageLocation) { itemIterator = azCloudBlobIterable.iterator(); + storageLocation = azStorageLocation; } public boolean hasNext() { @@ -46,16 +49,15 @@ public boolean hasNext() { } public StorageObjectSummary next() { - ListBlobItem listBlobItem = itemIterator.next(); - - if (!(listBlobItem instanceof CloudBlob)) { - // The only other possible type would a CloudDirectory - // This should never happen since we are listing items as a flat list - logger.debug("Unexpected listBlobItem instance type: {}", listBlobItem.getClass()); - throw new IllegalArgumentException("Unexpected listBlobItem instance type"); - } + BlobItem blobItem = itemIterator.next(); +// if (!(blobItem.getProperties().getBlobType() instanceof BlobClient)) { +// // The only other possible type would a CloudDirectory +// // This should never happen since we are listing items as a flat list +// logger.debug("Unexpected listBlobItem instance type: {}", blobItem.getClass()); +// throw new IllegalArgumentException("Unexpected listBlobItem instance type"); +// } - return StorageObjectSummary.createFromAzureListBlobItem(listBlobItem); + return StorageObjectSummary.createFromAzureListBlobItem(blobItem, storageLocation); } public void remove() { diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java index cdf303bbd..688120256 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java @@ -6,24 +6,26 @@ import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED; import static net.snowflake.client.jdbc.SnowflakeUtil.systemGetProperty; +import com.azure.core.http.rest.Response; +import com.azure.core.util.HttpClientOptions; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobProperties; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.ListBlobsOptions; +import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlobDownloadToFileOptions; +import com.azure.storage.blob.options.BlobParallelUploadOptions; + +import com.azure.storage.blob.specialized.BlockBlobClient; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.microsoft.azure.storage.OperationContext; -import com.microsoft.azure.storage.StorageCredentials; -import com.microsoft.azure.storage.StorageCredentialsAnonymous; -import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.StorageExtendedErrorInformation; -import com.microsoft.azure.storage.blob.BlobListingDetails; -import com.microsoft.azure.storage.blob.BlobProperties; -import com.microsoft.azure.storage.blob.BlobRequestOptions; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.CloudBlockBlob; -import com.microsoft.azure.storage.blob.ListBlobItem; + import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -33,14 +35,15 @@ import java.net.URI; import java.net.URISyntaxException; import java.security.InvalidKeyException; +import java.time.Duration; import java.util.AbstractMap; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Base64; -import java.util.EnumSet; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + import net.snowflake.client.core.HttpUtil; import net.snowflake.client.core.ObjectMapperFactory; import net.snowflake.client.core.SFBaseSession; @@ -76,9 +79,8 @@ public class SnowflakeAzureClient implements SnowflakeStorageClient { private int encryptionKeySize = 0; // used for PUTs private StageInfo stageInfo; private RemoteStoreFileEncryptionMaterial encMat; - private CloudBlobClient azStorageClient; + private BlobServiceClient azBlobServiceClient; private static final SFLogger logger = SFLoggerFactory.getLogger(SnowflakeAzureClient.class); - private OperationContext opContext = null; private SFBaseSession session; private SnowflakeAzureClient() {} @@ -121,20 +123,23 @@ private void setupAzureClient( this.encMat = encMat; this.session = sfSession; - logger.debug("Setting up the Azure client ", false); + logger.info("Setting up the Azure client ", false); try { URI storageEndpoint = buildAzureStorageEndpointURI(stage.getEndPoint(), stage.getStorageAccount()); - StorageCredentials azCreds; + BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder() + .endpoint(storageEndpoint.toString()); + String sasToken = (String) stage.getCredentials().get("AZURE_SAS_TOKEN"); if (sasToken != null) { // We are authenticated with a shared access token. - azCreds = new StorageCredentialsSharedAccessSignature(sasToken); + logger.info("sas token set"); + blobServiceClientBuilder.sasToken(sasToken); } else { // Use anonymous authentication. - azCreds = StorageCredentialsAnonymous.ANONYMOUS; + logger.info("anonymous authentication"); } if (stage.getIsClientSideEncrypted() && encMat != null) { @@ -151,13 +156,20 @@ private void setupAzureClient( encryptionKeySize); } } - this.azStorageClient = new CloudBlobClient(storageEndpoint, azCreds); - opContext = new OperationContext(); + + HttpClientOptions httpClientOptions = new HttpClientOptions(); + HttpUtil.configureHttpClientTimeouts(httpClientOptions); + if (session != null) { - HttpUtil.setProxyForAzure(session.getHttpClientKey(), opContext); + HttpUtil.setProxyForAzure(session.getHttpClientKey(), httpClientOptions); } else { - HttpUtil.setSessionlessProxyForAzure(stage.getProxyProperties(), opContext); + HttpUtil.setSessionlessProxyForAzure(stage.getProxyProperties(), httpClientOptions); } + + this.azBlobServiceClient = blobServiceClientBuilder + .clientOptions(httpClientOptions) + .buildClient(); + } catch (URISyntaxException ex) { throw new IllegalArgumentException("invalid_azure_credentials"); } @@ -237,20 +249,12 @@ public StorageObjectSummaryCollection listObjects(String remoteStorageLocation, throws StorageProviderException { StorageObjectSummaryCollection storageObjectSummaries; - try { - CloudBlobContainer container = azStorageClient.getContainerReference(remoteStorageLocation); - Iterable listBlobItemIterable = - container.listBlobs( - prefix, // List the BLOBs under this prefix - true, // List the BLOBs as a flat list, i.e. do not list directories - EnumSet.noneOf(BlobListingDetails.class), - (BlobRequestOptions) null, - opContext); - storageObjectSummaries = new StorageObjectSummaryCollection(listBlobItemIterable); - } catch (URISyntaxException | StorageException ex) { - logger.debug("Failed to list objects: {}", ex); - throw new StorageProviderException(ex); - } + ListBlobsOptions listBlobsOptions = new ListBlobsOptions() + .setPrefix(prefix); + + BlobContainerClient blobContainerClient = azBlobServiceClient.getBlobContainerClient(remoteStorageLocation); + List blobs = blobContainerClient.listBlobs(listBlobsOptions, Duration.ofSeconds(60)).stream().collect(Collectors.toList()); + storageObjectSummaries = new StorageObjectSummaryCollection(blobs, remoteStorageLocation); return storageObjectSummaries; } @@ -266,33 +270,24 @@ public StorageObjectSummaryCollection listObjects(String remoteStorageLocation, public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, String prefix) throws StorageProviderException { CommonObjectMetadata azureObjectMetadata = null; - try { - // Get a reference to the BLOB, to retrieve its metadata - CloudBlobContainer container = azStorageClient.getContainerReference(remoteStorageLocation); - CloudBlob blob = container.getBlockBlobReference(prefix); - blob.downloadAttributes(null, null, opContext); - - // Get the user-defined BLOB metadata - Map userDefinedMetadata = blob.getMetadata(); - - // Get the BLOB system properties we care about - BlobProperties properties = blob.getProperties(); - long contentLength = properties.getLength(); - String contentEncoding = properties.getContentEncoding(); - - // Construct an Azure metadata object - azureObjectMetadata = - new CommonObjectMetadata(contentLength, contentEncoding, userDefinedMetadata); - } catch (StorageException ex) { - logger.debug( - "Failed to retrieve BLOB metadata: {} - {}", - ex.getErrorCode(), - FormatStorageExtendedErrorInformation(ex.getExtendedErrorInformation())); - throw new StorageProviderException(ex); - } catch (URISyntaxException ex) { - logger.debug("Cannot retrieve BLOB properties, invalid URI: {}", ex); - throw new StorageProviderException(ex); - } + // Get a reference to the BLOB, to retrieve its metadata + BlobContainerClient blobContainerClient = azBlobServiceClient.getBlobContainerClient(remoteStorageLocation); + BlobClient blobClient = blobContainerClient.getBlobClient(prefix); + BlockBlobClient blockBlobClient = blobClient.getBlockBlobClient(); + + // Get the BLOB system properties we care about + BlobProperties properties = blockBlobClient.getProperties(); + + // Get the user-defined BLOB metadata + Map userDefinedMetadata = properties.getMetadata(); + + long contentLength = properties.getBlobSize(); + String contentEncoding = properties.getContentEncoding(); + + // Construct an Azure metadata object + azureObjectMetadata = + new CommonObjectMetadata(contentLength, contentEncoding, userDefinedMetadata); + return azureObjectMetadata; } @@ -334,21 +329,28 @@ public void download( do { try { File localFile = new File(localFilePath); - CloudBlobContainer container = azStorageClient.getContainerReference(remoteStorageLocation); - CloudBlob blob = container.getBlockBlobReference(stageFilePath); + BlobContainerClient blobContainerClient = azBlobServiceClient.getBlobContainerClient(remoteStorageLocation); + BlobClient blobClient = blobContainerClient.getBlobClient(stageFilePath); + BlockBlobClient blockBlobClient = blobClient.getBlockBlobClient(); - BlobRequestOptions transferOptions = new BlobRequestOptions(); - transferOptions.setConcurrentRequestCount(parallelism); + BlobDownloadToFileOptions downloadOptions = new BlobDownloadToFileOptions(localFilePath) + .setParallelTransferOptions(new com.azure.storage.common.ParallelTransferOptions().setMaxConcurrency(parallelism)); + + Response response = blockBlobClient.downloadToFileWithResponse(downloadOptions, Duration.ofSeconds(13), null); + response.getValue().getMetadata(); - blob.downloadToFile(localFilePath, null, transferOptions, opContext); stopwatch.stop(); long downloadMillis = stopwatch.elapsedMillis(); // Pull object metadata from Azure - blob.downloadAttributes(null, transferOptions, opContext); + //blob.downloadAttributes(null, transferOptions, opContext); // Get the user-defined BLOB metadata - Map userDefinedMetadata = blob.getMetadata(); + Map userDefinedMetadata = blockBlobClient.getProperties().getMetadata(); + logger.info("Received user metadata for stageFilePath: {}", stageFilePath); + userDefinedMetadata.forEach((key, value) -> { + logger.info("UserDefinedMetadata key: {}, value: {}", key, value); + }); AbstractMap.SimpleEntry encryptionData = parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId); @@ -365,6 +367,7 @@ public void download( SqlState.INTERNAL_ERROR, "File metadata incomplete"); } + logger.info("encryption data: key: {}, iv: {}", key, iv); // Decrypt file try { @@ -440,14 +443,14 @@ public InputStream downloadToStream( do { try { - CloudBlobContainer container = azStorageClient.getContainerReference(remoteStorageLocation); - - CloudBlob blob = container.getBlockBlobReference(stageFilePath); + BlobContainerClient blobContainerClient = azBlobServiceClient.getBlobContainerClient(remoteStorageLocation); + BlobClient blobClient = blobContainerClient.getBlobClient(stageFilePath); + BlockBlobClient blockBlobClient = blobClient.getBlockBlobClient(); - InputStream stream = blob.openInputStream(null, null, opContext); + InputStream stream = blockBlobClient.openInputStream(); stopwatch.stop(); long downloadMillis = stopwatch.elapsedMillis(); - Map userDefinedMetadata = blob.getMetadata(); + Map userDefinedMetadata = blockBlobClient.getProperties().getMetadata(); AbstractMap.SimpleEntry encryptionData = parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId); @@ -569,22 +572,26 @@ public void upload( stopwatch.start(); do { try { + logger.info("Try uploading retryCount: {}", retryCount); InputStream fileInputStream = uploadStreamInfo.left; - CloudBlobContainer container = azStorageClient.getContainerReference(remoteStorageLocation); - CloudBlockBlob blob = container.getBlockBlobReference(destFileName); + BlobContainerClient blobContainerClient = azBlobServiceClient.getBlobContainerClient(remoteStorageLocation); + BlobClient blobClient = blobContainerClient.getBlobClient(destFileName); // Set the user-defined/Snowflake metadata and upload the BLOB - blob.setMetadata((HashMap) meta.getUserMetadata()); + logger.info("Uploading file: {}, with metadata:", destFileName); + meta.getUserMetadata().forEach((k, v) -> { + logger.info("User metadata key: {}, value: {}", k, v); + }); - BlobRequestOptions transferOptions = new BlobRequestOptions(); - transferOptions.setConcurrentRequestCount(parallelism); + BlobParallelUploadOptions parallelUploadOptions = new BlobParallelUploadOptions(fileInputStream) + .setParallelTransferOptions(new ParallelTransferOptions().setMaxConcurrency(parallelism)) + .setMetadata(meta.getUserMetadata());//this might be redundant - blob.upload( - fileInputStream, // input stream to upload from - -1, // -1 indicates an unknown stream length - null, - transferOptions, - opContext); + logger.info("parallelism: {}", parallelism); + + blobClient.uploadWithResponse(parallelUploadOptions, Duration.ofSeconds(13), null); + + logger.info("Finished uploading retryCount: {}", retryCount); stopwatch.stop(); if (uploadFromStream) { @@ -602,7 +609,9 @@ public void upload( retryCount); } - blob.uploadMetadata(null, transferOptions, opContext); + // TODO: Should be done already +// blob.setMetadata(meta.getUserMetadata()); +// blob.uploadMetadata(null, transferOptions, opContext); // close any open streams in the "toClose" list and return for (FileInputStream is : toClose) { @@ -611,6 +620,7 @@ public void upload( return; } catch (Exception ex) { + logger.error("Error while uploading file", ex); handleAzureException(ex, ++retryCount, "upload", session, command, this, queryId); if (uploadFromStream && fileBackedOutputStream == null) { @@ -794,10 +804,10 @@ private static void handleAzureException( SnowflakeFileTransferAgent.throwNoSpaceLeftError(session, operation, ex, queryId); } - if (ex instanceof StorageException) { - StorageException se = (StorageException) ex; + if (ex instanceof BlobStorageException) { + BlobStorageException se = (BlobStorageException) ex; - if (((StorageException) ex).getHttpStatusCode() == 403) { + if (((BlobStorageException) ex).getStatusCode() == 403) { // A 403 indicates that the SAS token has expired, // we need to refresh the Azure client with the new token if (session != null) { @@ -806,7 +816,7 @@ private static void handleAzureException( // If session is null we cannot renew the token so throw the ExpiredToken exception throw new SnowflakeSQLException( queryId, - se.getErrorCode(), + se.getErrorCode().toString(), CLOUD_STORAGE_CREDENTIALS_EXPIRED, "Azure credentials may have expired"); } @@ -814,7 +824,7 @@ private static void handleAzureException( // If we have exceeded the max number of retries, propagate the error // no need for back off and retry if the file does not exist if (retryCount > azClient.getMaxRetries() - || ((StorageException) ex).getHttpStatusCode() == 404) { + || ((BlobStorageException) ex).getStatusCode() == 404) { throw new SnowflakeSQLLoggedException( queryId, session, @@ -823,9 +833,10 @@ private static void handleAzureException( se, operation, se.getErrorCode(), - se.getHttpStatusCode(), + se.getStatusCode(), se.getMessage(), - FormatStorageExtendedErrorInformation(se.getExtendedErrorInformation())); +// FormatStorageExtendedErrorInformation(se.getServiceMessage())); + se.getServiceMessage()); } else { logger.debug( "Encountered exception ({}) during {}, retry count: {}", @@ -849,7 +860,7 @@ private static void handleAzureException( // ignore } - if (se.getHttpStatusCode() == 403) { + if (se.getStatusCode() == 403) { // A 403 indicates that the SAS token has expired, // we need to refresh the Azure client with the new token SnowflakeFileTransferAgent.renewExpiredToken(session, command, azClient); @@ -885,40 +896,40 @@ private static void handleAzureException( } } - /** - * Format the StorageExtendedErrorInformation to a String. - * - * @param info the StorageExtendedErrorInformation object - * @return - */ - static String FormatStorageExtendedErrorInformation(StorageExtendedErrorInformation info) { - if (info == null) { - return ""; - } - - StringBuilder sb = new StringBuilder(); - sb.append("StorageExceptionExtendedErrorInformation: {ErrorCode= "); - sb.append(info.getErrorCode()); - sb.append(", ErrorMessage= "); - sb.append(info.getErrorMessage()); - - HashMap details = info.getAdditionalDetails(); - if (details != null) { - sb.append(", AdditionalDetails= { "); - for (Map.Entry detail : details.entrySet()) { - sb.append(detail.getKey()); - sb.append("= "); - - for (String value : detail.getValue()) { - sb.append(value); - } - sb.append(","); - } - sb.setCharAt(sb.length() - 1, '}'); // overwrite the last comma - } - sb.append("}"); - return sb.toString(); - } +// /** +// * Format the StorageExtendedErrorInformation to a String. +// * +// * @param info the StorageExtendedErrorInformation object +// * @return +// */ +// static String FormatStorageExtendedErrorInformation(StorageExtendedErrorInformation info) { +// if (info == null) { +// return ""; +// } +// +// StringBuilder sb = new StringBuilder(); +// sb.append("StorageExceptionExtendedErrorInformation: {ErrorCode= "); +// sb.append(info.getErrorCode()); +// sb.append(", ErrorMessage= "); +// sb.append(info.getErrorMessage()); +// +// HashMap details = info.getAdditionalDetails(); +// if (details != null) { +// sb.append(", AdditionalDetails= { "); +// for (Map.Entry detail : details.entrySet()) { +// sb.append(detail.getKey()); +// sb.append("= "); +// +// for (String value : detail.getValue()) { +// sb.append(value); +// } +// sb.append(","); +// } +// sb.setCharAt(sb.length() - 1, '}'); // overwrite the last comma +// } +// sb.append("}"); +// return sb.toString(); +// } /* * Builds a URI to an Azure Storage account endpoint diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummary.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummary.java index 64fa08547..5dc82db5f 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummary.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummary.java @@ -4,28 +4,25 @@ package net.snowflake.client.jdbc.cloud.storage; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobItemProperties; +import com.azure.storage.blob.models.BlobStorageException; import com.google.cloud.storage.Blob; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.BlobProperties; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.ListBlobItem; -import java.net.URISyntaxException; -import java.util.Base64; + import net.snowflake.client.log.SFLogger; import net.snowflake.client.log.SFLoggerFactory; /** - * Storage platform agnostic class that encapsulates remote storage object properties + * Storage platform-agnostic class that encapsulates remote storage object properties * * @author lgiakoumakis */ public class StorageObjectSummary { private static final SFLogger logger = SFLoggerFactory.getLogger(StorageObjectSummary.class); - private String location; // location translates to "bucket" for S3 - private String key; - private String md5; - private long size; + private final String location; // location translates to "bucket" for S3 + private final String key; + private final String md5; + private final long size; /** * Constructs a StorageObjectSummary object from the S3 equivalent S3ObjectSummary @@ -64,14 +61,14 @@ public static StorageObjectSummary createFromS3ObjectSummary(S3ObjectSummary obj * Constructs a StorageObjectSummary object from Azure BLOB properties Using factory methods to * create these objects since Azure can throw, while retrieving the BLOB properties * - * @param listBlobItem an Azure ListBlobItem object + * @param blobItem Azure ListBlobItem object + * @param location * @return the ObjectSummary object created */ - public static StorageObjectSummary createFromAzureListBlobItem(ListBlobItem listBlobItem) + public static StorageObjectSummary createFromAzureListBlobItem(BlobItem blobItem, String location) throws StorageProviderException { - String location, key, md5; + String key, md5; long size; - CloudBlobContainer container; // Retrieve the BLOB properties that we need for the Summary // Azure Storage stores metadata inside each BLOB, therefore the listBlobItem @@ -79,16 +76,13 @@ public static StorageObjectSummary createFromAzureListBlobItem(ListBlobItem list // During the process the Storage Client could fail, hence we need to wrap the // get calls in try/catch and handle possible exceptions try { - container = listBlobItem.getContainer(); - location = container.getName(); - CloudBlob cloudBlob = (CloudBlob) listBlobItem; - key = cloudBlob.getName(); - BlobProperties blobProperties = cloudBlob.getProperties(); + key = blobItem.getName(); + BlobItemProperties blobProperties = blobItem.getProperties(); // the content md5 property is not always the actual md5 of the file. But for here, it's only // used for skipping file on PUT command, hence is ok. - md5 = convertBase64ToHex(blobProperties.getContentMD5()); - size = blobProperties.getLength(); - } catch (URISyntaxException | StorageException ex) { + md5 = convertBase64ToHex(blobProperties.getContentMd5()); + size = blobProperties.getContentLength(); + } catch (BlobStorageException ex) { // This should only happen if somehow we got here with and invalid URI (it should never // happen) // ...or there is a Storage service error. Unlike S3, Azure fetches metadata from the BLOB @@ -117,10 +111,8 @@ public static StorageObjectSummary createFromGcsBlob(Blob blob) { return new StorageObjectSummary(bucketName, path, hexMD5, size); } - private static String convertBase64ToHex(String base64String) { + private static String convertBase64ToHex(byte[] bytes) { try { - byte[] bytes = Base64.getDecoder().decode(base64String); - final StringBuilder builder = new StringBuilder(); for (byte b : bytes) { builder.append(String.format("%02x", b)); diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummaryCollection.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummaryCollection.java index 56f88cb26..4460988b0 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummaryCollection.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummaryCollection.java @@ -4,9 +4,9 @@ package net.snowflake.client.jdbc.cloud.storage; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.azure.storage.blob.models.BlobItem; import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; -import com.microsoft.azure.storage.blob.ListBlobItem; import java.util.Iterator; import java.util.List; @@ -17,6 +17,8 @@ */ public class StorageObjectSummaryCollection implements Iterable { + private String azStorageLocation = null; + private enum storageType { S3, AZURE, @@ -25,7 +27,7 @@ private enum storageType { private final storageType sType; private List s3ObjSummariesList = null; - private Iterable azCLoudBlobIterable = null; + private Iterable azCLoudBlobIterable = null; private Page gcsIterablePage = null; // Constructs platform-agnostic collection of object summaries from S3 object summaries @@ -36,8 +38,9 @@ public StorageObjectSummaryCollection(List s3ObjectSummaries) { // Constructs platform-agnostic collection of object summaries from an Azure CloudBlobDirectory // object - public StorageObjectSummaryCollection(Iterable azCLoudBlobIterable) { + public StorageObjectSummaryCollection(Iterable azCLoudBlobIterable, String remoteStorageLocation) { this.azCLoudBlobIterable = azCLoudBlobIterable; + this.azStorageLocation = remoteStorageLocation; sType = storageType.AZURE; } @@ -52,7 +55,10 @@ public Iterator iterator() { case S3: return new S3ObjectSummariesIterator(s3ObjSummariesList); case AZURE: - return new AzureObjectSummariesIterator(azCLoudBlobIterable); + if (azStorageLocation == null) { + throw new RuntimeException("Storage type is Azure but azStorageLocation field is not set. Should never happen"); + } + return new AzureObjectSummariesIterator(azCLoudBlobIterable, azStorageLocation); case GCS: return new GcsObjectSummariesIterator(this.gcsIterablePage); default: diff --git a/src/test/java/net/snowflake/client/core/CoreUtilsMiscellaneousTest.java b/src/test/java/net/snowflake/client/core/CoreUtilsMiscellaneousTest.java index 87df6c659..9b5b79cc0 100644 --- a/src/test/java/net/snowflake/client/core/CoreUtilsMiscellaneousTest.java +++ b/src/test/java/net/snowflake/client/core/CoreUtilsMiscellaneousTest.java @@ -11,9 +11,9 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; -import com.microsoft.azure.storage.OperationContext; +import com.azure.core.http.ProxyOptions; +import com.azure.core.util.HttpClientOptions; import java.net.InetSocketAddress; -import java.net.Proxy; import java.util.HashMap; import java.util.Properties; import net.snowflake.client.ConditionalIgnoreRule; @@ -90,7 +90,7 @@ public void testHttpClientSettingsKey() { HttpClientSettingsKey testKey3 = new HttpClientSettingsKey(OCSPMode.FAIL_CLOSED, "jdbc", false); // Assert the first 2 test keys are equal assertTrue(testKey1.equals(testKey2)); - // Assert that testKey2 has its non proxy hosts updated by the equals function + // Assert that testKey2 has its non-proxy hosts updated by the equals function assertEquals("*.foo.com", testKey2.getNonProxyHosts()); // Assert that the test key with the default options is different from the others assertFalse(testKey1.equals(testKey3)); @@ -148,7 +148,7 @@ public void testSetSessionlessProxyForS3() throws SnowflakeSQLException { @Test public void testSetProxyForAzure() { - OperationContext op = new OperationContext(); + HttpClientOptions httpClientOptions = new HttpClientOptions(); HttpClientSettingsKey testKey = new HttpClientSettingsKey( OCSPMode.FAIL_OPEN, @@ -160,29 +160,29 @@ public void testSetProxyForAzure() { "https", "jdbc", false); - HttpUtil.setProxyForAzure(testKey, op); - Proxy proxy = op.getProxy(); - assertEquals("testuser", op.getProxyUsername()); - assertEquals("pw", op.getProxyPassword()); - assertEquals(Proxy.Type.HTTP, proxy.type()); - assertEquals(new InetSocketAddress("snowflakecomputing.com", 443), proxy.address()); + HttpUtil.setProxyForAzure(testKey, httpClientOptions); + ProxyOptions proxyOptions = httpClientOptions.getProxyOptions(); + assertEquals("testuser", proxyOptions.getUsername()); + assertEquals("pw", proxyOptions.getPassword()); + assertEquals(ProxyOptions.Type.HTTP, proxyOptions.getType()); + assertEquals(new InetSocketAddress("snowflakecomputing.com", 443), proxyOptions.getAddress()); } @Test public void testSetSessionlessProxyForAzure() throws SnowflakeSQLException { + HttpClientOptions httpClientOptions = new HttpClientOptions(); Properties props = new Properties(); props.put("useProxy", "true"); props.put("proxyHost", "localhost"); props.put("proxyPort", "8084"); - OperationContext op = new OperationContext(); - HttpUtil.setSessionlessProxyForAzure(props, op); - Proxy proxy = op.getProxy(); - assertEquals(Proxy.Type.HTTP, proxy.type()); - assertEquals(new InetSocketAddress("localhost", 8084), proxy.address()); + HttpUtil.setSessionlessProxyForAzure(props, httpClientOptions); + ProxyOptions proxyOptions = httpClientOptions.getProxyOptions(); + assertEquals(ProxyOptions.Type.HTTP, proxyOptions.getType()); + assertEquals(new InetSocketAddress("localhost", 8084), proxyOptions.getAddress()); // Test that exception is thrown when port number is invalid props.put("proxyPort", "invalidnumber"); try { - HttpUtil.setSessionlessProxyForAzure(props, op); + HttpUtil.setSessionlessProxyForAzure(props, httpClientOptions); } catch (SnowflakeSQLException e) { assertEquals((int) ErrorCode.INVALID_PROXY_PROPERTIES.getMessageCode(), e.getErrorCode()); } diff --git a/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java b/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java index c0c5dc18d..251042d9b 100644 --- a/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java @@ -3,15 +3,19 @@ */ package net.snowflake.client.jdbc; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.StorageExtendedErrorInformation; import java.io.File; import java.io.IOException; import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.security.InvalidKeyException; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpResponse; +import com.azure.storage.blob.models.BlobStorageException; import net.snowflake.client.AbstractDriverIT; import net.snowflake.client.ConditionalIgnoreRule; import net.snowflake.client.RunningOnGithubAction; @@ -20,6 +24,7 @@ import net.snowflake.client.core.SFSession; import net.snowflake.client.core.SFStatement; import net.snowflake.client.jdbc.cloud.storage.SnowflakeAzureClient; +import org.apache.http.HttpStatus; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -28,6 +33,8 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** Test for SnowflakeAzureClient handle exception function */ @Category(TestCategoryOthers.class) @@ -59,13 +66,52 @@ public void setup() throws SQLException { spyingClient = Mockito.spy(client); } + private HttpResponse constructHttpResponse(int httpStatus) { + return new HttpResponse(null) { + @Override + public int getStatusCode() { + return httpStatus; + } + + @Override + public String getHeaderValue(String s) { + return ""; + } + + @Override + public HttpHeaders getHeaders() { + return null; + } + + @Override + public Flux getBody() { + return null; + } + + @Override + public Mono getBodyAsByteArray() { + return null; + } + + @Override + public Mono getBodyAsString() { + return null; + } + + @Override + public Mono getBodyAsString(Charset charset) { + return null; + } + }; + } + @Test @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void error403RenewExpired() throws SQLException, InterruptedException { // Unauthenticated, renew is called. spyingClient.handleStorageException( - new StorageException( - "403", "Unauthenticated", 403, new StorageExtendedErrorInformation(), new Exception()), + new BlobStorageException( + "Unauthenticated", constructHttpResponse(HttpStatus.SC_FORBIDDEN), new Exception()), 0, "upload", sfSession, @@ -82,11 +128,9 @@ public void error403RenewExpired() throws SQLException, InterruptedException { public void run() { try { spyingClient.handleStorageException( - new StorageException( - "403", + new BlobStorageException( "Unauthenticated", - 403, - new StorageExtendedErrorInformation(), + constructHttpResponse(HttpStatus.SC_FORBIDDEN), new Exception()), maxRetry, "upload", @@ -109,8 +153,8 @@ public void run() { @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void error403OverMaxRetryThrow() throws SQLException { spyingClient.handleStorageException( - new StorageException( - "403", "Unauthenticated", 403, new StorageExtendedErrorInformation(), new Exception()), + new BlobStorageException( + "Unauthenticated", constructHttpResponse(HttpStatus.SC_FORBIDDEN), new Exception()), overMaxRetry, "upload", sfSession, @@ -122,8 +166,8 @@ public void error403OverMaxRetryThrow() throws SQLException { @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void error403NullSession() throws SQLException { spyingClient.handleStorageException( - new StorageException( - "403", "Unauthenticated", 403, new StorageExtendedErrorInformation(), new Exception()), + new BlobStorageException( + "Unauthenticated", constructHttpResponse(HttpStatus.SC_FORBIDDEN), new Exception()), 0, "upload", null, @@ -182,10 +226,7 @@ public void errorNoSpaceLeftOnDevice() throws SQLException, IOException { String getCommand = "get @testPutGet_stage/" + TEST_DATA_FILE + " 'file://" + destFolderCanonicalPath + "'"; spyingClient.handleStorageException( - new StorageException( - "", - Constants.NO_SPACE_LEFT_ON_DEVICE_ERR, - new IOException(Constants.NO_SPACE_LEFT_ON_DEVICE_ERR)), + new IOException(Constants.NO_SPACE_LEFT_ON_DEVICE_ERR), 0, "download", null, diff --git a/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClientLatestIT.java b/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClientLatestIT.java index 93539005a..1447c93e0 100644 --- a/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClientLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClientLatestIT.java @@ -5,7 +5,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.spy; -import com.microsoft.azure.storage.blob.ListBlobItem; +import com.azure.storage.blob.models.BlobItem; import java.sql.Connection; import java.sql.SQLException; import net.snowflake.client.ConditionalIgnoreRule; @@ -42,8 +42,9 @@ public void testAzureClientSetupInvalidEncryptionKeySize() throws SQLException { @Test public void testCloudExceptionTest() { - Iterable mockList = null; - AzureObjectSummariesIterator iterator = new AzureObjectSummariesIterator(mockList); + Iterable mockList = null; + String azStorageLocation = "dummy_location"; + AzureObjectSummariesIterator iterator = new AzureObjectSummariesIterator(mockList, azStorageLocation); AzureObjectSummariesIterator spyIterator = spy(iterator); UnsupportedOperationException ex = assertThrows(UnsupportedOperationException.class, () -> spyIterator.remove()); diff --git a/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClientTest.java b/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClientTest.java index f0ba5b3d4..3ae538219 100644 --- a/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClientTest.java +++ b/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClientTest.java @@ -6,38 +6,40 @@ import static org.junit.Assert.assertEquals; -import com.microsoft.azure.storage.StorageExtendedErrorInformation; import java.util.LinkedHashMap; + +import org.junit.Ignore; import org.junit.Test; +@Ignore public class SnowflakeAzureClientTest { - @Test - public void testFormatStorageExtendedErrorInformation() { - String expectedStr0 = - "StorageExceptionExtendedErrorInformation: {ErrorCode= 403, ErrorMessage= Server refuses" - + " to authorize the request, AdditionalDetails= {}}"; - String expectedStr1 = - "StorageExceptionExtendedErrorInformation: {ErrorCode= 403, ErrorMessage= Server refuses" - + " to authorize the request, AdditionalDetails= { key1= helloworld,key2= ,key3=" - + " fakemessage}}"; - StorageExtendedErrorInformation info = new StorageExtendedErrorInformation(); - info.setErrorCode("403"); - info.setErrorMessage("Server refuses to authorize the request"); - String formatedStr = SnowflakeAzureClient.FormatStorageExtendedErrorInformation(info); - assertEquals(expectedStr0, formatedStr); - - LinkedHashMap map = new LinkedHashMap<>(); - map.put("key1", new String[] {"hello", "world"}); - map.put("key2", new String[] {}); - map.put("key3", new String[] {"fake", "message"}); - info.setAdditionalDetails(map); - formatedStr = SnowflakeAzureClient.FormatStorageExtendedErrorInformation(info); - assertEquals(expectedStr1, formatedStr); - } - - @Test - public void testFormatStorageExtendedErrorEmptyInformation() { - String formatedStr = SnowflakeAzureClient.FormatStorageExtendedErrorInformation(null); - assertEquals("", formatedStr); - } +// @Test +// public void testFormatStorageExtendedErrorInformation() { +// String expectedStr0 = +// "StorageExceptionExtendedErrorInformation: {ErrorCode= 403, ErrorMessage= Server refuses" +// + " to authorize the request, AdditionalDetails= {}}"; +// String expectedStr1 = +// "StorageExceptionExtendedErrorInformation: {ErrorCode= 403, ErrorMessage= Server refuses" +// + " to authorize the request, AdditionalDetails= { key1= helloworld,key2= ,key3=" +// + " fakemessage}}"; +// StorageExtendedErrorInformation info = new StorageExtendedErrorInformation(); +// info.setErrorCode("403"); +// info.setErrorMessage("Server refuses to authorize the request"); +// String formatedStr = SnowflakeAzureClient.FormatStorageExtendedErrorInformation(info); +// assertEquals(expectedStr0, formatedStr); +// +// LinkedHashMap map = new LinkedHashMap<>(); +// map.put("key1", new String[] {"hello", "world"}); +// map.put("key2", new String[] {}); +// map.put("key3", new String[] {"fake", "message"}); +// info.setAdditionalDetails(map); +// formatedStr = SnowflakeAzureClient.FormatStorageExtendedErrorInformation(info); +// assertEquals(expectedStr1, formatedStr); +// } +// +// @Test +// public void testFormatStorageExtendedErrorEmptyInformation() { +// String formatedStr = SnowflakeAzureClient.FormatStorageExtendedErrorInformation(null); +// assertEquals("", formatedStr); +// } } From d117a6bad04848fdb75e3dc73b514736204557c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kubik?= Date: Tue, 12 Nov 2024 11:49:49 +0100 Subject: [PATCH 4/4] add old api with storage sdk v8 to prevent BCR --- parent-pom.xml | 27 +++++++- pom.xml | 12 ++++ .../net/snowflake/client/core/HttpUtil.java | 68 +++++++++++++++++++ .../storage/AzureObjectSummariesIterator.java | 36 +++++++++- .../cloud/storage/StorageObjectSummary.java | 51 ++++++++++++++ .../StorageObjectSummaryCollection.java | 19 ++++-- ...akeAzureClientHandleExceptionLatestIT.java | 66 ++++++------------ 7 files changed, 223 insertions(+), 56 deletions(-) diff --git a/parent-pom.xml b/parent-pom.xml index 5b19171e5..6c9c3705c 100644 --- a/parent-pom.xml +++ b/parent-pom.xml @@ -26,6 +26,8 @@ 4.2.0 1.12.655 12.26.1 + 8.0.0 + 1.2.29 1.78.1 1.0.2.5 1.0.7 @@ -115,6 +117,13 @@ pom import + + com.azure + azure-sdk-bom + ${azuresdk.version} + pom + import + com.fasterxml.jackson jackson-bom @@ -201,9 +210,9 @@ ${google.guava.version} - com.azure - azure-storage-blob - ${azure.storage.blob.version} + com.microsoft.azure + azure-storage + ${azure.storage.version} com.nimbusds @@ -590,6 +599,18 @@ com.azure azure-storage-blob + + com.azure + azure-core + + + com.azure + azure-storage-common + + + com.microsoft.azure + azure-storage + com.nimbusds nimbus-jose-jwt diff --git a/pom.xml b/pom.xml index 1776dc0e8..5dfbf8def 100644 --- a/pom.xml +++ b/pom.xml @@ -815,6 +815,10 @@ com.azure ${shadeBase}.azure + + com.microsoft.azure + ${shadeBase}.microsoft.azure + com.fasterxml ${shadeBase}.fasterxml @@ -875,6 +879,14 @@ io.netty ${shadeBase}.io.netty + + io.projectreactor + ${shadeBase}.io.projectreactor + + + org.reactivestreams + ${shadeBase}.org.reactivestreams + com.carrotsearch ${shadeBase}.com.carrotsearch diff --git a/src/main/java/net/snowflake/client/core/HttpUtil.java b/src/main/java/net/snowflake/client/core/HttpUtil.java index 8b5f320e2..eaa5247b6 100644 --- a/src/main/java/net/snowflake/client/core/HttpUtil.java +++ b/src/main/java/net/snowflake/client/core/HttpUtil.java @@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import javax.net.ssl.TrustManager; + +import com.microsoft.azure.storage.OperationContext; import net.snowflake.client.jdbc.ErrorCode; import net.snowflake.client.jdbc.RestRequest; import net.snowflake.client.jdbc.SnowflakeDriver; @@ -179,6 +181,50 @@ public static void setSessionlessProxyForS3( S3HttpUtil.setSessionlessProxyForS3(proxyProperties, clientConfig); } + + /** + * A static function to set Azure proxy params for sessionless connections using the proxy params + * from the StageInfo + * + * @param proxyProperties proxy properties + * @param opContext the configuration needed by Azure to set the proxy + * @throws SnowflakeSQLException + * + * @deprecated, please use {@link HttpUtil#setSessionlessProxyForAzure(Properties, HttpClientOptions)} as + * it supports the up-to-date azure storage client. + */ + @Deprecated + public static void setSessionlessProxyForAzure( + Properties proxyProperties, OperationContext opContext) throws SnowflakeSQLException { + if (proxyProperties != null + && proxyProperties.size() > 0 + && proxyProperties.getProperty(SFSessionProperty.USE_PROXY.getPropertyKey()) != null) { + Boolean useProxy = + Boolean.valueOf( + proxyProperties.getProperty(SFSessionProperty.USE_PROXY.getPropertyKey())); + if (useProxy) { + String proxyHost = + proxyProperties.getProperty(SFSessionProperty.PROXY_HOST.getPropertyKey()); + int proxyPort; + try { + proxyPort = + Integer.parseInt( + proxyProperties.getProperty(SFSessionProperty.PROXY_PORT.getPropertyKey())); + } catch (NumberFormatException | NullPointerException e) { + throw new SnowflakeSQLException( + ErrorCode.INVALID_PROXY_PROPERTIES, "Could not parse port number"); + } + Proxy azProxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); + logger.debug("Setting sessionless Azure proxy. Host: {}, port: {}", proxyHost, proxyPort); + opContext.setProxy(azProxy); + } else { + logger.debug("Omitting sessionless Azure proxy setup as proxy is disabled"); + } + } else { + logger.debug("Omitting sessionless Azure proxy setup"); + } + } + /** * A static function to set Azure proxy params for sessionless connections using the proxy params * from the StageInfo @@ -237,6 +283,28 @@ public static void setSessionlessProxyForAzure( httpClientOptions.setProxyOptions(proxyOptions); } + /** + * A static function to set Azure proxy params when there is a valid session + * + * @param key key to HttpClient map containing OCSP and proxy info + * @param opContext the configuration needed by Azure to set the proxy + * + * @deprecated Use {@link HttpUtil#setProxyForAzure(HttpClientSettingsKey, OperationContext)} as it supports + * the most recent azure storage client + */ + @Deprecated + public static void setProxyForAzure(HttpClientSettingsKey key, OperationContext opContext) { + if (key != null && key.usesProxy()) { + Proxy azProxy = + new Proxy(Proxy.Type.HTTP, new InetSocketAddress(key.getProxyHost(), key.getProxyPort())); + logger.debug( + "Setting Azure proxy. Host: {}, port: {}", key.getProxyHost(), key.getProxyPort()); + opContext.setProxy(azProxy); + } else { + logger.debug("Omitting Azure proxy setup"); + } + } + /** * A static function to set Azure proxy params when there is a valid session * diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/AzureObjectSummariesIterator.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/AzureObjectSummariesIterator.java index 3f4b02729..7f284359c 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/AzureObjectSummariesIterator.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/AzureObjectSummariesIterator.java @@ -10,6 +10,8 @@ import java.util.Iterator; import java.util.NoSuchElementException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.ListBlobItem; import net.snowflake.client.log.SFLogger; import net.snowflake.client.log.SFLoggerFactory; @@ -24,6 +26,19 @@ public class AzureObjectSummariesIterator implements Iterator itemIterator; + Iterator listBlobItemIterator; + + /* + * Constructs a summaries iterator object from an iterable derived by a + * lostBlobs method + * @param azCloudBlobIterable an iterable set of ListBlobItems + */ + @Deprecated + public AzureObjectSummariesIterator(Iterable azCloudBlobIterable) { + storageLocation = null; + itemIterator = null; + listBlobItemIterator = azCloudBlobIterable.iterator(); + } /* * Constructs a summaries iterator object from an iterable derived by a @@ -33,6 +48,9 @@ public class AzureObjectSummariesIterator implements Iterator azCloudBlobIterable, String azStorageLocation) { itemIterator = azCloudBlobIterable.iterator(); storageLocation = azStorageLocation; + + // listBlobItem support is deprecated as it comes from azure storage v8 + listBlobItemIterator = null; } public boolean hasNext() { @@ -49,15 +67,27 @@ public boolean hasNext() { } public StorageObjectSummary next() { - BlobItem blobItem = itemIterator.next(); + if (itemIterator != null) { + BlobItem blobItem = itemIterator.next(); // if (!(blobItem.getProperties().getBlobType() instanceof BlobClient)) { // // The only other possible type would a CloudDirectory // // This should never happen since we are listing items as a flat list // logger.debug("Unexpected listBlobItem instance type: {}", blobItem.getClass()); // throw new IllegalArgumentException("Unexpected listBlobItem instance type"); // } - - return StorageObjectSummary.createFromAzureListBlobItem(blobItem, storageLocation); + return StorageObjectSummary.createFromAzureListBlobItem(blobItem, storageLocation); + } + else if (listBlobItemIterator != null) { + ListBlobItem listBlobItem = listBlobItemIterator.next(); + if (!(listBlobItem instanceof CloudBlob)) { + // The only other possible type would a CloudDirectory + // This should never happen since we are listing items as a flat list + logger.debug("Unexpected listBlobItem instance type: {}", listBlobItem.getClass()); + throw new IllegalArgumentException("Unexpected listBlobItem instance type"); + } + return StorageObjectSummary.createFromAzureListBlobItem(listBlobItem); + } + throw new RuntimeException("No azure blob iterator was initialized, should never happen"); } public void remove() { diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummary.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummary.java index 5dc82db5f..d672f7061 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummary.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummary.java @@ -9,9 +9,16 @@ import com.azure.storage.blob.models.BlobStorageException; import com.google.cloud.storage.Blob; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobProperties; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.ListBlobItem; import net.snowflake.client.log.SFLogger; import net.snowflake.client.log.SFLoggerFactory; +import java.net.URISyntaxException; + /** * Storage platform-agnostic class that encapsulates remote storage object properties * @@ -57,6 +64,50 @@ public static StorageObjectSummary createFromS3ObjectSummary(S3ObjectSummary obj objSummary.getSize()); } + /** + * Constructs a StorageObjectSummary object from Azure BLOB properties Using factory methods to + * create these objects since Azure can throw, while retrieving the BLOB properties + * + * @param listBlobItem an Azure ListBlobItem object + * @return the ObjectSummary object created + */ + @Deprecated + public static StorageObjectSummary createFromAzureListBlobItem(ListBlobItem listBlobItem) + throws StorageProviderException { + String location, key, md5; + long size; + CloudBlobContainer container; + + // Retrieve the BLOB properties that we need for the Summary + // Azure Storage stores metadata inside each BLOB, therefore the listBlobItem + // will point us to the underlying BLOB and will get the properties from it + // During the process the Storage Client could fail, hence we need to wrap the + // get calls in try/catch and handle possible exceptions + try { + container = listBlobItem.getContainer(); + location = container.getName(); + CloudBlob cloudBlob = (CloudBlob) listBlobItem; + key = cloudBlob.getName(); + BlobProperties blobProperties = cloudBlob.getProperties(); + // the content md5 property is not always the actual md5 of the file. But for here, it's only + // used for skipping file on PUT command, hence is ok. + md5 = convertBase64ToHex(blobProperties.getContentMD5().getBytes()); + size = blobProperties.getLength(); + } catch (URISyntaxException | StorageException ex) { + // This should only happen if somehow we got here with and invalid URI (it should never + // happen) + // ...or there is a Storage service error. Unlike S3, Azure fetches metadata from the BLOB + // itself, + // and its a lazy operation + logger.debug("Failed to create StorageObjectSummary from Azure ListBlobItem: {}", ex); + throw new StorageProviderException(ex); + } catch (Throwable th) { + logger.debug("Failed to create StorageObjectSummary from Azure ListBlobItem: {}", th); + throw th; + } + return new StorageObjectSummary(location, key, md5, size); + } + /** * Constructs a StorageObjectSummary object from Azure BLOB properties Using factory methods to * create these objects since Azure can throw, while retrieving the BLOB properties diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummaryCollection.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummaryCollection.java index 4460988b0..4a454332b 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummaryCollection.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/StorageObjectSummaryCollection.java @@ -7,6 +7,8 @@ import com.azure.storage.blob.models.BlobItem; import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; +import com.microsoft.azure.storage.blob.ListBlobItem; + import java.util.Iterator; import java.util.List; @@ -27,7 +29,8 @@ private enum storageType { private final storageType sType; private List s3ObjSummariesList = null; - private Iterable azCLoudBlobIterable = null; + private Iterable azCLoudBlobIterable = null; + private Iterable azCloudBlobItemIterable = null; private Page gcsIterablePage = null; // Constructs platform-agnostic collection of object summaries from S3 object summaries @@ -38,12 +41,17 @@ public StorageObjectSummaryCollection(List s3ObjectSummaries) { // Constructs platform-agnostic collection of object summaries from an Azure CloudBlobDirectory // object - public StorageObjectSummaryCollection(Iterable azCLoudBlobIterable, String remoteStorageLocation) { - this.azCLoudBlobIterable = azCLoudBlobIterable; + public StorageObjectSummaryCollection(Iterable azCLoudBlobItemIterable, String remoteStorageLocation) { + this.azCloudBlobItemIterable = azCLoudBlobItemIterable; this.azStorageLocation = remoteStorageLocation; sType = storageType.AZURE; } + public StorageObjectSummaryCollection(Iterable azCLoudBlobIterable) { + this.azCLoudBlobIterable = azCLoudBlobIterable; + sType = storageType.AZURE; + } + public StorageObjectSummaryCollection(Page gcsIterablePage) { this.gcsIterablePage = gcsIterablePage; sType = storageType.GCS; @@ -56,9 +64,12 @@ public Iterator iterator() { return new S3ObjectSummariesIterator(s3ObjSummariesList); case AZURE: if (azStorageLocation == null) { + if (azCLoudBlobIterable != null) { + return new AzureObjectSummariesIterator(azCLoudBlobIterable); + } throw new RuntimeException("Storage type is Azure but azStorageLocation field is not set. Should never happen"); } - return new AzureObjectSummariesIterator(azCLoudBlobIterable, azStorageLocation); + return new AzureObjectSummariesIterator(azCloudBlobItemIterable, azStorageLocation); case GCS: return new GcsObjectSummariesIterator(this.gcsIterablePage); default: diff --git a/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java b/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java index 251042d9b..8827f1bb8 100644 --- a/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java @@ -6,8 +6,6 @@ import java.io.File; import java.io.IOException; import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.security.InvalidKeyException; import java.sql.Connection; import java.sql.SQLException; @@ -32,12 +30,16 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.Mockito; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.Mockito.when; /** Test for SnowflakeAzureClient handle exception function */ @Category(TestCategoryOthers.class) +@RunWith(MockitoJUnitRunner.class) public class SnowflakeAzureClientHandleExceptionLatestIT extends AbstractDriverIT { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); private Connection connection; @@ -47,6 +49,13 @@ public class SnowflakeAzureClientHandleExceptionLatestIT extends AbstractDriverI private SnowflakeAzureClient spyingClient; private int overMaxRetry; private int maxRetry; + @Mock + HttpResponse mockHttpResponse; + + /** + * @see https://learn.microsoft.com/en-us/rest/api/storageservices/status-and-error-codes2 + */ + private final static String AZURE_ERROR_CODE_HEADER = "x-ms-error-code"; @Before public void setup() throws SQLException { @@ -64,45 +73,10 @@ public void setup() throws SQLException { maxRetry = client.getMaxRetries(); overMaxRetry = maxRetry + 1; spyingClient = Mockito.spy(client); - } - private HttpResponse constructHttpResponse(int httpStatus) { - return new HttpResponse(null) { - @Override - public int getStatusCode() { - return httpStatus; - } - - @Override - public String getHeaderValue(String s) { - return ""; - } - - @Override - public HttpHeaders getHeaders() { - return null; - } - - @Override - public Flux getBody() { - return null; - } - - @Override - public Mono getBodyAsByteArray() { - return null; - } - - @Override - public Mono getBodyAsString() { - return null; - } - - @Override - public Mono getBodyAsString(Charset charset) { - return null; - } - }; + when(mockHttpResponse.getStatusCode()).thenReturn(HttpStatus.SC_FORBIDDEN); + when(mockHttpResponse.getHeaders()).thenReturn(new HttpHeaders() + .add(AZURE_ERROR_CODE_HEADER, "InvalidAuthenticationInfo")); } @Test @@ -111,7 +85,7 @@ public void error403RenewExpired() throws SQLException, InterruptedException { // Unauthenticated, renew is called. spyingClient.handleStorageException( new BlobStorageException( - "Unauthenticated", constructHttpResponse(HttpStatus.SC_FORBIDDEN), new Exception()), + "Unauthenticated", mockHttpResponse, new Exception()), 0, "upload", sfSession, @@ -130,7 +104,7 @@ public void run() { spyingClient.handleStorageException( new BlobStorageException( "Unauthenticated", - constructHttpResponse(HttpStatus.SC_FORBIDDEN), + mockHttpResponse, new Exception()), maxRetry, "upload", @@ -154,7 +128,7 @@ public void run() { public void error403OverMaxRetryThrow() throws SQLException { spyingClient.handleStorageException( new BlobStorageException( - "Unauthenticated", constructHttpResponse(HttpStatus.SC_FORBIDDEN), new Exception()), + "Unauthenticated", mockHttpResponse, new Exception()), overMaxRetry, "upload", sfSession, @@ -167,7 +141,7 @@ public void error403OverMaxRetryThrow() throws SQLException { public void error403NullSession() throws SQLException { spyingClient.handleStorageException( new BlobStorageException( - "Unauthenticated", constructHttpResponse(HttpStatus.SC_FORBIDDEN), new Exception()), + "Unauthenticated", mockHttpResponse, new Exception()), 0, "upload", null,