From fd193e10025caf4224a996dcdefd00e604787f12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Hofman?= Date: Wed, 21 Feb 2024 17:47:52 +0100 Subject: [PATCH] SNOW-950923 fix to provide QueryID for failures during GET/PUT file operations (#825) ### Description QueryID is super helpful when solving issues but it was not reachable when PUT/GET operation got failed at the command/exception level. It was not available for successful scenario for PUT/GET commands as well. This change actually introduces a new type of exception prior to the one used previously - FileNotFoundException/DirectoryNotFoundException and this previous exception is now an inner exception of SnowflakeDbException which we had to use to provide QueryId. In rare cases (for instance when a connection was not opened) request to PUT/GET file on stage might still end up with a base Exception since QueryId won't be created in such a case. ### Checklist - [x] Code compiles correctly - [x] Code is formatted according to [Coding Conventions](../CodingConventions.md) - [x] Created tests which fail without the change (if possible) - [x] All tests passing (`dotnet test`) - [x] Extended the README / documentation, if necessary - [x] Provide JIRA issue id (if possible) or GitHub issue id in PR name --- README.md | 62 +++++++ .../IntegrationTests/SFConnectionIT.cs | 60 ++++--- .../IntegrationTests/SFPutGetTest.cs | 156 +++++++++++++++++- .../UnitTests/SFFileTransferAgentTests.cs | 38 +++-- .../Util/SnowflakeDbExceptionAssert.cs | 86 ++++++++++ Snowflake.Data/Client/SnowflakeDbCommand.cs | 7 +- Snowflake.Data/Client/SnowflakeDbException.cs | 56 ++++--- Snowflake.Data/Core/ErrorMessages.resx | 6 + .../Core/FileTransfer/SFFileTransferAgent.cs | 86 ++++++---- Snowflake.Data/Core/SFBindUploader.cs | 4 +- Snowflake.Data/Core/SFError.cs | 6 + Snowflake.Data/Core/SFStatement.cs | 144 ++++++++++------ 12 files changed, 555 insertions(+), 156 deletions(-) create mode 100644 Snowflake.Data.Tests/Util/SnowflakeDbExceptionAssert.cs diff --git a/README.md b/README.md index 48c48d19b..a719edbcc 100644 --- a/README.md +++ b/README.md @@ -672,6 +672,68 @@ using (IDbConnection conn = new SnowflakeDbConnection()) } ``` +PUT local files to stage +------------------------ + +PUT command can be used to upload files of a local directory or a single local file to the Snowflake stages (named, internal table stage or internal user stage). +Such staging files can be used to load data into a table. +More on this topic: [File staging with PUT](https://docs.snowflake.com/en/sql-reference/sql/put). + +In the driver the command can be executed in a bellow way: +```cs +using (IDbConnection conn = new SnowflakeDbConnection()) +{ + try + { + conn.ConnectionString = ""; + conn.Open(); + var cmd = (SnowflakeDbCommand)conn.CreateCommand(); // cast allows get QueryId from the command + + cmd.CommandText = "PUT file://some_data.csv @my_schema.my_stage AUTO_COMPRESS=TRUE"; + var reader = cmd.ExecuteReader(); + Assert.IsTrue(reader.read()); + Assert.DoesNotThrow(() => Guid.Parse(cmd.GetQueryId())); + } + catch (SnowflakeDbException e) + { + Assert.DoesNotThrow(() => Guid.Parse(e.QueryId)); // when failed + Assert.That(e.InnerException.GetType(), Is.EqualTo(typeof(FileNotFoundException))); + } +``` +In case of a failure a SnowflakeDbException exception will be thrown with affected QueryId if possible. +If it was after the query got executed this exception will be a SnowflakeDbException containing affected QueryId. +In case of the initial phase of execution QueryId might not be provided. +Inner exception (if applicable) will provide some details on the failure cause and +it will be for example: FileNotFoundException, DirectoryNotFoundException. + +GET stage files +--------------- +GET command allows to download stage directories or files to a local directory. +It can be used in connection with named stage, table internal stage or user stage. +Detailed information on the command: [Downloading files with GET](https://docs.snowflake.com/en/sql-reference/sql/get). + +To use the command in a driver similar code can be executed in a client app: +```cs + try + { + conn.ConnectionString = ""; + conn.Open(); + var cmd = (SnowflakeDbCommand)conn.CreateCommand(); // cast allows get QueryId from the command + + cmd.CommandText = "GET @my_schema.my_stage/stage_file.csv file://local_file.csv AUTO_COMPRESS=TRUE"; + var reader = cmd.ExecuteReader(); + Assert.IsTrue(reader.read()); // True on success, False if failure + Assert.DoesNotThrow(() => Guid.Parse(cmd.GetQueryId())); + } + catch (SnowflakeDbException e) + { + Assert.DoesNotThrow(() => Guid.Parse(e.QueryId)); // on failure + } +``` +In case of a failure a SnowflakeDbException will be thrown with affected QueryId if possible. +When no technical or syntax errors occurred but the DBDataReader has no data to process it returns False +without throwing an exception. + Close the Connection -------------------- diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs index fe9419939..f84447000 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs @@ -1,8 +1,10 @@ /* - * Copyright (c) 2012-2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. */ using System.Data.Common; +using System.Net; +using Snowflake.Data.Tests.Util; namespace Snowflake.Data.Tests.IntegrationTests { @@ -503,8 +505,7 @@ public void TestDefaultLoginTimeout() { if (e.InnerException is SnowflakeDbException) { - Assert.AreEqual(SFError.REQUEST_TIMEOUT.GetAttribute().errorCode, - ((SnowflakeDbException)e.InnerException).ErrorCode); + SnowflakeDbExceptionAssert.HasErrorCode(e.InnerException, SFError.REQUEST_TIMEOUT); stopwatch.Stop(); int delta = 10; // in case server time slower. @@ -519,12 +520,12 @@ public void TestDefaultLoginTimeout() } [Test] - public void TestConnectionFailFast() + public void TestConnectionFailFastForNonRetried404OnLogin() { using (var conn = new SnowflakeDbConnection()) { // Just a way to get a 404 on the login request and make sure there are no retry - string invalidConnectionString = "host=learn.microsoft.com;" + string invalidConnectionString = "host=google.com/404;" + "connection_timeout=0;account=testFailFast;user=testFailFast;password=testFailFast;"; conn.ConnectionString = invalidConnectionString; @@ -537,8 +538,12 @@ public void TestConnectionFailFast() } catch (SnowflakeDbException e) { - Assert.AreEqual(SFError.INTERNAL_ERROR.GetAttribute().errorCode, - e.ErrorCode); + SnowflakeDbExceptionAssert.HasHttpErrorCodeInExceptionChain(e, HttpStatusCode.NotFound); + SnowflakeDbExceptionAssert.HasMessageInExceptionChain(e, "404 (Not Found)"); + } + catch (Exception unexpected) + { + Assert.Fail($"Unexpected {unexpected.GetType()} exception occurred"); } Assert.AreEqual(ConnectionState.Closed, conn.State); @@ -546,11 +551,11 @@ public void TestConnectionFailFast() } [Test] - public void TestEnableRetry() + public void TestEnableLoginRetryOn404() { using (var conn = new SnowflakeDbConnection()) { - string invalidConnectionString = "host=learn.microsoft.com;" + string invalidConnectionString = "host=google.com/404;" + "connection_timeout=0;account=testFailFast;user=testFailFast;password=testFailFast;disableretry=true;forceretryon404=true"; conn.ConnectionString = invalidConnectionString; @@ -562,8 +567,12 @@ public void TestEnableRetry() } catch (SnowflakeDbException e) { - Assert.AreEqual(SFError.INTERNAL_ERROR.GetAttribute().errorCode, - e.ErrorCode); + SnowflakeDbExceptionAssert.HasErrorCode(e, SFError.INTERNAL_ERROR); + SnowflakeDbExceptionAssert.HasHttpErrorCodeInExceptionChain(e, HttpStatusCode.NotFound); + } + catch (Exception unexpected) + { + Assert.Fail($"Unexpected {unexpected.GetType()} exception occurred"); } Assert.AreEqual(ConnectionState.Closed, conn.State); @@ -794,7 +803,7 @@ public void TestUnknownAuthenticator() } catch (SnowflakeDbException e) { - Assert.AreEqual(SFError.UNKNOWN_AUTHENTICATOR.GetAttribute().errorCode, e.ErrorCode); + SnowflakeDbExceptionAssert.HasErrorCode(e, SFError.UNKNOWN_AUTHENTICATOR); } } @@ -849,7 +858,7 @@ public void TestOktaConnectionUntilMaxTimeout() catch (Exception e) { Assert.IsInstanceOf(e); - Assert.AreEqual(SFError.INTERNAL_ERROR.GetAttribute().errorCode, ((SnowflakeDbException)e).ErrorCode); + SnowflakeDbExceptionAssert.HasErrorCode(e, SFError.INTERNAL_ERROR); Assert.IsTrue(e.Message.Contains( $"The retry count has reached its limit of {expectedMaxRetryCount} and " + $"the timeout elapsed has reached its limit of {expectedMaxConnectionTimeout} " + @@ -1856,9 +1865,7 @@ public void TestAsyncLoginTimeout() } catch (AggregateException e) { - Assert.AreEqual(SFError.INTERNAL_ERROR.GetAttribute().errorCode, - ((SnowflakeDbException)e.InnerException).ErrorCode); - + SnowflakeDbExceptionAssert.HasErrorCode(e.InnerException, SFError.INTERNAL_ERROR); } stopwatch.Stop(); int delta = 10; // in case server time slower. @@ -1894,9 +1901,7 @@ public void TestAsyncLoginTimeoutWithRetryTimeoutLesserThanConnectionTimeout() } catch (AggregateException e) { - Assert.AreEqual(SFError.INTERNAL_ERROR.GetAttribute().errorCode, - ((SnowflakeDbException)e.InnerException).ErrorCode); - + SnowflakeDbExceptionAssert.HasErrorCode(e.InnerException, SFError.INTERNAL_ERROR); } stopwatch.Stop(); int delta = 10; // in case server time slower. @@ -1929,8 +1934,7 @@ public void TestAsyncDefaultLoginTimeout() } catch (AggregateException e) { - Assert.AreEqual(SFError.INTERNAL_ERROR.GetAttribute().errorCode, - ((SnowflakeDbException)e.InnerException).ErrorCode); + SnowflakeDbExceptionAssert.HasErrorCode(e.InnerException, SFError.INTERNAL_ERROR); } stopwatch.Stop(); int delta = 10; // in case server time slower. @@ -1946,12 +1950,12 @@ public void TestAsyncDefaultLoginTimeout() } [Test] - public void TestAsyncConnectionFailFast() + public void TestAsyncConnectionFailFastForNonRetried404OnLogin() { using (var conn = new SnowflakeDbConnection()) { // Just a way to get a 404 on the login request and make sure there are no retry - string invalidConnectionString = "host=learn.microsoft.com;" + string invalidConnectionString = "host=google.com/404;" + "connection_timeout=0;account=testFailFast;user=testFailFast;password=testFailFast;"; conn.ConnectionString = invalidConnectionString; @@ -1966,8 +1970,12 @@ public void TestAsyncConnectionFailFast() } catch (AggregateException e) { - Assert.AreEqual(SFError.INTERNAL_ERROR.GetAttribute().errorCode, - ((SnowflakeDbException)e.InnerException).ErrorCode); + SnowflakeDbExceptionAssert.HasHttpErrorCodeInExceptionChain(e, HttpStatusCode.NotFound); + SnowflakeDbExceptionAssert.HasMessageInExceptionChain(e, "404 (Not Found)"); + } + catch (Exception unexpected) + { + Assert.Fail($"Unexpected {unexpected.GetType()} exception occurred"); } Assert.AreEqual(ConnectionState.Closed, conn.State); @@ -2131,7 +2139,7 @@ public void TestAsyncOktaConnectionUntilMaxTimeout() catch (Exception e) { Assert.IsInstanceOf(e.InnerException); - Assert.AreEqual(SFError.INTERNAL_ERROR.GetAttribute().errorCode, ((SnowflakeDbException)e.InnerException).ErrorCode); + SnowflakeDbExceptionAssert.HasErrorCode(e.InnerException, SFError.INTERNAL_ERROR); Exception oktaException; #if NETFRAMEWORK oktaException = e.InnerException.InnerException.InnerException; diff --git a/Snowflake.Data.Tests/IntegrationTests/SFPutGetTest.cs b/Snowflake.Data.Tests/IntegrationTests/SFPutGetTest.cs index 6178f16b8..975d041e0 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFPutGetTest.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFPutGetTest.cs @@ -7,6 +7,7 @@ using System.Data.Common; using System.IO.Compression; using System.Text; +using Snowflake.Data.Tests.Util; namespace Snowflake.Data.Tests.IntegrationTests { @@ -208,7 +209,128 @@ public void TestPutFileRelativePathWithoutDirectory() VerifyFilesAreUploaded(conn, new List { t_inputFilePath }, t_internalStagePath); } } + + [Test] + public void TestPutGetOnClosedConnectionThrowsWithoutQueryId([Values("GET", "PUT")] string command) + { + t_inputFilePath = "unexisting_file.csv"; + t_internalStagePath = $"@{t_schemaName}.{t_stageName}"; + + // Act + using (var conn = new SnowflakeDbConnection(ConnectionString)) + { + // conn.Open(); // intentionally closed + var snowflakeDbException = Assert.Throws(() => ProcessFile(command, conn)); + Assert.NotNull(snowflakeDbException); + Assert.IsNull(snowflakeDbException.QueryId); + SnowflakeDbExceptionAssert.HasErrorCode(snowflakeDbException, SFError.EXECUTE_COMMAND_ON_CLOSED_CONNECTION); + } + } + + [Test] + public void TestGetNonExistentFileReturnsFalseAndDoesNotThrow() + { + t_inputFilePath = "non_existent_file.csv"; + t_internalStagePath = $"@{t_schemaName}.{t_stageName}"; + + // Act + using (var conn = new SnowflakeDbConnection(ConnectionString)) + { + conn.Open(); + var sql = $"GET {t_internalStagePath}/{t_fileName} file://{s_outputDirectory}"; + using (var command = conn.CreateCommand()) + { + command.CommandText = sql; + var reader = command.ExecuteReader(); + Assert.AreEqual(false, reader.Read()); + } + } + } + + [Test] + public void TestPutNonExistentFileThrowsWithQueryId() + { + t_inputFilePath = "non_existent_file.csv"; + t_internalStagePath = $"@{t_schemaName}.{t_stageName}"; + + // Act + using (var conn = new SnowflakeDbConnection(ConnectionString)) + { + conn.Open(); + var snowflakeDbException = Assert.Throws(() => PutFile(conn)); + Assert.IsNotNull(snowflakeDbException); + Assert.IsNotNull(snowflakeDbException.QueryId); + SnowflakeDbExceptionAssert.HasErrorCode(snowflakeDbException, SFError.IO_ERROR_ON_GETPUT_COMMAND); + } + } + [Test] + public void TestPutFileProvidesQueryIdOnFailure() + { + // Arrange + // Set the PUT query variables but do not create a file + t_inputFilePath = "unexisting_file.csv"; + t_internalStagePath = $"@{t_schemaName}.{t_stageName}"; + + // Act + using (var conn = new SnowflakeDbConnection(ConnectionString)) + { + conn.Open(); + var snowflakeDbException = Assert.Throws(()=>PutFile(conn)); + var queryId = snowflakeDbException.QueryId; + + // Assert + Assert.IsNotEmpty(queryId); + Assert.DoesNotThrow(()=>Guid.Parse(queryId)); + SnowflakeDbExceptionAssert.HasErrorCode(snowflakeDbException, SFError.IO_ERROR_ON_GETPUT_COMMAND); + } + } + + [Test] + public void TestPutFileWithSyntaxErrorProvidesQueryIdOnFailure() + { + // Arrange + // Set the PUT query variables but do not create a file + t_inputFilePath = "unexisting_file.csv SOME CODE FORCING SYNTAX ERROR"; + t_internalStagePath = $"@{t_schemaName}.{t_stageName}"; + + // Act + using (var conn = new SnowflakeDbConnection(ConnectionString)) + { + conn.Open(); + var snowflakeDbException = Assert.Throws(()=>PutFile(conn)); + var queryId = snowflakeDbException.QueryId; + + // Assert + Assert.IsNotEmpty(queryId); + Assert.DoesNotThrow(()=>Guid.Parse(queryId)); + Assert.That(snowflakeDbException.ErrorCode, Is.EqualTo(1003)); + Assert.That(snowflakeDbException.InnerException, Is.Null); + } + } + + [Test] + public void TestPutFileProvidesQueryIdOnSuccess() + { + // Arrange + // Set the PUT query variables + t_inputFilePath = $"{Guid.NewGuid()}_1.csv"; + t_internalStagePath = $"@{t_schemaName}.{t_stageName}"; + PrepareFileData(t_inputFilePath); + + // Act + using (var conn = new SnowflakeDbConnection(ConnectionString)) + { + conn.Open(); + var queryId = PutFile(conn); + + // Assert + Assert.IsNotNull(queryId); + Assert.DoesNotThrow(()=>Guid.Parse(queryId)); + VerifyFilesAreUploaded(conn, new List { t_inputFilePath }, t_internalStagePath); + } + } + [Test] public void TestPutFileRelativePathWithDirectory() { @@ -459,11 +581,12 @@ private static bool IsCompressedByTheDriver() } // PUT - upload file from local directory to the stage - void PutFile( + string PutFile( SnowflakeDbConnection conn, String additionalAttribute = "", ResultStatus expectedStatus = ResultStatus.UPLOADED) { + string queryId; using (var command = conn.CreateCommand()) { // Prepare PUT query @@ -471,12 +594,23 @@ void PutFile( $"PUT file://{t_inputFilePath} {t_internalStagePath}" + $" AUTO_COMPRESS={(t_autoCompress ? "TRUE" : "FALSE")}" + $" {additionalAttribute}"; - // Upload file command.CommandText = putQuery; var reader = command.ExecuteReader(); - Assert.IsTrue(reader.Read()); - + try + { + Assert.IsTrue(reader.Read()); + } + catch (SnowflakeDbException e) + { + // to make sure in a failure case command was set properly with a failed QueryId + Assert.AreEqual(e.QueryId, ((SnowflakeDbCommand)command).GetQueryId()); + throw; + } + // Checking query id when reader succeeded + queryId = ((SnowflakeDbDataReader)reader).GetQueryId(); + // Checking if query Id is provided on the command level as well + Assert.AreEqual(queryId, ((SnowflakeDbCommand)command).GetQueryId()); // Check file status Assert.AreEqual(expectedStatus.ToString(), reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.ResultStatus)); @@ -497,6 +631,7 @@ void PutFile( } Assert.IsNull(reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.ErrorDetails)); } + return queryId; } // COPY INTO - Copy data from the stage into temp table @@ -565,6 +700,19 @@ private void GetFile(DbConnection conn) } } + private void ProcessFile(String command, SnowflakeDbConnection connection) + { + switch (command) + { + case "GET": + GetFile(connection); + break; + case "PUT": + PutFile(connection); + break; + } + } + private static string[] ReadOutputFileLines() { using (var outputStream = File.OpenRead(t_outputFilePath)) diff --git a/Snowflake.Data.Tests/UnitTests/SFFileTransferAgentTests.cs b/Snowflake.Data.Tests/UnitTests/SFFileTransferAgentTests.cs index 5d81a610e..4e7c2041e 100644 --- a/Snowflake.Data.Tests/UnitTests/SFFileTransferAgentTests.cs +++ b/Snowflake.Data.Tests/UnitTests/SFFileTransferAgentTests.cs @@ -2,6 +2,9 @@ * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. */ +using Snowflake.Data.Client; +using Snowflake.Data.Tests.Util; + namespace Snowflake.Data.Tests.UnitTests { using NUnit.Framework; @@ -67,7 +70,7 @@ class SFFileTransferAgentTest : SFBaseTest const string FileContent = "FTAFileContent"; [SetUp] - public void BeforeTest() + public void BeforeEachTest() { // Base object's names on worker thread id var threadSuffix = TestContext.CurrentContext.WorkerId?.Replace('#', '_'); @@ -118,7 +121,7 @@ public void BeforeTest() } [TearDown] - public void AfterTest() + public void AfterEachTest() { // Delete stage directory recursively if (Directory.Exists(t_locationStage)) @@ -308,7 +311,7 @@ public void TestUploadWithGZIPCompression() } [Test] - public void TestUploadWithWilcardInTheFilename() + public void TestUploadWithWildcardInTheFilename() { // Arrange UploadSetUpFile(); @@ -458,7 +461,7 @@ public void TestUploadWithWildcardInTheDirectoryPath() } [Test] - public void TestUploadThrowsArgumentExceptionForMissingRootDirectoryWithWildcard() + public void TestUploadThrowsExceptionForMissingRootDirectoryWithWildcard() { // Arrange UploadSetUpFile(); @@ -487,15 +490,18 @@ public void TestUploadThrowsArgumentExceptionForMissingRootDirectoryWithWildcard // Set command to upload _responseData.command = CommandTypes.UPLOAD.ToString(); + _responseData.queryId = Guid.NewGuid().ToString(); _fileTransferAgent = new SFFileTransferAgent(_putQuery, _session, _responseData, _cancellationToken); // Act - Exception ex = Assert.Throws(() => _fileTransferAgent.execute()); + SnowflakeDbException ex = Assert.Throws(() => _fileTransferAgent.execute()); // Assert + Assert.AreEqual(_responseData.queryId, ex.QueryId); + SnowflakeDbExceptionAssert.HasErrorCode(ex, SFError.IO_ERROR_ON_GETPUT_COMMAND); Assert.That(ex.Message, Does.Match($"No file found for: {tempUploadRootDirectory}\\*/{tempUploadSecondDirectory}\\*/{mockFileName}")); for (int i = 0; i < numberOfDirectories; i++) @@ -579,17 +585,22 @@ public void TestDownloadThrowsErrorFileNotFound() // Set command to download _responseData.command = CommandTypes.DOWNLOAD.ToString(); + _responseData.queryId = Guid.NewGuid().ToString(); _fileTransferAgent = new SFFileTransferAgent(GetQuery, _session, _responseData, _cancellationToken); // Act - Exception ex = Assert.Throws(() => _fileTransferAgent.execute()); + SnowflakeDbException ex = Assert.Throws(() => _fileTransferAgent.execute()); // Assert - Assert.IsInstanceOf(ex.InnerException); - Assert.That(ex.InnerException.Message, Does.Match("Could not find file .*")); + Assert.AreEqual(_responseData.queryId, ex.QueryId); + SnowflakeDbExceptionAssert.HasErrorCode(ex, SFError.IO_ERROR_ON_GETPUT_COMMAND); + Assert.IsInstanceOf(ex.InnerException); + var innerException = ((AggregateException)ex.InnerException)?.InnerExceptions[0]; + Assert.IsInstanceOf(innerException); + Assert.That(innerException?.Message, Does.Match("Could not find file .*")); } [Test] @@ -606,17 +617,22 @@ public void TestDownloadThrowsErrorDirectoryNotFound() // Set command to download _responseData.command = CommandTypes.DOWNLOAD.ToString(); + _responseData.queryId = Guid.NewGuid().ToString(); _fileTransferAgent = new SFFileTransferAgent(GetQuery, _session, _responseData, _cancellationToken); // Act - Exception ex = Assert.Throws(() => _fileTransferAgent.execute()); + SnowflakeDbException ex = Assert.Throws(() => _fileTransferAgent.execute()); // Assert - Assert.IsInstanceOf(ex.InnerException); - Assert.That(ex.InnerException.Message, Does.Match("Could not find a part of the path .*")); + Assert.AreEqual(_responseData.queryId, ex.QueryId); + SnowflakeDbExceptionAssert.HasErrorCode(ex, SFError.IO_ERROR_ON_GETPUT_COMMAND); + Assert.IsInstanceOf(ex.InnerException); + var innerException = ((AggregateException)ex.InnerException)?.InnerExceptions[0]; + Assert.IsInstanceOf(innerException); + Assert.That(innerException?.Message, Does.Match("Could not find a part of the path .*")); } } } diff --git a/Snowflake.Data.Tests/Util/SnowflakeDbExceptionAssert.cs b/Snowflake.Data.Tests/Util/SnowflakeDbExceptionAssert.cs new file mode 100644 index 000000000..63432da31 --- /dev/null +++ b/Snowflake.Data.Tests/Util/SnowflakeDbExceptionAssert.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Http; +using Snowflake.Data.Core; +using Snowflake.Data.Client; +using NUnit.Framework; + +namespace Snowflake.Data.Tests.Util +{ + public static class SnowflakeDbExceptionAssert + { + public static void HasErrorCode(SnowflakeDbException exception, SFError sfError) + { + Assert.AreEqual(exception.ErrorCode, sfError.GetAttribute().errorCode); + } + + public static void HasErrorCode(Exception exception, SFError sfError) + { + Assert.NotNull(exception); + switch (exception) + { + case SnowflakeDbException snowflakeDbException: + Assert.AreEqual(snowflakeDbException.ErrorCode, sfError.GetAttribute().errorCode); + break; + default: + Assert.Fail(exception.GetType() + " type is not " + typeof(SnowflakeDbException)); + break; + } + } + + public static void HasHttpErrorCodeInExceptionChain(Exception exception, HttpStatusCode expected) + { + var exceptions = CollectExceptions(exception); + Assert.AreEqual(true, + exceptions.Any(e => + { + switch (e) + { + case SnowflakeDbException se: + return se.ErrorCode == (int)expected; + case HttpRequestException he: +#if NETFRAMEWORK + return he.Message.Contains(((int)expected).ToString()); +#else + return he.StatusCode == expected; +#endif + default: + return false; + } + }), + $"Any of exceptions in the chain should have HTTP Status: {expected}"); + } + + public static void HasMessageInExceptionChain(Exception exception, string expected) + { + var exceptions = CollectExceptions(exception); + Assert.AreEqual(true, + exceptions.Any(e => e.Message.Contains(expected)), + $"Any of exceptions in the chain should contain message: {expected}"); + } + + private static List CollectExceptions(Exception exception) + { + var collected = new List(); + if (exception is null) + return collected; + switch (exception) + { + case AggregateException aggregate: + var inner = aggregate.Flatten().InnerExceptions; + collected.AddRange(inner); + collected.AddRange(inner + .Where(e => e.InnerException != null) + .SelectMany(e => CollectExceptions(e.InnerException))); + break; + case Exception general: + collected.AddRange(CollectExceptions(general.InnerException)); + collected.Add(general); + break; + } + return collected; + } + } +} diff --git a/Snowflake.Data/Client/SnowflakeDbCommand.cs b/Snowflake.Data/Client/SnowflakeDbCommand.cs index 1abd5e998..ca415bacc 100755 --- a/Snowflake.Data/Client/SnowflakeDbCommand.cs +++ b/Snowflake.Data/Client/SnowflakeDbCommand.cs @@ -339,12 +339,17 @@ private static Dictionary convertToBindList(List().errorCode; + QueryId = queryId; } public SnowflakeDbException(SFError error, params object[] args) : base(FormatExceptionMessage(error, args, string.Empty, string.Empty)) { - this.VendorCode = error.GetAttribute().errorCode; + VendorCode = error.GetAttribute().errorCode; } public SnowflakeDbException(string sqlState, SFError error, params object[] args) : base(FormatExceptionMessage(error, args, sqlState, string.Empty)) { - this.VendorCode = error.GetAttribute().errorCode; - this.SqlState = sqlState; + VendorCode = error.GetAttribute().errorCode; + SqlState = sqlState; } public SnowflakeDbException(Exception innerException, SFError error, params object[] args) : base(FormatExceptionMessage(error, args, string.Empty, string.Empty), innerException) { - this.VendorCode = error.GetAttribute().errorCode; + VendorCode = error.GetAttribute().errorCode; } public SnowflakeDbException(Exception innerException, string sqlState, SFError error, params object[] args) : base(FormatExceptionMessage(error, args, sqlState, string.Empty), innerException) { - this.VendorCode = error.GetAttribute().errorCode; - this.SqlState = sqlState; + VendorCode = error.GetAttribute().errorCode; + SqlState = sqlState; } static string FormatExceptionMessage(SFError error, object[] args, string sqlState, - string queryId) - { - return FormatExceptionMessage(string.Format(rm.GetString(error.ToString()), args) - , error.GetAttribute().errorCode - , sqlState - , queryId); - } - - static string FormatExceptionMessage(string errorMessage, - int vendorCode, - string sqlState, string queryId) - { - return string.Format("Error: {0} SqlState: {1}, VendorCode: {2}, QueryId: {3}", - errorMessage, sqlState, vendorCode, queryId); + { + return FormatExceptionMessage(string.Format(rm.GetString(error.ToString()), args) + , error.GetAttribute().errorCode + , sqlState + , queryId); + } + + static string FormatExceptionMessage(string errorMessage, + int vendorCode, + string sqlState, + string queryId) + { + return string.Format("Error: {0} SqlState: {1}, VendorCode: {2}, QueryId: {3}", + errorMessage, sqlState, vendorCode, queryId); } } } diff --git a/Snowflake.Data/Core/ErrorMessages.resx b/Snowflake.Data/Core/ErrorMessages.resx index 7159b86a0..b7db8c58c 100755 --- a/Snowflake.Data/Core/ErrorMessages.resx +++ b/Snowflake.Data/Core/ErrorMessages.resx @@ -183,4 +183,10 @@ Browser response timed out after {0} seconds. + + IO operation failed. Error: {0} + + + Executing command on a non-opened connection. + \ No newline at end of file diff --git a/Snowflake.Data/Core/FileTransfer/SFFileTransferAgent.cs b/Snowflake.Data/Core/FileTransfer/SFFileTransferAgent.cs index e5badcf19..b27daa51f 100644 --- a/Snowflake.Data/Core/FileTransfer/SFFileTransferAgent.cs +++ b/Snowflake.Data/Core/FileTransfer/SFFileTransferAgent.cs @@ -185,45 +185,61 @@ public SFFileTransferAgent( /// public void execute() { - // Initialize the encryption metadata - initEncryptionMaterial(); - - if (CommandTypes.UPLOAD == CommandType) - { - initFileMetadataForUpload(); - } - else if (CommandTypes.DOWNLOAD == CommandType) + try { - initFileMetadata(TransferMetadata.src_locations); + // Initialize the encryption metadata + initEncryptionMaterial(); - Directory.CreateDirectory(TransferMetadata.localLocation); - } + if (CommandTypes.UPLOAD == CommandType) + { + initFileMetadataForUpload(); + } + else if (CommandTypes.DOWNLOAD == CommandType) + { + initFileMetadata(TransferMetadata.src_locations); - // Update the file metadata with GCS presigned URL - updatePresignedUrl(); + Directory.CreateDirectory(TransferMetadata.localLocation); + } - foreach (SFFileMetadata fileMetadata in FilesMetas) - { - // If the file is larger than the threshold, add it to the large files list - // Otherwise add it to the small files list - if (fileMetadata.srcFileSize > TransferMetadata.threshold) + // Update the file metadata with GCS presigned URL + updatePresignedUrl(); + + foreach (SFFileMetadata fileMetadata in FilesMetas) { - LargeFilesMetas.Add(fileMetadata); + // If the file is larger than the threshold, add it to the large files list + // Otherwise add it to the small files list + if (fileMetadata.srcFileSize > TransferMetadata.threshold) + { + LargeFilesMetas.Add(fileMetadata); + } + else + { + SmallFilesMetas.Add(fileMetadata); + } } - else + + // Check command type + if (CommandTypes.UPLOAD == CommandType) { - SmallFilesMetas.Add(fileMetadata); + upload(); + } + else if (CommandTypes.DOWNLOAD == CommandType) + { + download(); } } - - // Check command type - if (CommandTypes.UPLOAD == CommandType) - { - upload(); - } - else if (CommandTypes.DOWNLOAD == CommandType) + catch (Exception e) { - download(); + Logger.Error("Error while transferring file(s): " + e.Message); + if (e is SnowflakeDbException snowflakeException) + { + if (snowflakeException.QueryId == null) + { + snowflakeException.QueryId = TransferMetadata.queryId; + } + throw snowflakeException; + } + throw new SnowflakeDbException(SFError.IO_ERROR_ON_GETPUT_COMMAND, TransferMetadata.queryId, e); } } @@ -693,8 +709,9 @@ private int GetFileTransferMaxBytesInMemory() { return int.Parse(maxBytesInMemoryString); } - catch (Exception e) + catch (Exception) { + Logger.Warn("Default for FILE_TRANSFER_MEMORY_THRESHOLD used due to invalid session value."); return FileTransferConfiguration.DefaultMaxBytesInMemory; } } @@ -1262,7 +1279,8 @@ private async Task UploadSingleFileAsync( } catch (Exception ex) { - throw ex; + Logger.Error("UploadSingleFileAsync encountered an error: " + ex.Message); + throw; } finally { @@ -1299,7 +1317,8 @@ private SFFileMetadata DownloadSingleFile( } catch (Exception ex) { - throw ex; + Logger.Error("DownloadSingleFile encountered an error: " + ex.Message); + throw; } finally { @@ -1336,7 +1355,8 @@ private async Task DownloadSingleFileAsync( } catch (Exception ex) { - throw ex; + Logger.Error("DownloadSingleFileAsync encountered an error: " + ex.Message); + throw; } finally { diff --git a/Snowflake.Data/Core/SFBindUploader.cs b/Snowflake.Data/Core/SFBindUploader.cs index 56cd376bd..68af7405b 100644 --- a/Snowflake.Data/Core/SFBindUploader.cs +++ b/Snowflake.Data/Core/SFBindUploader.cs @@ -297,7 +297,7 @@ private void CreateStage() { session.SetArrayBindStageThreshold(0); logger.Error("Failed to create temporary stage for array binds.", e); - throw e; + throw; } } } @@ -321,7 +321,7 @@ internal async Task CreateStageAsync(CancellationToken cancellationToken) { session.SetArrayBindStageThreshold(0); logger.Error("Failed to create temporary stage for array binds.", e); - throw e; + throw; } } } diff --git a/Snowflake.Data/Core/SFError.cs b/Snowflake.Data/Core/SFError.cs index 2aac72f16..ee59e9241 100755 --- a/Snowflake.Data/Core/SFError.cs +++ b/Snowflake.Data/Core/SFError.cs @@ -78,6 +78,12 @@ public enum SFError [SFErrorAttr(errorCode = 270057)] BROWSER_RESPONSE_TIMEOUT, + + [SFErrorAttr(errorCode = 270058)] + IO_ERROR_ON_GETPUT_COMMAND, + + [SFErrorAttr(errorCode = 270059)] + EXECUTE_COMMAND_ON_CLOSED_CONNECTION } class SFErrorAttr : Attribute diff --git a/Snowflake.Data/Core/SFStatement.cs b/Snowflake.Data/Core/SFStatement.cs index 3c48688ee..a8ada8af2 100644 --- a/Snowflake.Data/Core/SFStatement.cs +++ b/Snowflake.Data/Core/SFStatement.cs @@ -343,79 +343,115 @@ internal SFBaseResultSet Execute(int timeout, string sql, Dictionary( - timeout, - sql, - bindings, - describeOnly); - - SFFileTransferAgent fileTransferAgent = - new SFFileTransferAgent(trimmedSql, SfSession, response.data, CancellationToken.None); - - // Start the file transfer - fileTransferAgent.execute(); - - // Get the results of the upload/download - return fileTransferAgent.result(); + return ExecuteSqlWithPutGet(timeout, trimmedSql, bindings, describeOnly); } - else + + return ExecuteSqlOtherThanPutGet(timeout, trimmedSql, bindings, describeOnly); + } + finally + { + CleanUpCancellationTokenSources(); + ClearQueryRequestId(); + } + } + + private SFBaseResultSet ExecuteSqlWithPutGet(int timeout, string sql, Dictionary bindings, bool describeOnly) + { + try + { + isPutGetQuery = true; + PutGetExecResponse response = + ExecuteHelper( + timeout, + sql, + bindings, + describeOnly); + + logger.Debug("PUT/GET queryId: " + (response.data != null ? response.data.queryId : "Unknown")); + + SFFileTransferAgent fileTransferAgent = + new SFFileTransferAgent(sql, SfSession, response.data, CancellationToken.None); + + // Start the file transfer + fileTransferAgent.execute(); + + if (response.data != null) + _lastQueryId = response.data.queryId; + + // Get the results of the upload/download + return fileTransferAgent.result(); + } + catch (SnowflakeDbException ex) + { + logger.Error($"Query execution failed, QueryId: {ex.QueryId??"unavailable"}", ex); + _lastQueryId = ex.QueryId ?? _lastQueryId; + throw; + } + catch (Exception ex) + { + logger.Error("Query execution failed.", ex); + throw new SnowflakeDbException(ex, SFError.INTERNAL_ERROR); + } + } + + private SFBaseResultSet ExecuteSqlOtherThanPutGet(int timeout, string sql, Dictionary bindings, bool describeOnly) + { + try + { + int arrayBindingThreshold = 0; + if (SfSession.ParameterMap.ContainsKey(SFSessionParameter.CLIENT_STAGE_ARRAY_BINDING_THRESHOLD)) { - int arrayBindingThreshold = 0; - if (SfSession.ParameterMap.ContainsKey(SFSessionParameter.CLIENT_STAGE_ARRAY_BINDING_THRESHOLD)) - { - String val = (String)SfSession.ParameterMap[SFSessionParameter.CLIENT_STAGE_ARRAY_BINDING_THRESHOLD]; - arrayBindingThreshold = Int32.Parse(val); - } + String val = + (String)SfSession.ParameterMap[SFSessionParameter.CLIENT_STAGE_ARRAY_BINDING_THRESHOLD]; + arrayBindingThreshold = Int32.Parse(val); + } - int numBinding = GetBindingCount(bindings); + int numBinding = GetBindingCount(bindings); - if (0 < arrayBindingThreshold - && arrayBindingThreshold <= numBinding - && !describeOnly) + if (0 < arrayBindingThreshold + && arrayBindingThreshold <= numBinding + && !describeOnly) + { + try { - try - { - AssignQueryRequestId(); - SFBindUploader uploader = new SFBindUploader(SfSession, _requestId); - uploader.Upload(bindings); - _bindStage = uploader.getStagePath(); - ClearQueryRequestId(); - } - catch (Exception e) - { - logger.Warn("Exception encountered trying to upload binds to stage. Attaching binds in payload instead. {0}", e); - } + AssignQueryRequestId(); + SFBindUploader uploader = new SFBindUploader(SfSession, _requestId); + uploader.Upload(bindings); + _bindStage = uploader.getStagePath(); + ClearQueryRequestId(); + } + catch (Exception e) + { + logger.Warn( + "Exception encountered trying to upload binds to stage. Attaching binds in payload instead. {0}", + e); } + } - QueryExecResponse response = - ExecuteHelper( - timeout, - sql, - bindings, - describeOnly); + QueryExecResponse response = + ExecuteHelper( + timeout, + sql, + bindings, + describeOnly); - return BuildResultSet(response, CancellationToken.None); - } + return BuildResultSet(response, CancellationToken.None); } catch (Exception ex) { logger.Error("Query execution failed.", ex); + if (ex is SnowflakeDbException snowflakeDbException) + { + _lastQueryId = snowflakeDbException.QueryId ?? _lastQueryId; + } throw; } - finally - { - CleanUpCancellationTokenSources(); - ClearQueryRequestId(); - } } - + internal async Task GetResultWithIdAsync(string resultId, CancellationToken cancellationToken) { var req = BuildResultRequestWithId(resultId);