Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-817091: Async execution #887

Merged
merged 48 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
727b460
SNOW-817091: Add async execution
sfc-gh-ext-simba-lf Mar 9, 2024
812cba0
SNOW-817091: Add tests
sfc-gh-ext-simba-lf Mar 9, 2024
3810211
SNOW-817091: Fix session closing issue
sfc-gh-ext-simba-lf Mar 9, 2024
03d7b64
SNOW-817091: Mark nonparallelizable
sfc-gh-ext-simba-lf Mar 9, 2024
97904a9
Revert last commit
sfc-gh-ext-simba-lf Mar 9, 2024
eae1e7d
SNOW-817091: Add check for NET Framework
sfc-gh-ext-simba-lf Mar 9, 2024
e52aded
Remove empty line
sfc-gh-ext-simba-lf Mar 15, 2024
c064688
Include uppercase letters in uuid regex
sfc-gh-ext-simba-lf Mar 15, 2024
b0827ff
Add short description for async mode
sfc-gh-ext-simba-lf Mar 16, 2024
d9288ab
Add check for PUT/GET in async exec mode
sfc-gh-ext-simba-lf Mar 16, 2024
69c8a41
Add GET command to test
sfc-gh-ext-simba-lf Mar 18, 2024
95cd938
Add check for cancellation while getting results
sfc-gh-ext-simba-lf Mar 18, 2024
20518fe
Replace GetQueryStatus with GetQueryStatusAsync
sfc-gh-ext-simba-lf Mar 18, 2024
6ce4a8a
Refactor async exec retry into one function
sfc-gh-ext-simba-lf Mar 18, 2024
e4beae6
Fix indent
sfc-gh-ext-simba-lf Mar 19, 2024
60335ec
Replace task with await
sfc-gh-ext-simba-lf Mar 19, 2024
78b0272
Replace task with await
sfc-gh-ext-simba-lf Mar 19, 2024
bc20b8a
Fix async test for .NET Framework
sfc-gh-ext-simba-lf Mar 19, 2024
ad8d40f
Fix async test for .NET Framework
sfc-gh-ext-simba-lf Mar 19, 2024
c94a51d
Use PascalCase for enum values
sfc-gh-ext-simba-lf Mar 19, 2024
c899f26
Remove comment about QueryDTO
sfc-gh-ext-simba-lf Mar 19, 2024
d874805
Remove AsyncQueries property and related functions
sfc-gh-ext-simba-lf Mar 19, 2024
4a5cb1d
Add missing indent
sfc-gh-ext-simba-lf Mar 20, 2024
0c3fbb5
Break while loop after a number of status retries
sfc-gh-ext-simba-lf Mar 20, 2024
b9a58a3
Rename test
sfc-gh-ext-simba-lf Mar 20, 2024
912f4a2
Use different sessions for the test
sfc-gh-ext-simba-lf Mar 20, 2024
a622537
Modify test tag
sfc-gh-ext-simba-lf Mar 21, 2024
21915dc
Assign enum with string values
sfc-gh-ext-simba-lf Mar 21, 2024
7185f79
Re-add check if token is cancelled
sfc-gh-ext-simba-lf Mar 21, 2024
6ec657b
Remove unused imports
sfc-gh-ext-simba-lf Mar 22, 2024
1d7dafb
Create variable for max status retry
sfc-gh-ext-simba-lf Mar 22, 2024
d4994d2
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Mar 22, 2024
ce10981
Refactor QueryStatuses
sfc-gh-ext-simba-lf Mar 22, 2024
198a39b
Refactor QueryStatuses
sfc-gh-ext-simba-lf Mar 22, 2024
6438057
Make comparison case insensitive
sfc-gh-ext-simba-lf Mar 22, 2024
8957d17
Add Unknown status to enum
sfc-gh-ext-simba-lf Mar 23, 2024
293f58d
Add test for GetQueryStatusByStringValue
sfc-gh-ext-simba-lf Mar 23, 2024
29b8149
Replaced Description attribute with StringAttr attribute
sfc-gh-ext-simba-lf Mar 23, 2024
95f0986
Remove unused import
sfc-gh-ext-simba-lf Mar 25, 2024
470907c
Rename parameter
sfc-gh-ext-simba-lf Mar 25, 2024
d49d83d
Split test case into positive/negative cases
sfc-gh-ext-simba-lf Mar 25, 2024
e6ab842
Remove Unknown enum and refactor GetQueryStatusByStringValue
sfc-gh-ext-simba-lf Mar 25, 2024
73358b1
Refactor async query retry into its own class
sfc-gh-ext-simba-lf Mar 25, 2024
e1c50b6
Add util functions for user to check query status is still running or…
sfc-gh-ext-simba-lf Mar 25, 2024
3801171
Remove public constructor
sfc-gh-ext-simba-lf Mar 25, 2024
d05c248
Revert removing newline
sfc-gh-ext-simba-lf Mar 25, 2024
1877755
Add missing test comment
sfc-gh-ext-simba-lf Mar 25, 2024
91f9ac8
Mark class internal and use static instance
sfc-gh-ext-simba-lf Mar 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ void ThreadProcess2(string connstr)

Thread.Sleep(5000);
SFStatement statement = new SFStatement(conn1.SfSession);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
SnowflakeDbConnectionPool.ClearAllPools();
Expand Down
521 changes: 521 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/SFDbCommandIT.cs

Large diffs are not rendered by default.

99 changes: 92 additions & 7 deletions Snowflake.Data.Tests/UnitTests/SFStatementTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Snowflake.Data.Tests.UnitTests
{
using Snowflake.Data.Core;
using NUnit.Framework;
using System;

/**
* Mock rest request test
Expand All @@ -21,7 +22,7 @@ public void TestSessionRenew()
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
Assert.AreEqual("new_session_token", sfSession.sessionToken);
Expand All @@ -37,7 +38,7 @@ public void TestSessionRenewDuringQueryExec()
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
}
Expand All @@ -57,7 +58,7 @@ public void TestServiceName()
for (int i = 0; i < 5; i++)
{
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "SELECT 1", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "SELECT 1", null, false, false);
expectServiceName += "a";
Assert.AreEqual(expectServiceName, sfSession.ParameterMap[SFSessionParameter.SERVICE_NAME]);
}
Expand All @@ -73,7 +74,7 @@ public void TestTrimSqlBlockComment()
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "/*comment*/select 1/*comment*/", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "/*comment*/select 1/*comment*/", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
}
Expand All @@ -88,7 +89,7 @@ public void TestTrimSqlBlockCommentMultiline()
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "/*comment\r\ncomment*/select 1/*comment\r\ncomment*/", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "/*comment\r\ncomment*/select 1/*comment\r\ncomment*/", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
}
Expand All @@ -103,7 +104,7 @@ public void TestTrimSqlLineComment()
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "--comment\r\nselect 1\r\n--comment", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "--comment\r\nselect 1\r\n--comment", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
}
Expand All @@ -118,9 +119,93 @@ public void TestTrimSqlLineCommentWithClosingNewline()
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "--comment\r\nselect 1\r\n--comment\r\n", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "--comment\r\nselect 1\r\n--comment\r\n", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
}

[Test]
[TestCase("UNKNOWN", QueryStatus.Unknown)]
[TestCase("RANDOM_STATUS", QueryStatus.Unknown)]
[TestCase("aBcZyX", QueryStatus.Unknown)]
[TestCase("running", QueryStatus.Running)]
[TestCase("RUNNING", QueryStatus.Running)]
[TestCase("resuming_warehouse", QueryStatus.ResumingWarehouse)]
[TestCase("RESUMING_WAREHOUSE", QueryStatus.ResumingWarehouse)]
[TestCase("queued", QueryStatus.Queued)]
[TestCase("QUEUED", QueryStatus.Queued)]
[TestCase("queued_reparing_warehouse", QueryStatus.QueuedReparingWarehouse)]
[TestCase("QUEUED_REPARING_WAREHOUSE", QueryStatus.QueuedReparingWarehouse)]
[TestCase("no_data", QueryStatus.NoData)]
[TestCase("NO_DATA", QueryStatus.NoData)]
[TestCase("aborting", QueryStatus.Aborting)]
[TestCase("ABORTING", QueryStatus.Aborting)]
[TestCase("success", QueryStatus.Success)]
[TestCase("SUCCESS", QueryStatus.Success)]
[TestCase("failed_with_error", QueryStatus.FailedWithError)]
[TestCase("FAILED_WITH_ERROR", QueryStatus.FailedWithError)]
[TestCase("aborted", QueryStatus.Aborted)]
[TestCase("ABORTED", QueryStatus.Aborted)]
[TestCase("failed_with_incident", QueryStatus.FailedWithIncident)]
[TestCase("FAILED_WITH_INCIDENT", QueryStatus.FailedWithIncident)]
[TestCase("disconnected", QueryStatus.Disconnected)]
[TestCase("DISCONNECTED", QueryStatus.Disconnected)]
[TestCase("restarted", QueryStatus.Restarted)]
[TestCase("RESTARTED", QueryStatus.Restarted)]
[TestCase("blocked", QueryStatus.Blocked)]
[TestCase("BLOCKED", QueryStatus.Blocked)]
public void TestGetQueryStatusByStringValue(string status, QueryStatus expectedStatus)
sfc-gh-ext-simba-lf marked this conversation as resolved.
Show resolved Hide resolved
{
if (expectedStatus == QueryStatus.Unknown)
{
var thrown = Assert.Throws<Exception>(() => QueryStatusExtensions.GetQueryStatusByStringValue(status));
Assert.IsTrue(thrown.Message.Contains("The query status returned by the server is not recognized"));
}
else
{
var actualStatus = QueryStatusExtensions.GetQueryStatusByStringValue(status);
Assert.AreEqual(expectedStatus, actualStatus);
}
}

[Test]
[TestCase(QueryStatus.Running, true)]
[TestCase(QueryStatus.ResumingWarehouse, true)]
[TestCase(QueryStatus.Queued, true)]
[TestCase(QueryStatus.QueuedReparingWarehouse, true)]
[TestCase(QueryStatus.NoData, true)]
[TestCase(QueryStatus.Aborting, false)]
[TestCase(QueryStatus.Success, false)]
[TestCase(QueryStatus.FailedWithError, false)]
[TestCase(QueryStatus.Aborted, false)]
[TestCase(QueryStatus.FailedWithIncident, false)]
[TestCase(QueryStatus.Disconnected, false)]
[TestCase(QueryStatus.Restarted, false)]
[TestCase(QueryStatus.Blocked, false)]
[TestCase(QueryStatus.Unknown, false)]
public void TestIsStillRunning(QueryStatus status, bool expectedResult)
{
Assert.AreEqual(expectedResult, QueryStatusExtensions.IsStillRunning(status));
}

[Test]
[TestCase(QueryStatus.Aborting, true)]
[TestCase(QueryStatus.FailedWithError, true)]
[TestCase(QueryStatus.Aborted, true)]
[TestCase(QueryStatus.FailedWithIncident, true)]
[TestCase(QueryStatus.Disconnected, true)]
[TestCase(QueryStatus.Blocked, true)]
[TestCase(QueryStatus.Running, false)]
[TestCase(QueryStatus.ResumingWarehouse, false)]
[TestCase(QueryStatus.Queued, false)]
[TestCase(QueryStatus.QueuedReparingWarehouse, false)]
[TestCase(QueryStatus.NoData, false)]
[TestCase(QueryStatus.Success, false)]
[TestCase(QueryStatus.Restarted, false)]
[TestCase(QueryStatus.Unknown, false)]
public void TestIsAnError(QueryStatus status, bool expectedResult)
{
Assert.AreEqual(expectedResult, QueryStatusExtensions.IsAnError(status));
}
}
}
180 changes: 175 additions & 5 deletions Snowflake.Data/Client/SnowflakeDbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,27 @@
using System.Threading.Tasks;
using Newtonsoft.Json;
using Snowflake.Data.Log;
using System.Text.RegularExpressions;

namespace Snowflake.Data.Client
{
[System.ComponentModel.DesignerCategory("Code")]
public class SnowflakeDbCommand : DbCommand
{
private DbConnection connection;
private SnowflakeDbConnection connection;

private SFStatement sfStatement;

private SnowflakeDbParameterCollection parameterCollection;

private SFLogger logger = SFLoggerFactory.GetLogger<SnowflakeDbCommand>();

// Async max retry and retry pattern
private const int AsyncNoDataMaxRetry = 24;
private readonly int[] _asyncRetryPattern = { 1, 1, 2, 3, 4, 8, 10 };
sfc-gh-knozderko marked this conversation as resolved.
Show resolved Hide resolved

private static readonly Regex UuidRegex = new Regex("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");

public SnowflakeDbCommand()
{
logger.Debug("Constructing SnowflakeDbCommand class");
Expand Down Expand Up @@ -274,6 +281,169 @@
}
}

/// <summary>
/// Execute a query in async mode.
sfc-gh-knozderko marked this conversation as resolved.
Show resolved Hide resolved
/// Async mode means the server will respond immediately with the query ID and execute the query asynchronously
/// </summary>
/// <returns>The query id.</returns>
public string ExecuteInAsyncMode()
{
logger.Debug($"ExecuteInAsyncMode");
SFBaseResultSet resultSet = ExecuteInternal(asyncExec: true);
sfc-gh-knozderko marked this conversation as resolved.
Show resolved Hide resolved
return resultSet.queryId;
}

/// <summary>
/// Executes an asynchronous query in async mode.
/// Async mode means the server will respond immediately with the query ID and execute the query asynchronously
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns>The query id.</returns>
public async Task<string> ExecuteAsyncInAsyncMode(CancellationToken cancellationToken)
{
logger.Debug($"ExecuteAsyncInAsyncMode");
var resultSet = await ExecuteInternalAsync(cancellationToken, asyncExec: true).ConfigureAwait(false);
return resultSet.queryId;
}

/// <summary>
/// Gets the query status based on query ID.
/// </summary>
/// <param name="queryId"></param>
/// <returns>The query status.</returns>
public QueryStatus GetQueryStatus(string queryId)
{
logger.Debug($"GetQueryStatus");

if (UuidRegex.IsMatch(queryId))
{
var sfStatement = new SFStatement(connection.SfSession);
return sfStatement.GetQueryStatus(queryId);
}
else
{
var errorMessage = $"The given query id {queryId} is not valid uuid";
logger.Error(errorMessage);
throw new Exception(errorMessage);
}
}

/// <summary>
/// Gets the query status based on query ID.
/// </summary>
/// <param name="queryId"></param>
/// <param name="cancellationToken"></param>
/// <returns>The query status.</returns>
public async Task<QueryStatus> GetQueryStatusAsync(string queryId, CancellationToken cancellationToken)
{
logger.Debug($"GetQueryStatusAsync");

// Check if queryId is valid uuid
if (UuidRegex.IsMatch(queryId))
{
var sfStatement = new SFStatement(connection.SfSession);
return await sfStatement.GetQueryStatusAsync(queryId, cancellationToken).ConfigureAwait(false);
}
else
{
var errorMessage = $"The given query id {queryId} is not valid uuid";
logger.Error(errorMessage);
throw new Exception(errorMessage);
}
}

/// <summary>
/// Checks query status until it is done executing.
/// </summary>
/// <param name="queryId"></param>
/// <param name="cancellationToken"></param>
/// <param name="isAsync"></param>
internal async Task RetryUntilQueryResultIsAvailable(string queryId, CancellationToken cancellationToken, bool isAsync)
{
int retryPatternPos = 0;
int noDataCounter = 0;

QueryStatus status;
while (true)
{
if (cancellationToken.IsCancellationRequested)
{
logger.Debug("Cancellation requested for getting results from query id");
cancellationToken.ThrowIfCancellationRequested();
}

Check warning on line 373 in Snowflake.Data/Client/SnowflakeDbCommand.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Client/SnowflakeDbCommand.cs#L373

Added line #L373 was not covered by tests

status = isAsync ? await GetQueryStatusAsync(queryId, cancellationToken) : GetQueryStatus(queryId);

if (!QueryStatusExtensions.IsStillRunning(status))
{
return;
}

// Timeout based on query status retry rules
if (isAsync)
{
await Task.Delay(TimeSpan.FromSeconds(_asyncRetryPattern[retryPatternPos]), cancellationToken).ConfigureAwait(false);
}
else
{
Thread.Sleep(TimeSpan.FromSeconds(_asyncRetryPattern[retryPatternPos]));
}

// If no data, increment the no data counter
if (status == QueryStatus.NoData)
{
noDataCounter++;

// Check if retry for no data is exceeded
if (noDataCounter > AsyncNoDataMaxRetry)
{
var errorMessage = "Max retry for no data is reached";
logger.Error(errorMessage);
throw new Exception(errorMessage);
}
}

if (retryPatternPos < _asyncRetryPattern.Length - 1)
{
retryPatternPos++;
}
}
}

/// <summary>
/// Gets the query results based on query ID.
/// </summary>
/// <param name="queryId"></param>
/// <returns>The query results.</returns>
public DbDataReader GetResultsFromQueryId(string queryId)
{
logger.Debug($"GetResultsFromQueryId");

Task task = RetryUntilQueryResultIsAvailable(queryId, CancellationToken.None, false);
task.Wait();

SFBaseResultSet resultSet = sfStatement.GetResultWithId(queryId);

return new SnowflakeDbDataReader(this, resultSet);
}

/// <summary>
/// Gets the query results based on query ID.
/// </summary>
/// <param name="queryId"></param>
/// <param name="cancellationToken"></param>
/// <returns>The query results.</returns>
public async Task<DbDataReader> GetResultsFromQueryIdAsync(string queryId, CancellationToken cancellationToken)
{
logger.Debug($"GetResultsFromQueryIdAsync");

await RetryUntilQueryResultIsAvailable(queryId, cancellationToken, true);

SFBaseResultSet resultSet = await sfStatement.GetResultWithIdAsync(queryId, cancellationToken).ConfigureAwait(false);

return new SnowflakeDbDataReader(this, resultSet);
}

private static Dictionary<string, BindingDTO> convertToBindList(List<SnowflakeDbParameter> parameters)
{
if (parameters == null || parameters.Count == 0)
Expand Down Expand Up @@ -354,18 +524,18 @@
this.sfStatement = new SFStatement(session);
}

private SFBaseResultSet ExecuteInternal(bool describeOnly = false)
private SFBaseResultSet ExecuteInternal(bool describeOnly = false, bool asyncExec = false)
{
CheckIfCommandTextIsSet();
SetStatement();
return sfStatement.Execute(CommandTimeout, CommandText, convertToBindList(parameterCollection.parameterList), describeOnly);
return sfStatement.Execute(CommandTimeout, CommandText, convertToBindList(parameterCollection.parameterList), describeOnly, asyncExec);
}

private Task<SFBaseResultSet> ExecuteInternalAsync(CancellationToken cancellationToken, bool describeOnly = false)
private Task<SFBaseResultSet> ExecuteInternalAsync(CancellationToken cancellationToken, bool describeOnly = false, bool asyncExec = false)
{
CheckIfCommandTextIsSet();
SetStatement();
return sfStatement.ExecuteAsync(CommandTimeout, CommandText, convertToBindList(parameterCollection.parameterList), describeOnly, cancellationToken);
return sfStatement.ExecuteAsync(CommandTimeout, CommandText, convertToBindList(parameterCollection.parameterList), describeOnly, asyncExec, cancellationToken);
}

private void CheckIfCommandTextIsSet()
Expand Down
Loading
Loading