Skip to content

Commit

Permalink
SNOW-950923 fix to provide QueryID for failures during GET/PUT file o…
Browse files Browse the repository at this point in the history
…perations
  • Loading branch information
sfc-gh-mhofman committed Nov 30, 2023
1 parent 62f2206 commit 4d2421f
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 69 deletions.
93 changes: 74 additions & 19 deletions Snowflake.Data.Tests/IntegrationTests/SFPutGetTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,49 @@ public void TestPutFileRelativePathWithoutDirectory()
VerifyFilesAreUploaded(conn, new List<string> { t_inputFilePath }, t_internalStagePath);
}
}

[Test]
public void TestPutFileProvidesQueryIdOnFailure()
{
// Arrange
// Set the PUT query variables but do not create a file
t_inputFilePath = "unexisting_file.csv";
t_internalStagePath = $"@{t_schemaName}.{t_stageName}";

// Act
using (var conn = new SnowflakeDbConnection(ConnectionString))
{
conn.Open();
var queryId = PutFile(conn);

// Assert
Assert.IsNotNull(queryId);
Assert.DoesNotThrow(()=>Guid.Parse(queryId));
}
}

[Test]
public void TestPutFileProvidesQueryIdOnSuccess()
{
// Arrange
// Set the PUT query variables
t_inputFilePath = $"{Guid.NewGuid()}_1.csv";
t_internalStagePath = $"@{t_schemaName}.{t_stageName}";
PrepareFileData(t_inputFilePath);

// Act
using (var conn = new SnowflakeDbConnection(ConnectionString))
{
conn.Open();
var queryId = PutFile(conn);

// Assert
Assert.IsNotNull(queryId);
Assert.DoesNotThrow(()=>Guid.Parse(queryId));
VerifyFilesAreUploaded(conn, new List<string> { t_inputFilePath }, t_internalStagePath);
}
}

[Test]
public void TestPutFileRelativePathWithDirectory()
{
Expand Down Expand Up @@ -459,11 +501,12 @@ private static bool IsCompressedByTheDriver()
}

// PUT - upload file from local directory to the stage
void PutFile(
string PutFile(
SnowflakeDbConnection conn,
String additionalAttribute = "",
ResultStatus expectedStatus = ResultStatus.UPLOADED)
{
String queryId;
using (var command = conn.CreateCommand())
{
// Prepare PUT query
Expand All @@ -474,29 +517,41 @@ void PutFile(

// Upload file
command.CommandText = putQuery;
var reader = command.ExecuteReader();
Assert.IsTrue(reader.Read());

// Check file status
Assert.AreEqual(expectedStatus.ToString(),
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.ResultStatus));
// Check source and destination compression type
if (t_autoCompress)
try
{
Assert.AreEqual(t_sourceCompressionType,
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.SourceCompressionType));
Assert.AreEqual(t_destCompressionType,
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.DestinationCompressionType));
var reader = command.ExecuteReader();
Assert.IsTrue(reader.Read());
// Checking query id when reader succeeded
queryId = ((SnowflakeDbDataReader)reader).GetQueryId();
// Checking if query Id is provided on the command level as well
Assert.AreEqual(queryId, ((SnowflakeDbCommand)command).GetQueryId());
// Check file status
Assert.AreEqual(expectedStatus.ToString(),
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.ResultStatus));
// Check source and destination compression type
if (t_autoCompress)
{
Assert.AreEqual(t_sourceCompressionType,
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.SourceCompressionType));
Assert.AreEqual(t_destCompressionType,
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.DestinationCompressionType));
}
else
{
Assert.AreEqual(SFFileCompressionTypes.NONE.Name,
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.SourceCompressionType));
Assert.AreEqual(SFFileCompressionTypes.NONE.Name,
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.DestinationCompressionType));
}
Assert.IsNull(reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.ErrorDetails));
}
else
catch (SnowflakeDbException e)
{
Assert.AreEqual(SFFileCompressionTypes.NONE.Name,
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.SourceCompressionType));
Assert.AreEqual(SFFileCompressionTypes.NONE.Name,
reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.DestinationCompressionType));
queryId = e.QueryId;
Assert.AreEqual(queryId, ((SnowflakeDbCommand)command).GetQueryId());
}
Assert.IsNull(reader.GetString((int)SFResultSet.PutGetResponseRowTypeInfo.ErrorDetails));
}
return queryId;
}

// COPY INTO - Copy data from the stage into temp table
Expand Down
53 changes: 30 additions & 23 deletions Snowflake.Data/Client/SnowflakeDbException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,55 +40,62 @@ public override int ErrorCode
public SnowflakeDbException(string sqlState, int vendorCode, string errorMessage, string queryId)
: base(FormatExceptionMessage(errorMessage, vendorCode, sqlState, queryId))
{
this.SqlState = sqlState;
this.VendorCode = vendorCode;
this.QueryId = queryId;
SqlState = sqlState;
VendorCode = vendorCode;
QueryId = queryId;
}

public SnowflakeDbException(SFError error, string queryId, params object[] args)
: base(FormatExceptionMessage(error, args, string.Empty, queryId))
{
VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
QueryId = queryId;
}

public SnowflakeDbException(SFError error, params object[] args)
: base(FormatExceptionMessage(error, args, string.Empty, string.Empty))
{
this.VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
}

public SnowflakeDbException(string sqlState, SFError error, params object[] args)
: base(FormatExceptionMessage(error, args, sqlState, string.Empty))
{
this.VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
this.SqlState = sqlState;
VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
SqlState = sqlState;
}

public SnowflakeDbException(Exception innerException, SFError error, params object[] args)
: base(FormatExceptionMessage(error, args, string.Empty, string.Empty), innerException)
{
this.VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
}

public SnowflakeDbException(Exception innerException, string sqlState, SFError error, params object[] args)
: base(FormatExceptionMessage(error, args, sqlState, string.Empty), innerException)
{
this.VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
this.SqlState = sqlState;
VendorCode = error.GetAttribute<SFErrorAttr>().errorCode;
SqlState = sqlState;
}

static string FormatExceptionMessage(SFError error,
object[] args,
string sqlState,
string queryId)
{
return FormatExceptionMessage(string.Format(rm.GetString(error.ToString()), args)
, error.GetAttribute<SFErrorAttr>().errorCode
, sqlState
, queryId);
}

static string FormatExceptionMessage(string errorMessage,
int vendorCode,
string sqlState,
string queryId)
{
return string.Format("Error: {0} SqlState: {1}, VendorCode: {2}, QueryId: {3}",
errorMessage, sqlState, vendorCode, queryId);
{
return FormatExceptionMessage(string.Format(rm.GetString(error.ToString()), args)
, error.GetAttribute<SFErrorAttr>().errorCode
, sqlState
, queryId);
}

static string FormatExceptionMessage(string errorMessage,
int vendorCode,
string sqlState,
string queryId)
{
return string.Format("Error: {0} SqlState: {1}, VendorCode: {2}, QueryId: {3}",
errorMessage, sqlState, vendorCode, queryId);
}
}
}
3 changes: 3 additions & 0 deletions Snowflake.Data/Core/ErrorMessages.resx
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,7 @@
<data name="BROWSER_RESPONSE_TIMEOUT" xml:space="preserve">
<value>Browser response timed out after {0} seconds.</value>
</data>
<data name="IO_ERROR_ON_GETPUT_COMMAND" xml:space="preserve">
<value>IO operation failed. Error: {0}</value>
</data>
</root>
68 changes: 41 additions & 27 deletions Snowflake.Data/Core/FileTransfer/SFFileTransferAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,46 +185,60 @@ public SFFileTransferAgent(
/// </summary>
public void execute()
{
// Initialize the encryption metadata
initEncryptionMaterial();

if (CommandTypes.UPLOAD == CommandType)
{
initFileMetadataForUpload();
}
else if (CommandTypes.DOWNLOAD == CommandType)
try
{
initFileMetadata(TransferMetadata.src_locations);
// Initialize the encryption metadata
initEncryptionMaterial();

Directory.CreateDirectory(TransferMetadata.localLocation);
}
if (CommandTypes.UPLOAD == CommandType)
{
initFileMetadataForUpload();
}
else if (CommandTypes.DOWNLOAD == CommandType)
{
initFileMetadata(TransferMetadata.src_locations);

// Update the file metadata with GCS presigned URL
updatePresignedUrl();
Directory.CreateDirectory(TransferMetadata.localLocation);
}

foreach (SFFileMetadata fileMetadata in FilesMetas)
{
// If the file is larger than the threshold, add it to the large files list
// Otherwise add it to the small files list
if (fileMetadata.srcFileSize > TransferMetadata.threshold)
// Update the file metadata with GCS presigned URL
updatePresignedUrl();

foreach (SFFileMetadata fileMetadata in FilesMetas)
{
LargeFilesMetas.Add(fileMetadata);
// If the file is larger than the threshold, add it to the large files list
// Otherwise add it to the small files list
if (fileMetadata.srcFileSize > TransferMetadata.threshold)
{
LargeFilesMetas.Add(fileMetadata);
}
else
{
SmallFilesMetas.Add(fileMetadata);
}
}
else

// Check command type
if (CommandTypes.UPLOAD == CommandType)
{
SmallFilesMetas.Add(fileMetadata);
upload();
}
else if (CommandTypes.DOWNLOAD == CommandType)
{
download();
}
}

// Check command type
if (CommandTypes.UPLOAD == CommandType)
catch (FileNotFoundException e)
{
upload();
Logger.Error("File not found while transferring file(s): " + e.Message);
throw new SnowflakeDbException(SFError.IO_ERROR_ON_GETPUT_COMMAND, TransferMetadata.queryId, e);
}
else if (CommandTypes.DOWNLOAD == CommandType)
catch (IOException e)
{
download();
Logger.Error("IO operation error while transferring file(s): " + e.Message);
throw new SnowflakeDbException(SFError.IO_ERROR_ON_GETPUT_COMMAND, TransferMetadata.queryId, e);
}

}

public async Task executeAsync(CancellationToken cancellationToken)
Expand Down
3 changes: 3 additions & 0 deletions Snowflake.Data/Core/SFError.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public enum SFError

[SFErrorAttr(errorCode = 270057)]
BROWSER_RESPONSE_TIMEOUT,

[SFErrorAttr(errorCode = 270058)]
IO_ERROR_ON_GETPUT_COMMAND
}

class SFErrorAttr : Attribute
Expand Down
11 changes: 11 additions & 0 deletions Snowflake.Data/Core/SFStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,18 @@ internal SFBaseResultSet Execute(int timeout, string sql, Dictionary<string, Bin
bindings,
describeOnly);

if (logger.IsDebugEnabled())
logger.Debug("PUT/GET queryId: " + (response.data != null ? response.data.queryId : "Unknown"));

SFFileTransferAgent fileTransferAgent =
new SFFileTransferAgent(trimmedSql, SfSession, response.data, CancellationToken.None);

// Start the file transfer
fileTransferAgent.execute();

if (response.data != null)
_lastQueryId = response.data.queryId;

// Get the results of the upload/download
return fileTransferAgent.result();
}
Expand Down Expand Up @@ -407,6 +413,11 @@ internal SFBaseResultSet Execute(int timeout, string sql, Dictionary<string, Bin
catch (Exception ex)
{
logger.Error("Query execution failed.", ex);
if (ex is SnowflakeDbException)
{
var snowflakeDbException = (SnowflakeDbException)ex;
this._lastQueryId = snowflakeDbException.QueryId;
}
throw;
}
finally
Expand Down

0 comments on commit 4d2421f

Please sign in to comment.