diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs index 6f3c87291..554d0c2a9 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs @@ -514,8 +514,8 @@ public void TestDefaultLoginTimeout() // Should timeout after the default timeout (300 sec) Assert.GreaterOrEqual(stopwatch.ElapsedMilliseconds, conn.ConnectionTimeout * 1000 - delta); - // But never more because there's no connection timeout remaining - Assert.LessOrEqual(stopwatch.ElapsedMilliseconds, (conn.ConnectionTimeout + 1) * 1000); + // But never more because there's no connection timeout remaining (with 2 seconds margin) + Assert.LessOrEqual(stopwatch.ElapsedMilliseconds, (conn.ConnectionTimeout + 2) * 1000); } } } @@ -2015,8 +2015,8 @@ public void TestAsyncDefaultLoginTimeout() // Should timeout after the default timeout (300 sec) Assert.GreaterOrEqual(stopwatch.ElapsedMilliseconds, conn.ConnectionTimeout * 1000 - delta); - // But never more because there's no connection timeout remaining - Assert.LessOrEqual(stopwatch.ElapsedMilliseconds, (conn.ConnectionTimeout + 1) * 1000); + // But never more because there's no connection timeout remaining (with 2 seconds margin) + Assert.LessOrEqual(stopwatch.ElapsedMilliseconds, (conn.ConnectionTimeout + 2) * 1000); Assert.AreEqual(ConnectionState.Closed, conn.State); Assert.AreEqual(SFSessionHttpClientProperties.DefaultRetryTimeout.TotalSeconds, conn.ConnectionTimeout); diff --git a/Snowflake.Data.Tests/Mock/MockS3Client.cs b/Snowflake.Data.Tests/Mock/MockS3Client.cs index 02d08ca63..8a17efd30 100644 --- a/Snowflake.Data.Tests/Mock/MockS3Client.cs +++ b/Snowflake.Data.Tests/Mock/MockS3Client.cs @@ -35,22 +35,23 @@ class MockS3Client internal const int ContentLength = 9999; // Create AWS exception for mock requests - static Exception CreateMockAwsResponseError(string errorCode, bool isAsync) + static Exception CreateMockAwsResponseError(string awsErrorCode, bool isAsync) { - AmazonS3Exception awsError = new AmazonS3Exception(S3ErrorMessage); - awsError.ErrorCode = errorCode; + Exception exception = awsErrorCode.Length > 0 + ? new AmazonS3Exception(S3ErrorMessage) { ErrorCode = awsErrorCode } + : new Exception("Non-AWS exception"); if (isAsync) { - return awsError; // S3 throws the AmazonS3Exception on async calls + return exception; // S3 throws the AmazonS3Exception on async calls } - Exception exceptionContainingS3Error = new Exception(S3ErrorMessage, awsError); + Exception exceptionContainingS3Error = new Exception(S3ErrorMessage, exception); return exceptionContainingS3Error; // S3 places the AmazonS3Exception on the InnerException property on non-async calls } // Create mock response for GetFileHeader - static internal Task CreateResponseForGetFileHeader(string statusCode, bool isAsync) + internal static Task CreateResponseForGetFileHeader(string statusCode, bool isAsync) { if (statusCode == HttpStatusCode.OK.ToString()) { @@ -70,20 +71,20 @@ static internal Task CreateResponseForGetFileHeader(string st } // Create mock response for UploadFile - static internal Task CreateResponseForUploadFile(string statusCode, bool isAsync) + internal static Task CreateResponseForUploadFile(string awsStatusCode, bool isAsync) { - if (statusCode == HttpStatusCode.OK.ToString()) + if (awsStatusCode == AwsStatusOk) { return Task.FromResult(new PutObjectResponse()); } else { - throw CreateMockAwsResponseError(statusCode, isAsync); + throw CreateMockAwsResponseError(awsStatusCode, isAsync); } } // Create mock response for DownloadFile - static internal Task CreateResponseForDownloadFile(string statusCode, bool isAsync) + internal static Task CreateResponseForDownloadFile(string statusCode, bool isAsync) { if (statusCode == HttpStatusCode.OK.ToString()) { diff --git a/Snowflake.Data.Tests/UnitTests/SFS3ClientTest.cs b/Snowflake.Data.Tests/UnitTests/SFS3ClientTest.cs index 50faae758..54647db8b 100644 --- a/Snowflake.Data.Tests/UnitTests/SFS3ClientTest.cs +++ b/Snowflake.Data.Tests/UnitTests/SFS3ClientTest.cs @@ -127,18 +127,15 @@ public void TestExtractBucketNameAndPath() [TestCase(SFS3Client.EXPIRED_TOKEN, ResultStatus.RENEW_TOKEN)] [TestCase(SFS3Client.NO_SUCH_KEY, ResultStatus.NOT_FOUND_FILE)] [TestCase(MockS3Client.AwsStatusError, ResultStatus.ERROR)] // Any error that isn't the above will return ResultStatus.ERROR - public void TestGetFileHeader(string requestKey, ResultStatus expectedResultStatus) + [TestCase("", ResultStatus.ERROR)] // For non-AWS exception will return ResultStatus.ERROR + public void TestGetFileHeader(string awsStatusCode, ResultStatus expectedResultStatus) { // Arrange var mockAmazonS3Client = new Mock(AwsKeyId, AwsSecretKey, AwsToken, _clientConfig); mockAmazonS3Client.Setup(client => client.GetObjectAsync(It.IsAny(), It.IsAny())) - .Returns((request, cancellationToken) => - { - return MockS3Client.CreateResponseForGetFileHeader(request.BucketName, false); - }); + .Returns(() => MockS3Client.CreateResponseForGetFileHeader(awsStatusCode, false)); _client = new SFS3Client(_fileMetadata.stageInfo, MaxRetry, Parallel, _proxyCredentials, mockAmazonS3Client.Object); _fileMetadata.client = _client; - _fileMetadata.stageInfo.location = requestKey; // Act FileHeader fileHeader = _client.GetFileHeader(_fileMetadata); @@ -152,18 +149,15 @@ public void TestGetFileHeader(string requestKey, ResultStatus expectedResultStat [TestCase(SFS3Client.EXPIRED_TOKEN, ResultStatus.RENEW_TOKEN)] [TestCase(SFS3Client.NO_SUCH_KEY, ResultStatus.NOT_FOUND_FILE)] [TestCase(MockS3Client.AwsStatusError, ResultStatus.ERROR)] // Any error that isn't the above will return ResultStatus.ERROR - public async Task TestGetFileHeaderAsync(string requestKey, ResultStatus expectedResultStatus) + [TestCase("", ResultStatus.ERROR)] // For non-AWS exception will return ResultStatus.ERROR + public async Task TestGetFileHeaderAsync(string awsStatusCode, ResultStatus expectedResultStatus) { // Arrange var mockAmazonS3Client = new Mock(AwsKeyId, AwsSecretKey, AwsToken, _clientConfig); mockAmazonS3Client.Setup(client => client.GetObjectAsync(It.IsAny(), It.IsAny())) - .Returns((request, cancellationToken) => - { - return MockS3Client.CreateResponseForGetFileHeader(request.BucketName, true); - }); + .Returns(() => MockS3Client.CreateResponseForGetFileHeader(awsStatusCode, true)); _client = new SFS3Client(_fileMetadata.stageInfo, MaxRetry, Parallel, _proxyCredentials, mockAmazonS3Client.Object); _fileMetadata.client = _client; - _fileMetadata.stageInfo.location = requestKey; // Act FileHeader fileHeader = await _client.GetFileHeaderAsync(_fileMetadata, _cancellationToken).ConfigureAwait(false); @@ -194,18 +188,15 @@ private void AssertForGetFileHeaderTests(ResultStatus expectedResultStatus, File [TestCase(MockS3Client.AwsStatusOk, ResultStatus.UPLOADED)] [TestCase(SFS3Client.EXPIRED_TOKEN, ResultStatus.RENEW_TOKEN)] [TestCase(MockS3Client.AwsStatusError, ResultStatus.NEED_RETRY)] // Any error that isn't the above will return ResultStatus.NEED_RETRY - public void TestUploadFile(string requestKey, ResultStatus expectedResultStatus) + [TestCase("", ResultStatus.NEED_RETRY)] // For non-AWS exception will return ResultStatus.NEED_RETRY + public void TestUploadFile(string awsStatusCode, ResultStatus expectedResultStatus) { // Arrange var mockAmazonS3Client = new Mock(AwsKeyId, AwsSecretKey, AwsToken, _clientConfig); mockAmazonS3Client.Setup(client => client.PutObjectAsync(It.IsAny(), It.IsAny())) - .Returns((request, cancellationToken) => - { - return MockS3Client.CreateResponseForUploadFile(request.BucketName, false); - }); + .Returns(() => MockS3Client.CreateResponseForUploadFile(awsStatusCode, false)); _client = new SFS3Client(_fileMetadata.stageInfo, MaxRetry, Parallel, _proxyCredentials, mockAmazonS3Client.Object); _fileMetadata.client = _client; - _fileMetadata.stageInfo.location = requestKey; _fileMetadata.uploadSize = UploadFileSize; // Act @@ -254,18 +245,15 @@ public void TestAppendHttpsToEndpointWithBrackets() [TestCase(MockS3Client.AwsStatusOk, ResultStatus.UPLOADED)] [TestCase(SFS3Client.EXPIRED_TOKEN, ResultStatus.RENEW_TOKEN)] [TestCase(MockS3Client.AwsStatusError, ResultStatus.NEED_RETRY)] // Any error that isn't the above will return ResultStatus.NEED_RETRY - public async Task TestUploadFileAsync(string requestKey, ResultStatus expectedResultStatus) + [TestCase("", ResultStatus.NEED_RETRY)] // For non-AWS exception will return ResultStatus.NEED_RETRY + public async Task TestUploadFileAsync(string awsStatusCode, ResultStatus expectedResultStatus) { // Arrange var mockAmazonS3Client = new Mock(AwsKeyId, AwsSecretKey, AwsToken, _clientConfig); mockAmazonS3Client.Setup(client => client.PutObjectAsync(It.IsAny(), It.IsAny())) - .Returns((request, cancellationToken) => - { - return MockS3Client.CreateResponseForUploadFile(request.BucketName, true); - }); + .Returns(() => MockS3Client.CreateResponseForUploadFile(awsStatusCode, true)); _client = new SFS3Client(_fileMetadata.stageInfo, MaxRetry, Parallel, _proxyCredentials, mockAmazonS3Client.Object); _fileMetadata.client = _client; - _fileMetadata.stageInfo.location = requestKey; _fileMetadata.uploadSize = UploadFileSize; // Act @@ -295,18 +283,15 @@ private void AssertForUploadFileTests(ResultStatus expectedResultStatus) [TestCase(MockS3Client.AwsStatusOk, ResultStatus.DOWNLOADED)] [TestCase(SFS3Client.EXPIRED_TOKEN, ResultStatus.RENEW_TOKEN)] [TestCase(MockS3Client.AwsStatusError, ResultStatus.NEED_RETRY)] // Any error that isn't the above will return ResultStatus.NEED_RETRY - public void TestDownloadFile(string requestKey, ResultStatus expectedResultStatus) + [TestCase("", ResultStatus.NEED_RETRY)] // For non-AWS exception will return ResultStatus.NEED_RETRY + public void TestDownloadFile(string awsStatusCode, ResultStatus expectedResultStatus) { // Arrange var mockAmazonS3Client = new Mock(AwsKeyId, AwsSecretKey, AwsToken, _clientConfig); mockAmazonS3Client.Setup(client => client.GetObjectAsync(It.IsAny(), It.IsAny())) - .Returns((request, cancellationToken) => - { - return MockS3Client.CreateResponseForDownloadFile(request.BucketName, false); - }); + .Returns(() => MockS3Client.CreateResponseForDownloadFile(awsStatusCode, false)); _client = new SFS3Client(_fileMetadata.stageInfo, MaxRetry, Parallel, _proxyCredentials, mockAmazonS3Client.Object); _fileMetadata.client = _client; - _fileMetadata.stageInfo.location = requestKey; // Act _client.DownloadFile(_fileMetadata, t_downloadFileName, Parallel); @@ -319,18 +304,15 @@ public void TestDownloadFile(string requestKey, ResultStatus expectedResultStatu [TestCase(MockS3Client.AwsStatusOk, ResultStatus.DOWNLOADED)] [TestCase(SFS3Client.EXPIRED_TOKEN, ResultStatus.RENEW_TOKEN)] [TestCase(MockS3Client.AwsStatusError, ResultStatus.NEED_RETRY)] // Any error that isn't the above will return ResultStatus.NEED_RETRY - public async Task TestDownloadFileAsync(string requestKey, ResultStatus expectedResultStatus) + [TestCase("", ResultStatus.NEED_RETRY)] // For non-AWS exception will return ResultStatus.NEED_RETRY + public async Task TestDownloadFileAsync(string awsStatusCode, ResultStatus expectedResultStatus) { // Arrange var mockAmazonS3Client = new Mock(AwsKeyId, AwsSecretKey, AwsToken, _clientConfig); mockAmazonS3Client.Setup(client => client.GetObjectAsync(It.IsAny(), It.IsAny())) - .Returns((request, cancellationToken) => - { - return MockS3Client.CreateResponseForDownloadFile(request.BucketName, true); - }); + .Returns(() => MockS3Client.CreateResponseForDownloadFile(awsStatusCode, true)); _client = new SFS3Client(_fileMetadata.stageInfo, MaxRetry, Parallel, _proxyCredentials, mockAmazonS3Client.Object); _fileMetadata.client = _client; - _fileMetadata.stageInfo.location = requestKey; // Act await _client.DownloadFileAsync(_fileMetadata, t_downloadFileName, Parallel, _cancellationToken).ConfigureAwait(false); diff --git a/Snowflake.Data/Core/FileTransfer/StorageClient/SFS3Client.cs b/Snowflake.Data/Core/FileTransfer/StorageClient/SFS3Client.cs index 60d67b5d7..b6896cc79 100644 --- a/Snowflake.Data/Core/FileTransfer/StorageClient/SFS3Client.cs +++ b/Snowflake.Data/Core/FileTransfer/StorageClient/SFS3Client.cs @@ -206,7 +206,7 @@ public FileHeader GetFileHeader(SFFileMetadata fileMetadata) } catch (Exception ex) { - fileMetadata = HandleFileHeaderErr(ex.InnerException, fileMetadata); // S3 places the AmazonS3Exception on the InnerException on non-async calls + HandleFileHeaderErr(ex.InnerException, fileMetadata); // S3 places the AmazonS3Exception on the InnerException on non-async calls return null; } } @@ -233,7 +233,7 @@ public async Task GetFileHeaderAsync(SFFileMetadata fileMetadata, Ca } catch (Exception ex) { - fileMetadata = HandleFileHeaderErr(ex, fileMetadata); // S3 throws the AmazonS3Exception on async calls + HandleFileHeaderErr(ex, fileMetadata); // S3 throws the AmazonS3Exception on async calls return null; } @@ -363,7 +363,7 @@ public void UploadFile(SFFileMetadata fileMetadata, Stream fileBytesStream, SFEn } catch (Exception ex) { - fileMetadata = HandleUploadFileErr(ex.InnerException, fileMetadata); + HandleUploadFileErr(ex.InnerException, fileMetadata); return; } @@ -391,7 +391,7 @@ public async Task UploadFileAsync(SFFileMetadata fileMetadata, Stream fileBytesS } catch (Exception ex) { - fileMetadata = HandleUploadFileErr(ex, fileMetadata); + HandleUploadFileErr(ex, fileMetadata); return; } @@ -461,7 +461,7 @@ public void DownloadFile(SFFileMetadata fileMetadata, string fullDstPath, int ma } catch (Exception ex) { - fileMetadata = HandleDownloadFileErr(ex.InnerException, fileMetadata); + HandleDownloadFileErr(ex.InnerException, fileMetadata); return; } @@ -494,7 +494,7 @@ public async Task DownloadFileAsync(SFFileMetadata fileMetadata, string fullDstP } catch (Exception ex) { - fileMetadata = HandleDownloadFileErr(ex, fileMetadata); + HandleDownloadFileErr(ex, fileMetadata); return; } @@ -519,25 +519,31 @@ private GetObjectRequest GetGetObjectRequest(ref AmazonS3Client client, SFFileMe /// /// Exception from file header. /// The file metadata. - /// The file metadata. - private SFFileMetadata HandleFileHeaderErr(Exception ex, SFFileMetadata fileMetadata) + private void HandleFileHeaderErr(Exception ex, SFFileMetadata fileMetadata) { Logger.Error("Failed to get file header: " + ex.Message); - AmazonS3Exception err = (AmazonS3Exception)ex; - if (err.ErrorCode == EXPIRED_TOKEN || err.ErrorCode == HttpStatusCode.BadRequest.ToString()) + switch (ex) { - fileMetadata.resultStatus = ResultStatus.RENEW_TOKEN.ToString(); - } - else if (err.ErrorCode == NO_SUCH_KEY) - { - fileMetadata.resultStatus = ResultStatus.NOT_FOUND_FILE.ToString(); - } - else - { - fileMetadata.resultStatus = ResultStatus.ERROR.ToString(); + case AmazonS3Exception exAws: + if (exAws.ErrorCode == EXPIRED_TOKEN || exAws.ErrorCode == HttpStatusCode.BadRequest.ToString()) + { + fileMetadata.resultStatus = ResultStatus.RENEW_TOKEN.ToString(); + } + else if (exAws.ErrorCode == NO_SUCH_KEY) + { + fileMetadata.resultStatus = ResultStatus.NOT_FOUND_FILE.ToString(); + } + else + { + fileMetadata.resultStatus = ResultStatus.ERROR.ToString(); + } + + break; + default: + fileMetadata.resultStatus = ResultStatus.ERROR.ToString(); + break; } - return fileMetadata; } /// @@ -545,22 +551,29 @@ private SFFileMetadata HandleFileHeaderErr(Exception ex, SFFileMetadata fileMeta /// /// Exception from file header. /// The file metadata. - /// The file metadata. - private SFFileMetadata HandleUploadFileErr(Exception ex, SFFileMetadata fileMetadata) + private void HandleUploadFileErr(Exception ex, SFFileMetadata fileMetadata) { Logger.Error("Failed to upload file: " + ex.Message); - AmazonS3Exception err = (AmazonS3Exception)ex; - if (err.ErrorCode == EXPIRED_TOKEN) - { - fileMetadata.resultStatus = ResultStatus.RENEW_TOKEN.ToString(); - } - else + switch (ex) { - fileMetadata.lastError = err; - fileMetadata.resultStatus = ResultStatus.NEED_RETRY.ToString(); + case AmazonS3Exception exAws: + if (exAws.ErrorCode == EXPIRED_TOKEN) + { + fileMetadata.resultStatus = ResultStatus.RENEW_TOKEN.ToString(); + } + else + { + fileMetadata.lastError = exAws; + fileMetadata.resultStatus = ResultStatus.NEED_RETRY.ToString(); + } + break; + + case Exception exOther: + fileMetadata.lastError = exOther; + fileMetadata.resultStatus = ResultStatus.NEED_RETRY.ToString(); + break; } - return fileMetadata; } /// @@ -568,22 +581,29 @@ private SFFileMetadata HandleUploadFileErr(Exception ex, SFFileMetadata fileMeta /// /// Exception from file header. /// The file metadata. - /// The file metadata. - private SFFileMetadata HandleDownloadFileErr(Exception ex, SFFileMetadata fileMetadata) + private void HandleDownloadFileErr(Exception ex, SFFileMetadata fileMetadata) { Logger.Error("Failed to download file: " + ex.Message); - AmazonS3Exception err = (AmazonS3Exception)ex; - if (err.ErrorCode == EXPIRED_TOKEN) - { - fileMetadata.resultStatus = ResultStatus.RENEW_TOKEN.ToString(); - } - else + switch (ex) { - fileMetadata.lastError = err; - fileMetadata.resultStatus = ResultStatus.NEED_RETRY.ToString(); + case AmazonS3Exception exAws: + if (exAws.ErrorCode == EXPIRED_TOKEN) + { + fileMetadata.resultStatus = ResultStatus.RENEW_TOKEN.ToString(); + } + else + { + fileMetadata.lastError = exAws; + fileMetadata.resultStatus = ResultStatus.NEED_RETRY.ToString(); + } + break; + + case Exception exOther: + fileMetadata.lastError = exOther; + fileMetadata.resultStatus = ResultStatus.NEED_RETRY.ToString(); + break; } - return fileMetadata; } } } diff --git a/Snowflake.Data/Core/HttpUtil.cs b/Snowflake.Data/Core/HttpUtil.cs index 72d18bcdd..3e779e34a 100755 --- a/Snowflake.Data/Core/HttpUtil.cs +++ b/Snowflake.Data/Core/HttpUtil.cs @@ -378,9 +378,9 @@ protected override async Task SendAsync(HttpRequestMessage UriUpdater updater = new UriUpdater(requestMessage.RequestUri, includeRetryReason); int retryCount = 0; + long startTimeInMilliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); while (true) { - try { childCts = null; @@ -401,13 +401,12 @@ protected override async Task SendAsync(HttpRequestMessage lastException = e; if (cancellationToken.IsCancellationRequested) { - logger.Debug("SF rest request timeout or explicit cancel called."); + logger.Info("SF rest request timeout or explicit cancel called."); cancellationToken.ThrowIfCancellationRequested(); } else if (childCts != null && childCts.Token.IsCancellationRequested) { - logger.Warn("Http request timeout. Retry the request"); - totalRetryTime += (int)httpTimeout.TotalSeconds; + logger.Warn($"Http request timeout. Retry the request after {backOffInSec} sec."); } else { @@ -426,6 +425,8 @@ protected override async Task SendAsync(HttpRequestMessage } } + totalRetryTime = (int)((DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - startTimeInMilliseconds) / 1000); + if (childCts != null) { childCts.Dispose(); @@ -464,6 +465,19 @@ protected override async Task SendAsync(HttpRequestMessage logger.Info("Response returned was null."); } + if (restTimeout.TotalSeconds > 0 && totalRetryTime > restTimeout.TotalSeconds) + { + logger.Debug($"stop retry as connection_timeout {restTimeout.TotalSeconds} sec. reached"); + if (response != null) + { + return response; + } + var errorMessage = $"http request failed and connection_timeout {restTimeout.TotalSeconds} sec. reached.\n"; + errorMessage += $"Last exception encountered: {lastException}"; + logger.Error(errorMessage); + throw new OperationCanceledException(errorMessage); + } + retryCount++; if ((maxRetryCount > 0) && (retryCount > maxRetryCount)) { @@ -486,7 +500,6 @@ protected override async Task SendAsync(HttpRequestMessage logger.Debug($"Sleep {backOffInSec} seconds and then retry the request, retryCount: {retryCount}"); await Task.Delay(TimeSpan.FromSeconds(backOffInSec), cancellationToken).ConfigureAwait(false); - totalRetryTime += backOffInSec; var jitter = GetJitter(backOffInSec); @@ -504,12 +517,14 @@ protected override async Task SendAsync(HttpRequestMessage backOffInSec *= 2; } + totalRetryTime = (int)((DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - startTimeInMilliseconds) / 1000); if ((restTimeout.TotalSeconds > 0) && (totalRetryTime + backOffInSec > restTimeout.TotalSeconds)) { // No need to wait more than necessary if it can be avoided. // If the rest timeout will be reached before the next back-off, - // then use the remaining connection timeout - backOffInSec = Math.Min(backOffInSec, (int)restTimeout.TotalSeconds - totalRetryTime); + // then use the remaining connection timeout. + // Math.Max with 0 in case totalRetryTime > restTimeout.TotalSeconds + backOffInSec = Math.Max(Math.Min(backOffInSec, (int)restTimeout.TotalSeconds - totalRetryTime), 0); } } }