Skip to content

Commit

Permalink
SNOW-1344341 make new pool as a default one (#931)
Browse files Browse the repository at this point in the history
### Description
Make new pool as a default one.

### Checklist
- [x] Code compiles correctly
- [x] Code is formatted according to [Coding
Conventions](../blob/master/CodingConventions.md)
- [x] Created tests which fail without the change (if possible)
- [x] All tests passing (`dotnet test`)
- [x] Extended the README / documentation, if necessary
- [x] Provide JIRA issue id (if possible) or GitHub issue id in PR name
  • Loading branch information
sfc-gh-knozderko authored Apr 26, 2024
1 parent 43d8fd4 commit 4f2a167
Show file tree
Hide file tree
Showing 9 changed files with 535 additions and 425 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core;
using Snowflake.Data.Core.Session;
using Snowflake.Data.Tests.Mock;
using Snowflake.Data.Tests.Util;
Expand All @@ -29,6 +31,93 @@ public class ConnectionMultiplePoolsAsyncIT: SFBaseTestAsync
_previousPoolConfig.Reset();
}

[Test]
public async Task TestAddToPoolOnOpenAsync()
{
// arrange
var connection = new SnowflakeDbConnection(ConnectionString + "minPoolSize=1");

// act
await connection.OpenAsync().ConfigureAwait(false);

// assert
var pool = SnowflakeDbConnectionPool.GetPool(connection.ConnectionString);
Assert.AreEqual(1, pool.GetCurrentPoolSize());

// cleanup
await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false);
}

[Test]
public async Task TestDoNotAddToPoolInvalidConnectionAsync()
{
// arrange
var invalidConnectionString = ";connection_timeout=123";
var connection = new SnowflakeDbConnection(invalidConnectionString);

// act
try
{
await connection.OpenAsync().ConfigureAwait(false);
Assert.Fail("OpenAsync should fail for invalid connection string");
}
catch {}

// assert
var pool = SnowflakeDbConnectionPool.GetPool(connection.ConnectionString);
Assert.Less(pool.GetCurrentPoolSize(), SFSessionHttpClientProperties.DefaultMinPoolSize); // for invalid connection string it is used default min pool size

// cleanup
await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false);
}

[Test]
public void TestConnectionPoolWithInvalidOpenAsync()
{
// make the connection string unique so it won't pick up connection
// pooled by other test cases.
string connStr = ConnectionString + "minPoolSize=0;maxPoolSize=10;application=conn_pool_test_invalid_openasync2";
using (var connection = new SnowflakeDbConnection())
{
connection.ConnectionString = connStr;
// call openAsync but do not wait and destroy it direct
// so the session is initialized with empty token
connection.OpenAsync();
}

// use the same connection string to make a new connection
// to ensure the invalid connection made previously is not pooled
using (var connection1 = new SnowflakeDbConnection())
{
connection1.ConnectionString = connStr;
// this will not open a new session but get the invalid connection from pool
connection1.Open();
// Now run query with connection1
var command = connection1.CreateCommand();
command.CommandText = "select 1, 2, 3";

try
{
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
for (int i = 0; i < reader.FieldCount; i++)
{
// Process each column as appropriate
reader.GetFieldValue<object>(i);
}
}
}
}
catch (SnowflakeDbException)
{
// fail the test case if anything wrong.
Assert.Fail();
}
}
}

[Test]
public async Task TestMinPoolSizeAsync()
{
Expand Down Expand Up @@ -88,5 +177,23 @@ public async Task TestReleaseConnectionWhenRollbackFailsAsync()
// assert
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "Should not return connection to the pool");
}

[Test(Description = "test connection pooling with concurrent connection using async calls")]
public void TestConcurrentConnectionPoolingAsync()
{
// add test case name in connection string to make in unique for each test case
// set short expiration timeout to cover the case that connection expired
string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingAsync2;ExpirationTimeout=3";
ConnectionSinglePoolCacheAsyncIT.ConcurrentPoolingAsyncHelper(connStr, true, 7, 100, 2);
}

[Test(Description = "test connection pooling with concurrent connection and using async calls no close call for connection. Connection is closed when Dispose() is called by framework.")]
public void TestConcurrentConnectionPoolingDisposeAsync()
{
// add test case name in connection string to make in unique for each test case
// set short expiration timeout to cover the case that connection expired
string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingDisposeAsync2;ExpirationTimeout=3";
ConnectionSinglePoolCacheAsyncIT.ConcurrentPoolingAsyncHelper(connStr, false, 7, 100, 2);
}
}
}
107 changes: 107 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,112 @@ public void TestFailWhenPreventingFromReturningToPoolNotOpenedConnection()
// assert
Assert.That(thrown.Message, Does.Contain("Session not yet created for this connection. Unable to prevent the session from pooling"));
}

[Test]
public void TestRollbackTransactionOnPooledWhenExceptionOccurred()
{
var connectionString = SetPoolWithOneElement();
object firstOpenedSessionId;
using (var connection = new SnowflakeDbConnection(connectionString))
{
connection.Open();
firstOpenedSessionId = connection.SfSession.sessionId;
connection.BeginTransaction();
Assert.AreEqual(true, connection.HasActiveExplicitTransaction());
Assert.Throws<SnowflakeDbException>(() =>
{
using (var command = connection.CreateCommand())
{
command.CommandText = "invalid command will throw exception and leave session with an unfinished transaction";
command.ExecuteNonQuery();
}
});
}

using (var connectionWithSessionReused = new SnowflakeDbConnection(connectionString))
{
connectionWithSessionReused.Open();

Assert.AreEqual(firstOpenedSessionId, connectionWithSessionReused.SfSession.sessionId);
Assert.AreEqual(false, connectionWithSessionReused.HasActiveExplicitTransaction());
using (var cmd = connectionWithSessionReused.CreateCommand())
{
cmd.CommandText = "SELECT CURRENT_TRANSACTION()";
Assert.AreEqual(DBNull.Value, cmd.ExecuteScalar());
}
}

Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be reused and any pending transaction rolled back before it gets back to the pool");
}

[Test]
public void TestTransactionStatusNotTrackedForNonExplicitTransactionCalls()
{
var connectionString = SetPoolWithOneElement();
using (var connection = new SnowflakeDbConnection(connectionString))
{
connection.Open();
using (var command = connection.CreateCommand())
{
command.CommandText = "BEGIN"; // in general can be put as a part of a multi statement call and mixed with commit as well
command.ExecuteNonQuery();
Assert.AreEqual(false, connection.HasActiveExplicitTransaction());
}
}
}

[Test]
public void TestRollbackTransactionOnPooledWhenConnectionClose()
{
var connectionString = SetPoolWithOneElement();
Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool");

string firstOpenedSessionId;
using (var connection1 = new SnowflakeDbConnection(connectionString))
{
connection1.Open();
Assert.AreEqual(ExpectedPoolCountAfterOpen(), SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection session is added to the pool after close connection");
connection1.BeginTransaction();
Assert.AreEqual(true, connection1.HasActiveExplicitTransaction());
using (var command = connection1.CreateCommand())
{
firstOpenedSessionId = connection1.SfSession.sessionId;
command.CommandText = "SELECT CURRENT_TRANSACTION()";
Assert.AreNotEqual(DBNull.Value, command.ExecuteScalar());
}
}
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool");

using (var connection2 = new SnowflakeDbConnection(connectionString))
{
connection2.Open();
Assert.AreEqual(ExpectedPoolCountAfterOpen(), SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection session should be now removed from the pool");
Assert.AreEqual(false, connection2.HasActiveExplicitTransaction());
using (var command = connection2.CreateCommand())
{
Assert.AreEqual(firstOpenedSessionId, connection2.SfSession.sessionId);
command.CommandText = "SELECT CURRENT_TRANSACTION()";
Assert.AreEqual(DBNull.Value, command.ExecuteScalar());
}
}
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool");
}


private string SetPoolWithOneElement()
{
if (_connectionPoolTypeUnderTest == ConnectionPoolType.SingleConnectionCache)
{
SnowflakeDbConnectionPool.SetMaxPoolSize(1);
return ConnectionString;
}
return ConnectionString + "maxPoolSize=1;minPoolSize=0";
}

private int ExpectedPoolCountAfterOpen()
{
return _connectionPoolTypeUnderTest == ConnectionPoolType.SingleConnectionCache ? 0 : 1;
}

}
}
Loading

0 comments on commit 4f2a167

Please sign in to comment.