Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add asynchronous query abilities #421

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 142 additions & 16 deletions Snowflake.Data.Tests/SFDbCommandIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void TestRowsAffectedOverflowInt()

}

[TestFixture]
[TestFixture]
class SFDbCommandIT : SFBaseTest
{
[Test]
Expand All @@ -163,7 +163,7 @@ public void TestSimpleCommand()
cmd.CommandType = CommandType.StoredProcedure;
Assert.Fail();
}
catch(SnowflakeDbException e)
catch (SnowflakeDbException e)
{
Assert.AreEqual(270009, e.ErrorCode);
}
Expand All @@ -174,7 +174,7 @@ public void TestSimpleCommand()
cmd.UpdatedRowSource = UpdateRowSource.FirstReturnedRecord;
Assert.Fail();
}
catch(SnowflakeDbException e)
catch (SnowflakeDbException e)
{
Assert.AreEqual(270009, e.ErrorCode);
}
Expand All @@ -185,7 +185,7 @@ public void TestSimpleCommand()
cmd.Connection = null;
Assert.Fail();
}
catch(SnowflakeDbException e)
catch (SnowflakeDbException e)
{
Assert.AreEqual(270009, e.ErrorCode);
}
Expand All @@ -196,7 +196,7 @@ public void TestSimpleCommand()
((SnowflakeDbCommand)cmd).DesignTimeVisible = true;
Assert.Fail();
}
catch(SnowflakeDbException e)
catch (SnowflakeDbException e)
{
Assert.AreEqual(270009, e.ErrorCode);
}
Expand All @@ -212,7 +212,7 @@ public void TestSimpleCommand()
// Skip SimpleLargeResultSet test on GCP as it will fail
// on row 8192 consistently on Appveyor.
[IgnoreOnEnvIs("snowflake_cloud_env",
new string[] {"GCP" })]
new string[] { "GCP" })]
public void TestSimpleLargeResultSet()
{
using (IDbConnection conn = new SnowflakeDbConnection())
Expand Down Expand Up @@ -334,7 +334,7 @@ public void TestCancelQuery()
cmd.ExecuteScalar();
Assert.Fail();
}
catch(SnowflakeDbException e)
catch (SnowflakeDbException e)
{
// 604 is error code from server meaning query has been canceled
if (604 != e.ErrorCode)
Expand Down Expand Up @@ -383,7 +383,7 @@ public void TestQueryTimeout()
// timelimit = 17min
cmd.CommandText = "select count(seq4()) from table(generator(timelimit => 1020)) v";
// timeout = 16min - Using a timeout > default Rest timeout of 15min
cmd.CommandTimeout = 16*60;
cmd.CommandTimeout = 16 * 60;

try
{
Expand All @@ -396,7 +396,7 @@ public void TestQueryTimeout()
Assert.GreaterOrEqual(stopwatch.ElapsedMilliseconds, 16 * 60 * 1000);
Assert.Fail();
}
catch(SnowflakeDbException e)
catch (SnowflakeDbException e)
{
// 604 is error code from server meaning query has been canceled
Assert.AreEqual(e.ErrorCode, 604);
Expand Down Expand Up @@ -450,7 +450,7 @@ public void TestTransaction()
Assert.IsTrue(reader.Read());
Assert.AreEqual("test", reader.GetString(0));
command.Transaction.Rollback();

// no value will be in table since it has been rollbacked
command.CommandText = "select * from testtransaction";
reader = command.ExecuteReader();
Expand All @@ -475,7 +475,7 @@ public void TestRowsAffected()

int[] expectedResult =
{
0, 2, 1, 0
0, 2, 1, 0
};

using (IDbConnection conn = new SnowflakeDbConnection())
Expand All @@ -487,7 +487,7 @@ public void TestRowsAffected()
using (IDbCommand command = conn.CreateCommand())
{
int rowsAffected = -1;
for (int i=0; i<testCommands.Length; i++)
for (int i = 0; i < testCommands.Length; i++)
{
command.CommandText = testCommands[i];
rowsAffected = command.ExecuteNonQuery();
Expand All @@ -496,7 +496,7 @@ public void TestRowsAffected()
}
}
conn.Close();
}
}
}

[Test]
Expand All @@ -521,11 +521,11 @@ public void TestExecuteScalarNull()
[Test]
public void TestCreateCommandBeforeOpeningConnection()
{
using(var conn = new SnowflakeDbConnection())
using (var conn = new SnowflakeDbConnection())
{
conn.ConnectionString = ConnectionString;
using(var command = conn.CreateCommand())

using (var command = conn.CreateCommand())
{
conn.Open();
command.CommandText = "select 1";
Expand Down Expand Up @@ -571,4 +571,130 @@ public void TestRowsAffectedUnload()
}
}
}

[TestFixture]
class SFDbCommandAsynchronous : SFBaseTest
{
SnowflakeDbConnection StartSnowflakeConnection()
{
var conn = new SnowflakeDbConnection();
conn.ConnectionString = ConnectionString;

conn.Open();

return conn;
}

[Test]
public void TestLongRunningQuery()
{
string queryId;
using (var conn = StartSnowflakeConnection())
{
using (var cmd = (SnowflakeDbCommand)conn.CreateCommand())
{
cmd.CommandText = "select count(seq4()) from table(generator(timelimit => 15)) v order by 1";
var status = cmd.StartAsynchronousQuery();
Assert.False(status.IsQueryDone);
Assert.False(status.IsQuerySuccessful);
queryId = status.QueryId;
}

Assert.IsNotEmpty(queryId);
}

// start a new connection to make sure works across sessions
using (var conn = StartSnowflakeConnection())
{

SnowflakeQueryStatus status;
do
{
status = SnowflakeDbAsynchronousQueryHelper.GetQueryStatus(conn, queryId);
if (status.IsQueryDone)
{
break;
}
else
{
Assert.False(status.IsQuerySuccessful);
}

Thread.Sleep(5000);
} while (true);


// once it finished, it should be successfull
Assert.True(status.IsQuerySuccessful);
}

// start a new connection to make sure works across sessions
using (var conn = StartSnowflakeConnection())
{

using (var cmd = SnowflakeDbAsynchronousQueryHelper.CreateQueryResultsCommand(conn, queryId))
{
using (IDataReader reader = cmd.ExecuteReader())
{
// only one result is returned
Assert.IsTrue(reader.Read());
}
}

conn.Close();
}

}

[Test]
public void TestSimpleCommand()
{
string queryId;

using (var conn = StartSnowflakeConnection())
{

using (var cmd = (SnowflakeDbCommand)conn.CreateCommand())
{
cmd.CommandText = "select 1";

var status = cmd.StartAsynchronousQuery();
// even a fast asynchronous call will not be done initially
Assert.False(status.IsQueryDone);
Assert.False(status.IsQuerySuccessful);
queryId = status.QueryId;

Assert.IsNotEmpty(queryId);
}
}

// start a new connection to make sure works across sessions
using (var conn = StartSnowflakeConnection())
{
SnowflakeQueryStatus status;
status = SnowflakeDbAsynchronousQueryHelper.GetQueryStatus(conn, queryId);
// since query is so fast, expect it to be done the first time we check the status
Assert.True(status.IsQueryDone);
Assert.True(status.IsQuerySuccessful);
}

// start a new connection to make sure works across sessions
using (var conn = StartSnowflakeConnection())
{

// because this query is so quick, we do not need to check the status before fetching the result

using (var cmd = SnowflakeDbAsynchronousQueryHelper.CreateQueryResultsCommand(conn, queryId))
{
var val = cmd.ExecuteScalar();

Assert.AreEqual(1L, (long)val);

}

conn.Close();
}
}

}
}
6 changes: 3 additions & 3 deletions Snowflake.Data.Tests/SFStatementTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void TestSessionRenew()
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
Assert.AreEqual("new_session_token", sfSession.sessionToken);
Expand All @@ -37,7 +37,7 @@ public void TestSessionRenewDuringQueryExec()
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false, false);
Assert.AreEqual(true, resultSet.Next());
Assert.AreEqual("1", resultSet.GetString(0));
}
Expand All @@ -57,7 +57,7 @@ public void TestServiceName()
for (int i = 0; i < 5; i++)
{
SFStatement statement = new SFStatement(sfSession);
SFBaseResultSet resultSet = statement.Execute(0, "SELECT 1", null, false);
SFBaseResultSet resultSet = statement.Execute(0, "SELECT 1", null, false, false);
expectServiceName += "a";
Assert.AreEqual(expectServiceName, sfSession.ParameterMap[SFSessionParameter.SERVICE_NAME]);
}
Expand Down
98 changes: 98 additions & 0 deletions Snowflake.Data/Client/SnowflakeDbAsynchronousQueryHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using Snowflake.Data.Core;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Snowflake.Data.Client
{
/// <summary>
/// Methods to help perform asynchronous queries.
/// </summary>
public static class SnowflakeDbAsynchronousQueryHelper
{
/// <summary>
/// Starts a query asynchronously.
/// </summary>
/// <param name="cmd"></param>
/// <returns>The query id.</returns>
public static SnowflakeQueryStatus StartAsynchronousQuery(SnowflakeDbCommand cmd)
{
return cmd.StartAsynchronousQuery();
}

/// <summary>
/// Starts a query asynchronously.
/// </summary>
/// <param name="cmd"></param>
/// <param name="cancellationToken"></param>
/// <returns>The query id.</returns>
public static async Task<SnowflakeQueryStatus> StartAsynchronousQueryAsync(SnowflakeDbCommand cmd, CancellationToken cancellationToken)
{
return await cmd.StartAsynchronousQueryAsync(cancellationToken).ConfigureAwait(false);
}

// https://docs.snowflake.com/en/sql-reference/functions/result_scan.html
// select * from table(result_scan('query id'));

// https://docs.snowflake.com/en/sql-reference/functions/query_history.html
// select *
// from table(information_schema.query_history())
// only returns

// https://docs.snowflake.com/en/sql-reference/account-usage/query_history.html
// Latency for the view may be up to 45 minutes.

/// <summary>
/// Use to get the status of a query to determine if you can fetch the result.
/// </summary>
/// <param name="conn"></param>
/// <param name="queryId"></param>
/// <returns></returns>
public static SnowflakeQueryStatus GetQueryStatus(SnowflakeDbConnection conn, string queryId)
{
return GetQueryStatusAsync(conn, queryId, CancellationToken.None).Result;
}

/// <summary>
/// Use to get the status of a query to determine if you can fetch the result.
/// </summary>
/// <param name="conn"></param>
/// <param name="queryId"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static async Task<SnowflakeQueryStatus> GetQueryStatusAsync(SnowflakeDbConnection conn,
string queryId, CancellationToken cancellationToken)
{
return await GetStatusUsingRestApiAsync(conn, queryId, cancellationToken).ConfigureAwait(false);
}

private static async Task<SnowflakeQueryStatus> GetStatusUsingRestApiAsync(SnowflakeDbConnection conn, string queryId, CancellationToken cancellationToken)
{

var sfStatement = new SFStatement(conn.SfSession);
var r = await sfStatement.CheckQueryStatusAsync(0, queryId, cancellationToken).ConfigureAwait(false);
return r;
}

/// <summary>
/// Can use the resulting <see cref="SnowflakeDbCommand"/> to fetch the results of the query.
/// </summary>
/// <param name="conn"></param>
/// <param name="queryId"></param>
/// <returns></returns>
public static SnowflakeDbCommand CreateQueryResultsCommand(SnowflakeDbConnection conn, string queryId)
{
return CreateQueryResultsCommandForRestApi(conn, queryId);
}

private static SnowflakeDbCommand CreateQueryResultsCommandForRestApi(SnowflakeDbConnection conn, string queryId)
{
var cmd = (SnowflakeDbCommand)conn.CreateCommand();
cmd.HandleAsyncResponse = true;
cmd.CommandText = queryId;
return cmd;
}
}
}
Loading
Loading