Merge branch 'master' into SNOW-1672654-encryptionMaterial-fix
sfc-gh-dstempniak authored Oct 28, 2024
2 parents 83a5227 + b21bfb3 commit edb3203
Showing 3 changed files with 103 additions and 14 deletions.
90 changes: 90 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/SFDbCommandIT.cs
@@ -8,6 +8,8 @@
using System.Threading;
using System.Threading.Tasks;
using Snowflake.Data.Core;
+using System.Linq;
+using System.IO;

namespace Snowflake.Data.Tests.IntegrationTests
{
@@ -1674,5 +1676,93 @@ public async Task TestCommandWithCommentEmbeddedAsync()
Assert.AreEqual("--", reader.GetString(0));
}
}

[Test]
public void TestExecuteNonQueryReturnsCorrectRowCountForUploadWithMultipleFiles()
{
const int NumberOfFiles = 5;
const int NumberOfRows = 3;
const int ExpectedRowCount = NumberOfFiles * NumberOfRows;

using (SnowflakeDbConnection conn = new SnowflakeDbConnection())
{
conn.ConnectionString = ConnectionString + "poolingEnabled=false";
conn.Open();

using (SnowflakeDbCommand cmd = (SnowflakeDbCommand)conn.CreateCommand())
{
var tempFolder = $"{Path.GetTempPath()}Temp_{Guid.NewGuid()}";

try
{
// Arrange
Directory.CreateDirectory(tempFolder);
var data = string.Concat(Enumerable.Repeat(string.Join(",", "TestData") + "\n", NumberOfRows));
for (int i = 0; i < NumberOfFiles; i++)
{
File.WriteAllText(Path.Combine(tempFolder, $"{TestContext.CurrentContext.Test.Name}_{i}.csv"), data);
}
CreateOrReplaceTable(conn, TableName, new[] { "COL1 STRING" });
cmd.CommandText = $"PUT file://{Path.Combine(tempFolder, "*.csv")} @%{TableName} AUTO_COMPRESS=FALSE";
var reader = cmd.ExecuteReader();

// Act
cmd.CommandText = $"COPY INTO {TableName} FROM @%{TableName} PATTERN='.*.csv' FILE_FORMAT=(TYPE=CSV)";
int actualRowCount = cmd.ExecuteNonQuery();

// Assert
Assert.AreEqual(ExpectedRowCount, actualRowCount);
}
finally
{
Directory.Delete(tempFolder, true);
}
}
}
}

[Test]
public async Task TestExecuteNonQueryAsyncReturnsCorrectRowCountForUploadWithMultipleFiles()
{
const int NumberOfFiles = 5;
const int NumberOfRows = 3;
const int ExpectedRowCount = NumberOfFiles * NumberOfRows;

using (SnowflakeDbConnection conn = new SnowflakeDbConnection())
{
conn.ConnectionString = ConnectionString + "poolingEnabled=false";
conn.Open();

using (SnowflakeDbCommand cmd = (SnowflakeDbCommand)conn.CreateCommand())
{
var tempFolder = $"{Path.GetTempPath()}Temp_{Guid.NewGuid()}";

try
{
// Arrange
Directory.CreateDirectory(tempFolder);
var data = string.Concat(Enumerable.Repeat(string.Join(",", "TestData") + "\n", NumberOfRows));
for (int i = 0; i < NumberOfFiles; i++)
{
File.WriteAllText(Path.Combine(tempFolder, $"{TestContext.CurrentContext.Test.Name}_{i}.csv"), data);
}
CreateOrReplaceTable(conn, TableName, new[] { "COL1 STRING" });
cmd.CommandText = $"PUT file://{Path.Combine(tempFolder, "*.csv")} @%{TableName} AUTO_COMPRESS=FALSE";
var reader = cmd.ExecuteReader();

// Act
cmd.CommandText = $"COPY INTO {TableName} FROM @%{TableName} PATTERN='.*.csv' FILE_FORMAT=(TYPE=CSV)";
int actualRowCount = await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);

// Assert
Assert.AreEqual(ExpectedRowCount, actualRowCount);
}
finally
{
Directory.Delete(tempFolder, true);
}
}
}
}
}
}
19 changes: 8 additions & 11 deletions Snowflake.Data/Core/HttpUtil.cs
@@ -406,7 +406,7 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
}
else if (childCts != null && childCts.Token.IsCancellationRequested)
{
-logger.Warn($"Http request timeout. Retry the request after {backOffInSec} sec.");
+logger.Warn($"Http request timeout. Retry the request after max {backOffInSec} sec.");
}
else
{
@@ -465,7 +465,7 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
logger.Info("Response returned was null.");
}

-if (restTimeout.TotalSeconds > 0 && totalRetryTime > restTimeout.TotalSeconds)
+if (restTimeout.TotalSeconds > 0 && totalRetryTime >= restTimeout.TotalSeconds)
{
logger.Debug($"stop retry as connection_timeout {restTimeout.TotalSeconds} sec. reached");
if (response != null)
@@ -478,6 +478,12 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
throw new OperationCanceledException(errorMessage);
}

+if (restTimeout.TotalSeconds > 0 && totalRetryTime + backOffInSec > restTimeout.TotalSeconds)
+{
+    // No need to wait more than necessary if it can be avoided.
+    backOffInSec = (int)restTimeout.TotalSeconds - totalRetryTime;
+}

retryCount++;
if ((maxRetryCount > 0) && (retryCount > maxRetryCount))
{
@@ -516,15 +522,6 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
// Multiply sleep by 2 for non-login requests
backOffInSec *= 2;
}

-totalRetryTime = (int)((DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - startTimeInMilliseconds) / 1000);
-if ((restTimeout.TotalSeconds > 0) && (totalRetryTime + backOffInSec > restTimeout.TotalSeconds))
-{
-    // No need to wait more than necessary if it can be avoided.
-    // If the rest timeout will be reached before the next back-off,
-    // then use the remaining connection timeout.
-    backOffInSec = Math.Min(backOffInSec, (int)restTimeout.TotalSeconds - totalRetryTime + 1);
-}
}
}
}
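Taken together, the HttpUtil.cs changes make the timeout check inclusive and apply the back-off cap before the sleep instead of after it. The following is a minimal, self-contained sketch of the resulting loop shape: totalRetryTime, backOffInSec, restTimeout, and maxRetryCount mirror names in the diff, while the HTTP send, logging, and wall-clock bookkeeping are elided and illustrative.

using System;
using System.Threading;

class RetryBackoffSketch
{
    // Sketch only: the real handler recomputes totalRetryTime from elapsed wall time.
    static void RunWithRetries(TimeSpan restTimeout, int maxRetryCount)
    {
        int backOffInSec = 1;
        int totalRetryTime = 0;
        int retryCount = 0;

        while (true)
        {
            // ... send the HTTP request here; return on success ...

            // Inclusive check: stop once total retry time reaches the REST timeout.
            if (restTimeout.TotalSeconds > 0 && totalRetryTime >= restTimeout.TotalSeconds)
                throw new OperationCanceledException("connection_timeout reached");

            // Cap the next back-off so the final sleep lands on the timeout
            // instead of overshooting it.
            if (restTimeout.TotalSeconds > 0 && totalRetryTime + backOffInSec > restTimeout.TotalSeconds)
                backOffInSec = (int)restTimeout.TotalSeconds - totalRetryTime;

            retryCount++;
            if (maxRetryCount > 0 && retryCount > maxRetryCount)
                throw new OperationCanceledException("max retry count reached");

            Thread.Sleep(TimeSpan.FromSeconds(backOffInSec));
            totalRetryTime += backOffInSec;

            backOffInSec *= 2; // exponential back-off (doubled for non-login requests)
        }
    }
}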
8 changes: 5 additions & 3 deletions Snowflake.Data/Core/ResultSetUtil.cs
@@ -36,9 +36,11 @@ internal static int CalculateUpdateCount(this SFBaseResultSet resultSet)
var index = resultSet.sfResultSetMetaData.GetColumnIndexByName("rows_loaded");
if (index >= 0)
{
-resultSet.Next();
-updateCount = resultSet.GetInt64(index);
-resultSet.Rewind();
+while (resultSet.Next())
+{
+    updateCount += resultSet.GetInt64(index);
+}
+while (resultSet.Rewind()) {}
}
break;
case SFStatementType.COPY_UNLOAD:
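The ResultSetUtil.cs change is what the new tests above exercise: a COPY INTO over several staged files returns one result row per file, each carrying its own rows_loaded value, so the update count must be accumulated across all rows rather than read from the first row only. Below is a minimal sketch of the new aggregation, assuming an illustrative IResultRows stand-in for SFBaseResultSet.

// IResultRows is a hypothetical stand-in; only the members used here are modeled.
interface IResultRows
{
    bool Next();              // advance to the next result row
    bool Rewind();            // step back one row; returns false at the start
    long GetInt64(int index); // read a column from the current row
}

static class UpdateCountSketch
{
    internal static long SumRowsLoaded(IResultRows resultSet, int rowsLoadedIndex)
    {
        long updateCount = 0;

        // One result row per staged file; sum rows_loaded over all of them.
        while (resultSet.Next())
        {
            updateCount += resultSet.GetInt64(rowsLoadedIndex);
        }

        // Rewind fully so callers can still read the result set from the top.
        while (resultSet.Rewind()) { }

        return updateCount;
    }
}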
