diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolAsyncIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolAsyncIT.cs new file mode 100644 index 000000000..a3a9d8416 --- /dev/null +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolAsyncIT.cs @@ -0,0 +1,360 @@ +/* + * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. + */ + +using System; +using System.Data; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using Moq; +using Snowflake.Data.Client; +using Snowflake.Data.Core; +using Snowflake.Data.Tests.Mock; +using Snowflake.Data.Tests.Util; + +namespace Snowflake.Data.Tests.IntegrationTests +{ + [TestFixture, NonParallelizable] + class SFConnectionPoolITAsync : SFBaseTestAsync + { + private static readonly PoolConfig s_previousPoolConfigRestorer = new PoolConfig(); + + [SetUp] + public new void BeforeTest() + { + s_previousPoolConfigRestorer.Reset(); + SnowflakeDbConnectionPool.SetPooling(true); + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [TearDown] + public new void AfterTest() + { + s_previousPoolConfigRestorer.Reset(); + } + + [OneTimeTearDown] + public static void AfterAllTests() + { + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [Test] + public void TestConnectionPoolWithAsync() + { + using (var conn = new MockSnowflakeDbConnection()) + { + SnowflakeDbConnectionPool.SetMaxPoolSize(1); + SnowflakeDbConnectionPool.ClearAllPools(); + + int timeoutSec = 0; + string infiniteLoginTimeOut = String.Format("" + ";connection_timeout={0}", + timeoutSec); + + conn.ConnectionString = infiniteLoginTimeOut; + + Assert.AreEqual(conn.State, ConnectionState.Closed); + + CancellationTokenSource connectionCancelToken = new CancellationTokenSource(); + try + { + conn.OpenAsync(connectionCancelToken.Token); + } + catch (SnowflakeDbException ex) + { + Console.WriteLine("connection failed:" + ex); + conn.CloseAsync(connectionCancelToken.Token); + } + + Thread.Sleep(10 * 1000); + Assert.AreEqual(ConnectionState.Closed, conn.State); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + } + } + + [Test] + public void TestConnectionPoolWithInvalidOpenAsync() + { + SnowflakeDbConnectionPool.SetMaxPoolSize(10); + // make the connection string unique so it won't pick up connection + // pooled by other test cases. + string connStr = ConnectionString + ";application=conn_pool_test_invalid_openasync"; + using (var connection = new SnowflakeDbConnection()) + { + connection.ConnectionString = connStr; + // call openAsync but do not wait and destroy it direct + // so the session is initialized with empty token + connection.OpenAsync(); + } + + // use the same connection string to make a new connection + // to ensure the invalid connection made previously is not pooled + using (var connection1 = new SnowflakeDbConnection()) + { + connection1.ConnectionString = connStr; + // this will not open a new session but get the invalid connection from pool + connection1.Open(); + // Now run query with connection1 + var command = connection1.CreateCommand(); + command.CommandText = "select 1, 2, 3"; + + try + { + using (var reader = command.ExecuteReader()) + { + while (reader.Read()) + { + for (int i = 0; i < reader.FieldCount; i++) + { + // Process each column as appropriate + reader.GetFieldValue(i); + } + } + } + } + catch (SnowflakeDbException) + { + // fail the test case if anything wrong. + Assert.Fail(); + } + } + } + + [Test] + // test connection pooling with concurrent connection using async calls + public void TestConcurrentConnectionPoolingAsync() + { + // add test case name in connection string to make in unique for each test case + string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingAsync"; + ConcurrentPoolingAsyncHelper(connStr, true); + } + + [Test] + public void TestRollbackTransactionOnPooledWhenExceptionOccurred() + { + SnowflakeDbConnectionPool.SetMaxPoolSize(1); + + object firstOpenedSessionId; + using (var connection = new SnowflakeDbConnection()) + { + connection.ConnectionString = ConnectionString; + connection.Open(); + firstOpenedSessionId = connection.SfSession.sessionId; + connection.BeginTransaction(); + Assert.AreEqual(true, connection.HasActiveExplicitTransaction()); + Assert.Throws(() => + { + using (var command = connection.CreateCommand()) + { + command.CommandText = "invalid command will throw exception and leave session with an unfinished transaction"; + command.ExecuteNonQuery(); + } + }); + } + + using (var connectionWithSessionReused = new SnowflakeDbConnection()) + { + connectionWithSessionReused.ConnectionString = ConnectionString; + connectionWithSessionReused.Open(); + + Assert.AreEqual(firstOpenedSessionId, connectionWithSessionReused.SfSession.sessionId); + Assert.AreEqual(false, connectionWithSessionReused.HasActiveExplicitTransaction()); + using (var cmd = connectionWithSessionReused.CreateCommand()) + { + cmd.CommandText = "SELECT CURRENT_TRANSACTION()"; + Assert.AreEqual(DBNull.Value, cmd.ExecuteScalar()); + } + } + + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be reused and any pending transaction rolled back before it gets back to the pool"); + } + + [Test] + public void TestTransactionStatusNotTrackedForNonExplicitTransactionCalls() + { + SnowflakeDbConnectionPool.SetMaxPoolSize(1); + using (var connection = new SnowflakeDbConnection()) + { + connection.ConnectionString = ConnectionString; + connection.Open(); + using (var command = connection.CreateCommand()) + { + command.CommandText = "BEGIN"; // in general can be put as a part of a multi statement call and mixed with commit as well + command.ExecuteNonQuery(); + Assert.AreEqual(false, connection.HasActiveExplicitTransaction()); + } + } + } + + [Test] + public void TestRollbackTransactionOnPooledWhenConnectionClose() + { + SnowflakeDbConnectionPool.SetMaxPoolSize(1); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool"); + + string firstOpenedSessionId; + using (var connection1 = new SnowflakeDbConnection()) + { + connection1.ConnectionString = ConnectionString; + connection1.Open(); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection session is added to the pool after close connection"); + connection1.BeginTransaction(); + Assert.AreEqual(true, connection1.HasActiveExplicitTransaction()); + using (var command = connection1.CreateCommand()) + { + firstOpenedSessionId = connection1.SfSession.sessionId; + command.CommandText = "SELECT CURRENT_TRANSACTION()"; + Assert.AreNotEqual(DBNull.Value, command.ExecuteScalar()); + } + } + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool"); + + using (var connection2 = new SnowflakeDbConnection()) + { + connection2.ConnectionString = ConnectionString; + connection2.Open(); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection session should be now removed from the pool"); + Assert.AreEqual(false, connection2.HasActiveExplicitTransaction()); + using (var command = connection2.CreateCommand()) + { + Assert.AreEqual(firstOpenedSessionId, connection2.SfSession.sessionId); + command.CommandText = "SELECT CURRENT_TRANSACTION()"; + Assert.AreEqual(DBNull.Value, command.ExecuteScalar()); + } + } + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool"); + } + + [Test] + public void TestFailureOfTransactionRollbackOnConnectionClosePreventsAddingToPool() + { + SnowflakeDbConnectionPool.SetPooling(true); + SnowflakeDbConnectionPool.SetMaxPoolSize(10); + var commandThrowingExceptionOnlyForRollback = new Mock(); + commandThrowingExceptionOnlyForRollback.CallBase = true; + commandThrowingExceptionOnlyForRollback.SetupSet(it => it.CommandText = "ROLLBACK") + .Throws(new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unexpected failure on transaction rollback when connection is returned to the pool with pending transaction")); + var mockDbProviderFactory = new Mock(); + mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object); + + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + using (var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object)) + { + connection.ConnectionString = ConnectionString; + connection.Open(); + connection.BeginTransaction(); + Assert.AreEqual(true, connection.HasActiveExplicitTransaction()); + // no Rollback or Commit; during internal Rollback while closing a connection a mocked exception will be thrown + } + + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Should not return connection to the pool"); + } + + [Test] + // test connection pooling with concurrent connection and using async calls no close + // call for connection. Connection is closed when Dispose() is called + // by framework. + public void TestConcurrentConnectionPoolingDisposeAsync() + { + // add test case name in connection string to make in unique for each test case + string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingDisposeAsync"; + ConcurrentPoolingAsyncHelper(connStr, false); + } + + static void ConcurrentPoolingAsyncHelper(string connectionString, bool closeConnection) + { + // task number a bit larger than pool size so some connections + // would fail on pooling while some connections could success + const int TaskNum = 12; + // set short pooling timeout to cover the case that connection expired + const int PoolTimeout = 3; + + // reset to default settings in case it changed by other test cases + SnowflakeDbConnectionPool.SetPooling(true); + SnowflakeDbConnectionPool.SetMaxPoolSize(10); + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetTimeout(PoolTimeout); + + var tasks = new Task[TaskNum + 1]; + for (int i = 0; i < TaskNum; i++) + { + tasks[i] = QueryExecutionTaskAsync(connectionString, closeConnection); + } + // cover the case of invalid sessions to ensure that won't + // break connection pooling + tasks[TaskNum] = InvalidConnectionTaskAsync(connectionString); + Task.WaitAll(tasks); + + // set pooling timeout back to default to avoid impact on other test cases + SnowflakeDbConnectionPool.SetTimeout(3600); + } + + // task to execute query with new connection in a loop + static async Task QueryExecutionTaskAsync(string connectionString, bool closeConnection) + { + for (int i = 0; i < 100; i++) + { + using (var conn = new SnowflakeDbConnection(connectionString)) + { + await conn.OpenAsync(); + using (DbCommand cmd = conn.CreateCommand()) + { + cmd.CommandText = "select 1, 2, 3"; + try + { + using (DbDataReader reader = await cmd.ExecuteReaderAsync()) + { + while (await reader.ReadAsync()) + { + for (int j = 0; j < reader.FieldCount; j++) + { + // Process each column as appropriate + await reader.GetFieldValueAsync(j); + } + } + } + } + catch (Exception e) + { + Assert.Fail("Caught unexpected exception: " + e); + } + } + + if (closeConnection) + { + await conn.CloseAsync(new CancellationTokenSource().Token); + } + } + } + } + + // task to generate invalid(not finish open) connections in a loop + static async Task InvalidConnectionTaskAsync(string connectionString) + { + for (int i = 0; i < 100; i++) + { + using (var conn = new SnowflakeDbConnection(connectionString)) + { + // intentially not using await so the connection + // will be disposed with invalid underlying session + conn.OpenAsync(); + }; + // wait 100ms each time so the invalid sessions are generated + // roughly at the same speed as connections for query tasks + await Task.Delay(100); + } + } + + private class TestSnowflakeDbConnection : SnowflakeDbConnection + { + public TestSnowflakeDbConnection(DbProviderFactory dbProviderFactory) + { + DbProviderFactory = dbProviderFactory; + } + + protected override DbProviderFactory DbProviderFactory { get; } + } + } +} diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolIT.cs new file mode 100644 index 000000000..75bdf9144 --- /dev/null +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolIT.cs @@ -0,0 +1,567 @@ +/* + * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. + */ + +using System; +using System.Data; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using Snowflake.Data.Core; +using Snowflake.Data.Client; +using Snowflake.Data.Core.Session; +using Snowflake.Data.Log; +using Snowflake.Data.Tests.Util; + +namespace Snowflake.Data.Tests.IntegrationTests +{ + [TestFixture(ConnectionPoolType.SingleConnectionCache)] + [TestFixture(ConnectionPoolType.MultipleConnectionPool)] + [NonParallelizable] + class SFConnectionPoolIT : SFBaseTest + { + private readonly ConnectionPoolType _connectionPoolTypeUnderTest; + private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); + private static PoolConfig s_previousPoolConfig; + + public SFConnectionPoolIT(ConnectionPoolType connectionPoolTypeUnderTest) + { + _connectionPoolTypeUnderTest = connectionPoolTypeUnderTest; + s_previousPoolConfig = new PoolConfig(); + SnowflakeDbConnectionPool.SetConnectionPoolVersion(connectionPoolTypeUnderTest); + } + + [SetUp] + public new void BeforeTest() + { + SnowflakeDbConnectionPool.GetPool(ConnectionString); // to instantiate the pool used in tests + SnowflakeDbConnectionPool.GetPool(ConnectionString + " retryCount=1"); + SnowflakeDbConnectionPool.GetPool(ConnectionString + " retryCount=2"); + SnowflakeDbConnectionPool.SetPooling(true); // TODO: when no session pool created it doesn't do anything!!!! maybe this state should be at pool management layer, not session pool layer + SnowflakeDbConnectionPool.ClearAllPools(); + s_logger.Debug($"---------------- BeforeTest ---------------------"); + s_logger.Debug($"Testing Pool Type: {SnowflakeDbConnectionPool.GetConnectionPoolVersion()}"); + } + + [TearDown] + public new void AfterTest() + { + s_previousPoolConfig.Reset(); + } + + [OneTimeTearDown] + public static void AfterAllTests() + { + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [Test] + // test connection pooling with concurrent connection + public void TestConcurrentConnectionPooling() + { + // add test case name in connection string to make in unique for each test case + string connStr = ConnectionString + ";application=TestConcurrentConnectionPooling"; + ConcurrentPoolingHelper(connStr, true); + } + + [Test] + // test connection pooling with concurrent connection and no close + // call for connection. Connection is closed when Dispose() is called + // by framework. + public void TestConcurrentConnectionPoolingDispose() + { + // add test case name in connection string to make in unique for each test case + string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingNoClose"; + ConcurrentPoolingHelper(connStr, false); + } + + static void ConcurrentPoolingHelper(string connectionString, bool closeConnection) + { + // thread number a bit larger than pool size so some connections + // would fail on pooling while some connections could success + const int ThreadNum = 12; + // set short pooling timeout to cover the case that connection expired + const int PoolTimeout = 3; + + // reset to default settings in case it changed by other test cases + SnowflakeDbConnectionPool.GetPool(connectionString); // to instantiate pool + SnowflakeDbConnectionPool.SetPooling(true); + SnowflakeDbConnectionPool.SetMaxPoolSize(10); + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetTimeout(PoolTimeout); + + var threads = new Task[ThreadNum]; + for (int i = 0; i < ThreadNum; i++) + { + threads[i] = Task.Factory.StartNew(() => + { + QueryExecutionThread(connectionString, closeConnection); + }); + } + Task.WaitAll(threads); + // set pooling timeout back to default to avoid impact on other test cases + SnowflakeDbConnectionPool.SetTimeout(3600); + } + + // thead to execute query with new connection in a loop + static void QueryExecutionThread(string connectionString, bool closeConnection) + { + for (int i = 0; i < 100; i++) + { + using (DbConnection conn = new SnowflakeDbConnection(connectionString)) + { + conn.Open(); + using (DbCommand cmd = conn.CreateCommand()) + { + cmd.CommandText = "select 1, 2, 3"; + try + { + using (var reader = cmd.ExecuteReader()) + { + while (reader.Read()) + { + for (int j = 0; j < reader.FieldCount; j++) + { + // Process each column as appropriate + reader.GetFieldValue(j); + } + } + } + } + catch (Exception e) + { + Assert.Fail("Caught unexpected exception: " + e); + } + } + + if (closeConnection) + { + conn.Close(); + } + } + } + } + + [Test] + public void TestBasicConnectionPool() + { + SnowflakeDbConnectionPool.SetMaxPoolSize(1); + + var conn1 = new SnowflakeDbConnection(ConnectionString); + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + conn1.Close(); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + } + + [Test] + public void TestConnectionPool() + { + SnowflakeDbConnectionPool.ClearAllPools(); + var conn1 = new SnowflakeDbConnection(ConnectionString); + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + conn1.Close(); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + + var conn2 = new SnowflakeDbConnection(); + conn2.ConnectionString = ConnectionString; + conn2.Open(); + Assert.AreEqual(ConnectionState.Open, conn2.State); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + + conn2.Close(); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(ConnectionState.Closed, conn2.State); + SnowflakeDbConnectionPool.ClearAllPools(); + } + + // [Test] + // public void TestConnectionPoolIsFull() + // { + // TestOnlyForOldPool(); + // + // SnowflakeDbConnectionPool.SetPooling(true); + // SnowflakeDbConnectionPool.ClearAllPools(); + // SnowflakeDbConnectionPool.SetMaxPoolSize(2); + // var conn1 = new SnowflakeDbConnection(); + // conn1.ConnectionString = ConnectionString; + // conn1.Open(); + // Assert.AreEqual(ConnectionState.Open, conn1.State); + // + // var conn2 = new SnowflakeDbConnection(); + // conn2.ConnectionString = ConnectionString + " retryCount=1"; + // conn2.Open(); + // Assert.AreEqual(ConnectionState.Open, conn2.State); + // + // var conn3 = new SnowflakeDbConnection(); + // conn3.ConnectionString = ConnectionString + " retryCount=2"; + // conn3.Open(); + // Assert.AreEqual(ConnectionState.Open, conn3.State); + // SnowflakeDbConnectionPool.ClearAllPools(); + // + // conn1.Close(); + // Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + // conn2.Close(); + // Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + // conn3.Close(); + // Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + // + // Assert.AreEqual(ConnectionState.Closed, conn1.State); + // Assert.AreEqual(ConnectionState.Closed, conn2.State); + // Assert.AreEqual(ConnectionState.Closed, conn3.State); + // SnowflakeDbConnectionPool.ClearAllPools(); + // } + + [Test] + public void TestNewConnectionPoolIsFull() + { + // TestOnlyForNewPool(); + + var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString); + SnowflakeDbConnectionPool.SetPooling(true); + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetMaxPoolSize(2); + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = ConnectionString; + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + + var conn2 = new SnowflakeDbConnection(); + conn2.ConnectionString = ConnectionString; + conn2.Open(); + Assert.AreEqual(ConnectionState.Open, conn2.State); + + var conn3 = new SnowflakeDbConnection(); + conn3.ConnectionString = ConnectionString; + conn3.Open(); + Assert.AreEqual(ConnectionState.Open, conn3.State); + SnowflakeDbConnectionPool.ClearAllPools(); + + conn1.Close(); + Assert.AreEqual(1, pool.GetCurrentPoolSize()); + conn2.Close(); + Assert.AreEqual(2, pool.GetCurrentPoolSize()); + conn3.Close(); + Assert.AreEqual(2, pool.GetCurrentPoolSize()); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(ConnectionState.Closed, conn2.State); + Assert.AreEqual(ConnectionState.Closed, conn3.State); + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [Test] + public void TestConnectionPoolExpirationWorks() + { + Thread.Sleep(10000); // wait for 10 seconds, in case other test still running. + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetMaxPoolSize(2); + SnowflakeDbConnectionPool.SetTimeout(10); + SnowflakeDbConnectionPool.SetPooling(true); + + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = ConnectionString; + + conn1.Open(); + conn1.Close(); + SnowflakeDbConnectionPool.SetTimeout(-1); + + var conn2 = new SnowflakeDbConnection(); + conn2.ConnectionString = ConnectionString; + conn2.Open(); + conn2.Close(); + var conn3 = new SnowflakeDbConnection(); + conn3.ConnectionString = ConnectionString; + conn3.Open(); + conn3.Close(); + + // The pooling timeout should apply to all connections being pooled, + // not just the connections created after the new setting, + // so expected result should be 0 + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + SnowflakeDbConnectionPool.SetPooling(false); + } + + [Test] + public void TestConnectionPoolClean() + { + TestOnlyForOldPool(); + + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetMaxPoolSize(2); + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = ConnectionString; + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + + var conn2 = new SnowflakeDbConnection(); + conn2.ConnectionString = ConnectionString + " retryCount=1"; + conn2.Open(); + Assert.AreEqual(ConnectionState.Open, conn2.State); + + var conn3 = new SnowflakeDbConnection(); + conn3.ConnectionString = ConnectionString + " retryCount=2"; + conn3.Open(); + Assert.AreEqual(ConnectionState.Open, conn3.State); + + conn1.Close(); + conn2.Close(); + Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + SnowflakeDbConnectionPool.ClearAllPools(); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + conn3.Close(); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(ConnectionState.Closed, conn2.State); + Assert.AreEqual(ConnectionState.Closed, conn3.State); + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [Test] + public void TestNewConnectionPoolClean() + { + TestOnlyForNewPool(); + + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetMaxPoolSize(2); + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = ConnectionString; + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + + var conn2 = new SnowflakeDbConnection(); + conn2.ConnectionString = ConnectionString + " retryCount=1"; + conn2.Open(); + Assert.AreEqual(ConnectionState.Open, conn2.State); + + var conn3 = new SnowflakeDbConnection(); + conn3.ConnectionString = ConnectionString + " retryCount=2"; + conn3.Open(); + Assert.AreEqual(ConnectionState.Open, conn3.State); + + conn1.Close(); + conn2.Close(); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(conn1.ConnectionString).GetCurrentPoolSize()); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(conn2.ConnectionString).GetCurrentPoolSize()); + SnowflakeDbConnectionPool.ClearAllPools(); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(conn1.ConnectionString).GetCurrentPoolSize()); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(conn2.ConnectionString).GetCurrentPoolSize()); + conn3.Close(); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(conn3.ConnectionString).GetCurrentPoolSize()); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(ConnectionState.Closed, conn2.State); + Assert.AreEqual(ConnectionState.Closed, conn3.State); + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [Test] + public void TestConnectionPoolFull() + { + TestOnlyForOldPool(); + + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetMaxPoolSize(2); + SnowflakeDbConnectionPool.SetPooling(true); + + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = ConnectionString; + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + + var conn2 = new SnowflakeDbConnection(); + conn2.ConnectionString = ConnectionString + " retryCount=1"; + conn2.Open(); + Assert.AreEqual(ConnectionState.Open, conn2.State); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + conn1.Close(); + conn2.Close(); + Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + var conn3 = new SnowflakeDbConnection(); + conn3.ConnectionString = ConnectionString + " retryCount=2"; + conn3.Open(); + Assert.AreEqual(ConnectionState.Open, conn3.State); + + var conn4 = new SnowflakeDbConnection(); + conn4.ConnectionString = ConnectionString + " retryCount=3"; + conn4.Open(); + Assert.AreEqual(ConnectionState.Open, conn4.State); + + conn3.Close(); + Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + conn4.Close(); + Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(ConnectionState.Closed, conn2.State); + Assert.AreEqual(ConnectionState.Closed, conn3.State); + Assert.AreEqual(ConnectionState.Closed, conn4.State); + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [Test] + public void TestNewConnectionPoolFull() + { + TestOnlyForNewPool(); + + var sessionPool = SnowflakeDbConnectionPool.GetPool(ConnectionString); + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetMaxPoolSize(2); + SnowflakeDbConnectionPool.SetPooling(true); + + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = ConnectionString; + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + + var conn2 = new SnowflakeDbConnection(); + conn2.ConnectionString = ConnectionString; + conn2.Open(); + Assert.AreEqual(ConnectionState.Open, conn2.State); + + Assert.AreEqual(0, sessionPool.GetCurrentPoolSize()); + conn1.Close(); + conn2.Close(); + Assert.AreEqual(2, sessionPool.GetCurrentPoolSize()); + + var conn3 = new SnowflakeDbConnection(); + conn3.ConnectionString = ConnectionString; + conn3.Open(); + Assert.AreEqual(ConnectionState.Open, conn3.State); + + var conn4 = new SnowflakeDbConnection(); + conn4.ConnectionString = ConnectionString; + conn4.Open(); + Assert.AreEqual(ConnectionState.Open, conn4.State); + + conn3.Close(); + Assert.AreEqual(1, sessionPool.GetCurrentPoolSize()); // TODO: when SNOW-937189 complete should be 2 + conn4.Close(); + Assert.AreEqual(2, sessionPool.GetCurrentPoolSize()); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(ConnectionState.Closed, conn2.State); + Assert.AreEqual(ConnectionState.Closed, conn3.State); + Assert.AreEqual(ConnectionState.Closed, conn4.State); + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [Test] + public void TestConnectionPoolMultiThreading() + { + Thread t1 = new Thread(() => ThreadProcess1(ConnectionString)); + Thread t2 = new Thread(() => ThreadProcess2(ConnectionString)); + + t1.Start(); + t2.Start(); + + t1.Join(); + t2.Join(); + } + + void ThreadProcess1(string connstr) + { + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = connstr; + conn1.Open(); + Thread.Sleep(1000); + conn1.Close(); + Thread.Sleep(4000); + Assert.AreEqual(ConnectionState.Closed, conn1.State); + } + + void ThreadProcess2(string connstr) + { + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = connstr; + conn1.Open(); + + Thread.Sleep(5000); + SFStatement statement = new SFStatement(conn1.SfSession); + SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false); + Assert.AreEqual(true, resultSet.Next()); + Assert.AreEqual("1", resultSet.GetString(0)); + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetMaxPoolSize(0); + SnowflakeDbConnectionPool.SetPooling(false); + } + + [Test] + public void TestConnectionPoolDisable() + { + SnowflakeDbConnectionPool.ClearAllPools(); + SnowflakeDbConnectionPool.SetPooling(false); + + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = ConnectionString; + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + conn1.Close(); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + } + + [Test] + public void TestConnectionPoolWithDispose() + { + SnowflakeDbConnectionPool.SetPooling(true); + SnowflakeDbConnectionPool.SetMaxPoolSize(1); + SnowflakeDbConnectionPool.ClearAllPools(); + + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = "bad connection string"; + try + { + conn1.Open(); + } + catch (SnowflakeDbException ex) + { + Console.WriteLine("connection failed:" + ex); + conn1.Close(); + } + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(conn1.ConnectionString).GetCurrentPoolSize()); + } + + [Test] + public void TestConnectionPoolTurnOff() + { + SnowflakeDbConnectionPool.SetPooling(false); + SnowflakeDbConnectionPool.SetPooling(true); + SnowflakeDbConnectionPool.SetMaxPoolSize(1); + SnowflakeDbConnectionPool.ClearAllPools(); + + var conn1 = new SnowflakeDbConnection(); + conn1.ConnectionString = ConnectionString; + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + conn1.Close(); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + + SnowflakeDbConnectionPool.SetPooling(false); + //Put a breakpoint at SFSession close function, after connection pool is off, it will send close session request. + } + + private void TestOnlyForOldPool() + { + if (_connectionPoolTypeUnderTest != ConnectionPoolType.SingleConnectionCache) + Assert.Ignore($"Test case relates only to {ConnectionPoolType.SingleConnectionCache} pool type"); + } + + private void TestOnlyForNewPool() + { + if (_connectionPoolTypeUnderTest != ConnectionPoolType.MultipleConnectionPool) + Assert.Ignore($"Test case relates only to {ConnectionPoolType.MultipleConnectionPool} pool type"); + } + } +} diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs deleted file mode 100644 index 7ea58702d..000000000 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs +++ /dev/null @@ -1,775 +0,0 @@ -/* - * Copyright (c) 2012-2021 Snowflake Computing Inc. All rights reserved. - */ - -namespace Snowflake.Data.Tests.IntegrationTests -{ - using NUnit.Framework; - using Snowflake.Data.Client; - using System.Data; - using System; - using Snowflake.Data.Core; - using System.Threading.Tasks; - using System.Threading; - using Snowflake.Data.Log; - using Snowflake.Data.Tests.Mock; - using System.Data.Common; - using Moq; - - class PoolConfig { - private readonly bool _pooling; - private readonly long _timeout; - private readonly int _maxPoolSize; - - public PoolConfig() - { - _maxPoolSize = SnowflakeDbConnectionPool.GetMaxPoolSize(); - _timeout = SnowflakeDbConnectionPool.GetTimeout(); - _pooling = SnowflakeDbConnectionPool.GetPooling(); - } - - public void Reset() - { - SnowflakeDbConnectionPool.SetMaxPoolSize(_maxPoolSize); - SnowflakeDbConnectionPool.SetTimeout(_timeout); - SnowflakeDbConnectionPool.SetPooling(_pooling); - } - } - - [TestFixture, NonParallelizable] - class SFConnectionPoolT : SFBaseTest - { - private static readonly PoolConfig s_previousPoolConfig = new PoolConfig(); - - [SetUp] - public void BeforeTest() - { - s_previousPoolConfig.Reset(); - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.ClearAllPools(); - } - - [TearDown] - public void AfterTest() - { - s_previousPoolConfig.Reset(); - } - - [OneTimeTearDown] - public static void AfterAllTests() - { - SnowflakeDbConnectionPool.ClearAllPools(); - } - - [Test] - // test connection pooling with concurrent connection - public void TestConcurrentConnectionPooling() - { - // add test case name in connection string to make in unique for each test case - string connStr = ConnectionString + ";application=TestConcurrentConnectionPooling"; - ConcurrentPoolingHelper(connStr, true); - } - - [Test] - // test connection pooling with concurrent connection and no close - // call for connection. Connection is closed when Dispose() is called - // by framework. - public void TestConcurrentConnectionPoolingDispose() - { - // add test case name in connection string to make in unique for each test case - string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingNoClose"; - ConcurrentPoolingHelper(connStr, false); - } - - static void ConcurrentPoolingHelper(string connectionString, bool closeConnection) - { - // thread number a bit larger than pool size so some connections - // would fail on pooling while some connections could success - const int threadNum = 12; - // set short pooling timeout to cover the case that connection expired - const int poolTimeout = 3; - - // reset to default settings in case it changed by other test cases - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.SetMaxPoolSize(10); - SnowflakeDbConnectionPool.ClearAllPools(); - SnowflakeDbConnectionPool.SetTimeout(poolTimeout); - - var threads = new Task[threadNum]; - for (int i = 0; i < threadNum; i++) - { - threads[i] = Task.Factory.StartNew(() => - { - QueryExecutionThread(connectionString, closeConnection); - }); - } - Task.WaitAll(threads); - // set pooling timeout back to default to avoid impact on other test cases - SnowflakeDbConnectionPool.SetTimeout(3600); - } - - // thead to execute query with new connection in a loop - static void QueryExecutionThread(string connectionString, bool closeConnection) - { - for (int i = 0; i < 100; i++) - { - using (DbConnection conn = new SnowflakeDbConnection(connectionString)) - { - conn.Open(); - using (DbCommand cmd = conn.CreateCommand()) - { - cmd.CommandText = "select 1, 2, 3"; - try - { - using (var reader = cmd.ExecuteReader()) - { - while (reader.Read()) - { - for (int j = 0; j < reader.FieldCount; j++) - { - // Process each column as appropriate - object obj = reader.GetFieldValue(j); - } - } - } - } - catch (Exception e) - { - Assert.Fail("Caught unexpected exception: " + e); - } - } - - if (closeConnection) - { - conn.Close(); - } - }; - } - } - - [Test] - public void TestBasicConnectionPool() - { - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.SetMaxPoolSize(1); - SnowflakeDbConnectionPool.ClearAllPools(); - - var conn1 = new SnowflakeDbConnection(ConnectionString); - conn1.Open(); - Assert.AreEqual(ConnectionState.Open, conn1.State); - conn1.Close(); - - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - } - - [Test] - public void TestConnectionPool() - { - SnowflakeDbConnectionPool.ClearAllPools(); - var conn1 = new SnowflakeDbConnection(ConnectionString); - conn1.Open(); - Assert.AreEqual(ConnectionState.Open, conn1.State); - conn1.Close(); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - - var conn2 = new SnowflakeDbConnection(); - conn2.ConnectionString = ConnectionString; - conn2.Open(); - Assert.AreEqual(ConnectionState.Open, conn2.State); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - - conn2.Close(); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(ConnectionState.Closed, conn2.State); - SnowflakeDbConnectionPool.ClearAllPools(); - } - - [Test] - public void TestConnectionPoolIsFull() - { - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.ClearAllPools(); - SnowflakeDbConnectionPool.SetMaxPoolSize(2); - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ConnectionString; - conn1.Open(); - Assert.AreEqual(ConnectionState.Open, conn1.State); - - var conn2 = new SnowflakeDbConnection(); - conn2.ConnectionString = ConnectionString + " retryCount=1"; - conn2.Open(); - Assert.AreEqual(ConnectionState.Open, conn2.State); - - var conn3 = new SnowflakeDbConnection(); - conn3.ConnectionString = ConnectionString + " retryCount=2"; - conn3.Open(); - Assert.AreEqual(ConnectionState.Open, conn3.State); - SnowflakeDbConnectionPool.ClearAllPools(); - - conn1.Close(); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - conn2.Close(); - Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - conn3.Close(); - Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(ConnectionState.Closed, conn2.State); - Assert.AreEqual(ConnectionState.Closed, conn3.State); - SnowflakeDbConnectionPool.ClearAllPools(); - } - - [Test] - public void TestConnectionPoolExpirationWorks() - { - System.Threading.Thread.Sleep(10000); // wait for 10 seconds, in case other test still running. - SnowflakeDbConnectionPool.ClearAllPools(); - SnowflakeDbConnectionPool.SetMaxPoolSize(2); - SnowflakeDbConnectionPool.SetTimeout(10); - SnowflakeDbConnectionPool.SetPooling(true); - - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ConnectionString; - - conn1.Open(); - conn1.Close(); - SnowflakeDbConnectionPool.SetTimeout(-1); - - var conn2 = new SnowflakeDbConnection(); - conn2.ConnectionString = ConnectionString; - conn2.Open(); - conn2.Close(); - var conn3 = new SnowflakeDbConnection(); - conn3.ConnectionString = ConnectionString; - conn3.Open(); - conn3.Close(); - - // The pooling timeout should apply to all connections being pooled, - // not just the connections created after the new setting, - // so expected result should be 0 - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - SnowflakeDbConnectionPool.SetPooling(false); - } - - [Test] - public void TestConnectionPoolClean() - { - SnowflakeDbConnectionPool.ClearAllPools(); - SnowflakeDbConnectionPool.SetMaxPoolSize(2); - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ConnectionString; - conn1.Open(); - Assert.AreEqual(ConnectionState.Open, conn1.State); - - var conn2 = new SnowflakeDbConnection(); - conn2.ConnectionString = ConnectionString + " retryCount=1"; - conn2.Open(); - Assert.AreEqual(ConnectionState.Open, conn2.State); - - var conn3 = new SnowflakeDbConnection(); - conn3.ConnectionString = ConnectionString + " retryCount=2"; - conn3.Open(); - Assert.AreEqual(ConnectionState.Open, conn3.State); - - conn1.Close(); - conn2.Close(); - Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - SnowflakeDbConnectionPool.ClearAllPools(); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - conn3.Close(); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(ConnectionState.Closed, conn2.State); - Assert.AreEqual(ConnectionState.Closed, conn3.State); - SnowflakeDbConnectionPool.ClearAllPools(); - } - - [Test] - public void TestConnectionPoolFull() - { - SnowflakeDbConnectionPool.ClearAllPools(); - SnowflakeDbConnectionPool.SetMaxPoolSize(2); - SnowflakeDbConnectionPool.SetPooling(true); - - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ConnectionString; - conn1.Open(); - Assert.AreEqual(ConnectionState.Open, conn1.State); - - var conn2 = new SnowflakeDbConnection(); - conn2.ConnectionString = ConnectionString + " retryCount=1"; - conn2.Open(); - Assert.AreEqual(ConnectionState.Open, conn2.State); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - conn1.Close(); - conn2.Close(); - Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - var conn3 = new SnowflakeDbConnection(); - conn3.ConnectionString = ConnectionString + " retryCount=2"; - conn3.Open(); - Assert.AreEqual(ConnectionState.Open, conn3.State); - - var conn4 = new SnowflakeDbConnection(); - conn4.ConnectionString = ConnectionString + " retryCount=3"; - conn4.Open(); - Assert.AreEqual(ConnectionState.Open, conn4.State); - - conn3.Close(); - Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - conn4.Close(); - Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(ConnectionState.Closed, conn2.State); - Assert.AreEqual(ConnectionState.Closed, conn3.State); - Assert.AreEqual(ConnectionState.Closed, conn4.State); - SnowflakeDbConnectionPool.ClearAllPools(); - } - - [Test] - public void TestConnectionPoolMultiThreading() - { - Thread t1 = new Thread(() => ThreadProcess1(ConnectionString)); - Thread t2 = new Thread(() => ThreadProcess2(ConnectionString)); - - t1.Start(); - t2.Start(); - - t1.Join(); - t2.Join(); - } - - void ThreadProcess1(string connstr) - { - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = connstr; - conn1.Open(); - Thread.Sleep(1000); - conn1.Close(); - Thread.Sleep(4000); - Assert.AreEqual(ConnectionState.Closed, conn1.State); - } - - void ThreadProcess2(string connstr) - { - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = connstr; - conn1.Open(); - - Thread.Sleep(5000); - SFStatement statement = new SFStatement(conn1.SfSession); - SFBaseResultSet resultSet = statement.Execute(0, "select 1", null, false); - Assert.AreEqual(true, resultSet.Next()); - Assert.AreEqual("1", resultSet.GetString(0)); - SnowflakeDbConnectionPool.ClearAllPools(); - SnowflakeDbConnectionPool.SetMaxPoolSize(0); - SnowflakeDbConnectionPool.SetPooling(false); - } - - [Test] - public void TestConnectionPoolDisable() - { - SnowflakeDbConnectionPool.ClearAllPools(); - SnowflakeDbConnectionPool.SetPooling(false); - - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ConnectionString; - conn1.Open(); - Assert.AreEqual(ConnectionState.Open, conn1.State); - conn1.Close(); - - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - } - - [Test] - public void TestConnectionPoolWithDispose() - { - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.SetMaxPoolSize(1); - SnowflakeDbConnectionPool.ClearAllPools(); - - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ""; - try - { - conn1.Open(); - } - catch (SnowflakeDbException ex) - { - Console.WriteLine($"connection failed:" + ex); - conn1.Close(); - } - - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - } - - [Test] - public void TestConnectionPoolTurnOff() - { - SnowflakeDbConnectionPool.SetPooling(false); - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.SetMaxPoolSize(1); - SnowflakeDbConnectionPool.ClearAllPools(); - - var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ConnectionString; - conn1.Open(); - Assert.AreEqual(ConnectionState.Open, conn1.State); - conn1.Close(); - - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - - SnowflakeDbConnectionPool.SetPooling(false); - //Put a breakpoint at SFSession close function, after connection pool is off, it will send close session request. - } - } - - [TestFixture, NonParallelizable] - class SFConnectionPoolITAsync : SFBaseTestAsync - { - private static SFLogger logger = SFLoggerFactory.GetLogger(); - private static readonly PoolConfig s_previousPoolConfigRestorer = new PoolConfig(); - - [SetUp] - public void BeforeTest() - { - s_previousPoolConfigRestorer.Reset(); - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.ClearAllPools(); - } - - [TearDown] - public void AfterTest() - { - s_previousPoolConfigRestorer.Reset(); - } - - [OneTimeTearDown] - public static void AfterAllTests() - { - SnowflakeDbConnectionPool.ClearAllPools(); - } - - [Test] - public void TestConnectionPoolWithAsync() - { - using (var conn = new MockSnowflakeDbConnection()) - { - SnowflakeDbConnectionPool.SetMaxPoolSize(1); - SnowflakeDbConnectionPool.ClearAllPools(); - - int timeoutSec = 0; - string infiniteLoginTimeOut = String.Format("" + ";connection_timeout={0}", - timeoutSec); - - conn.ConnectionString = infiniteLoginTimeOut; - - Assert.AreEqual(conn.State, ConnectionState.Closed); - - CancellationTokenSource connectionCancelToken = new CancellationTokenSource(); - try - { - Task connectTask = conn.OpenAsync(connectionCancelToken.Token); - } - catch (SnowflakeDbException ex) - { - Console.WriteLine($"connection failed:" + ex); - conn.CloseAsync(connectionCancelToken.Token); - } - - Thread.Sleep(10 * 1000); - Assert.AreEqual(ConnectionState.Closed, conn.State); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - } - } - - [Test] - public void TestConnectionPoolWithInvalidOpenAsync() - { - SnowflakeDbConnectionPool.SetMaxPoolSize(10); - // make the connection string unique so it won't pick up connection - // pooled by other test cases. - string connStr = ConnectionString + ";application=conn_pool_test_invalid_openasync"; - using (var connection = new SnowflakeDbConnection()) - { - connection.ConnectionString = connStr; - // call openAsync but do not wait and destroy it direct - // so the session is initialized with empty token - connection.OpenAsync(); - } - - // use the same connection string to make a new connection - // to ensure the invalid connection made previously is not pooled - using (var connection1 = new SnowflakeDbConnection()) - { - connection1.ConnectionString = connStr; - // this will not open a new session but get the invalid connection from pool - connection1.Open(); - // Now run query with connection1 - var command = connection1.CreateCommand(); - command.CommandText = "select 1, 2, 3"; - - try - { - using (var reader = command.ExecuteReader()) - { - while (reader.Read()) - { - for (int i = 0; i < reader.FieldCount; i++) - { - // Process each column as appropriate - object obj = reader.GetFieldValue(i); - } - } - } - } - catch (SnowflakeDbException ex) - { - // fail the test case if anything wrong. - Assert.Fail(); - } - } - } - - [Test] - // test connection pooling with concurrent connection using async calls - public void TestConcurrentConnectionPoolingAsync() - { - // add test case name in connection string to make in unique for each test case - string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingAsync"; - ConcurrentPoolingAsyncHelper(connStr, true); - } - - [Test] - public void TestRollbackTransactionOnPooledWhenExceptionOccurred() - { - SnowflakeDbConnectionPool.SetMaxPoolSize(1); - - object firstOpenedSessionId = null; - using (var connection = new SnowflakeDbConnection()) - { - connection.ConnectionString = ConnectionString; - connection.Open(); - firstOpenedSessionId = connection.SfSession.sessionId; - connection.BeginTransaction(); - Assert.AreEqual(true, connection.HasActiveExplicitTransaction()); - Assert.Throws(() => - { - using (var command = connection.CreateCommand()) - { - command.CommandText = "invalid command will throw exception and leave session with an unfinished transaction"; - command.ExecuteNonQuery(); - } - }); - } - - using (var connectionWithSessionReused = new SnowflakeDbConnection()) - { - connectionWithSessionReused.ConnectionString = ConnectionString; - connectionWithSessionReused.Open(); - - Assert.AreEqual(firstOpenedSessionId, connectionWithSessionReused.SfSession.sessionId); - Assert.AreEqual(false, connectionWithSessionReused.HasActiveExplicitTransaction()); - using (var cmd = connectionWithSessionReused.CreateCommand()) - { - cmd.CommandText = "SELECT CURRENT_TRANSACTION()"; - Assert.AreEqual(DBNull.Value, cmd.ExecuteScalar()); - } - } - - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be reused and any pending transaction rolled back before it gets back to the pool"); - } - - [Test] - public void TestTransactionStatusNotTrackedForNonExplicitTransactionCalls() - { - SnowflakeDbConnectionPool.SetMaxPoolSize(1); - using (var connection = new SnowflakeDbConnection()) - { - connection.ConnectionString = ConnectionString; - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = "BEGIN"; // in general can be put as a part of a multi statement call and mixed with commit as well - command.ExecuteNonQuery(); - Assert.AreEqual(false, connection.HasActiveExplicitTransaction()); - } - } - } - - [Test] - public void TestRollbackTransactionOnPooledWhenConnectionClose() - { - SnowflakeDbConnectionPool.SetMaxPoolSize(1); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool"); - - string firstOpenedSessionId = null; - using (var connection1 = new SnowflakeDbConnection()) - { - connection1.ConnectionString = ConnectionString; - connection1.Open(); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection session is added to the pool after close connection"); - connection1.BeginTransaction(); - Assert.AreEqual(true, connection1.HasActiveExplicitTransaction()); - using (var command = connection1.CreateCommand()) - { - firstOpenedSessionId = connection1.SfSession.sessionId; - command.CommandText = "SELECT CURRENT_TRANSACTION()"; - Assert.AreNotEqual(DBNull.Value, command.ExecuteScalar()); - } - } - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool"); - - using (var connection2 = new SnowflakeDbConnection()) - { - connection2.ConnectionString = ConnectionString; - connection2.Open(); - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection session should be now removed from the pool"); - Assert.AreEqual(false, connection2.HasActiveExplicitTransaction()); - using (var command = connection2.CreateCommand()) - { - Assert.AreEqual(firstOpenedSessionId, connection2.SfSession.sessionId); - command.CommandText = "SELECT CURRENT_TRANSACTION()"; - Assert.AreEqual(DBNull.Value, command.ExecuteScalar()); - } - } - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool"); - } - - [Test] - public void TestFailureOfTransactionRollbackOnConnectionClosePreventsAddingToPool() - { - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.SetMaxPoolSize(10); - var commandThrowingExceptionOnlyForRollback = new Mock(); - commandThrowingExceptionOnlyForRollback.CallBase = true; - commandThrowingExceptionOnlyForRollback.SetupSet(it => it.CommandText = "ROLLBACK") - .Throws(new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unexpected failure on transaction rollback when connection is returned to the pool with pending transaction")); - var mockDbProviderFactory = new Mock(); - mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object); - - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - using (var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object)) - { - connection.ConnectionString = ConnectionString; - connection.Open(); - connection.BeginTransaction(); - Assert.AreEqual(true, connection.HasActiveExplicitTransaction()); - // no Rollback or Commit; during internal Rollback while closing a connection a mocked exception will be thrown - } - - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Should not return connection to the pool"); - } - - [Test] - // test connection pooling with concurrent connection and using async calls no close - // call for connection. Connection is closed when Dispose() is called - // by framework. - public void TestConcurrentConnectionPoolingDisposeAsync() - { - // add test case name in connection string to make in unique for each test case - string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingDisposeAsync"; - ConcurrentPoolingAsyncHelper(connStr, false); - } - - static void ConcurrentPoolingAsyncHelper(string connectionString, bool closeConnection) - { - // task number a bit larger than pool size so some connections - // would fail on pooling while some connections could success - const int taskNum = 12; - // set short pooling timeout to cover the case that connection expired - const int poolTimeout = 3; - - // reset to default settings in case it changed by other test cases - SnowflakeDbConnectionPool.SetPooling(true); - SnowflakeDbConnectionPool.SetMaxPoolSize(10); - SnowflakeDbConnectionPool.ClearAllPools(); - SnowflakeDbConnectionPool.SetTimeout(poolTimeout); - - var tasks = new Task[taskNum + 1]; - for (int i = 0; i < taskNum; i++) - { - tasks[i] = QueryExecutionTaskAsync(connectionString, closeConnection); - } - // cover the case of invalid sessions to ensure that won't - // break connection pooling - tasks[taskNum] = InvalidConnectionTaskAsync(connectionString); - Task.WaitAll(tasks); - - // set pooling timeout back to default to avoid impact on other test cases - SnowflakeDbConnectionPool.SetTimeout(3600); - } - - // task to execute query with new connection in a loop - static async Task QueryExecutionTaskAsync(string connectionString, bool closeConnection) - { - for (int i = 0; i < 100; i++) - { - using (var conn = new SnowflakeDbConnection(connectionString)) - { - await conn.OpenAsync(); - using (DbCommand cmd = conn.CreateCommand()) - { - cmd.CommandText = "select 1, 2, 3"; - try - { - using (DbDataReader reader = await cmd.ExecuteReaderAsync()) - { - while (await reader.ReadAsync()) - { - for (int j = 0; j < reader.FieldCount; j++) - { - // Process each column as appropriate - object obj = await reader.GetFieldValueAsync(j); - } - } - } - } - catch (Exception e) - { - Assert.Fail("Caught unexpected exception: " + e); - } - } - - if (closeConnection) - { - await conn.CloseAsync(new CancellationTokenSource().Token); - } - }; - } - } - - // task to generate invalid(not finish open) connections in a loop - static async Task InvalidConnectionTaskAsync(string connectionString) - { - for (int i = 0; i < 100; i++) - { - using (var conn = new SnowflakeDbConnection(connectionString)) - { - // intentially not using await so the connection - // will be disposed with invalid underlying session - conn.OpenAsync(); - }; - // wait 100ms each time so the invalid sessions are generated - // roughly at the same speed as connections for query tasks - await Task.Delay(100); - } - } - - private class TestSnowflakeDbConnection : SnowflakeDbConnection - { - public TestSnowflakeDbConnection(DbProviderFactory dbProviderFactory) - { - DbProviderFactory = dbProviderFactory; - } - - protected override DbProviderFactory DbProviderFactory { get; } - } - } -} diff --git a/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs b/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs new file mode 100644 index 000000000..4238b04c5 --- /dev/null +++ b/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved. + */ + +using System; +using System.Security; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using Snowflake.Data.Core; +using Snowflake.Data.Core.Session; +using Moq; + +namespace Snowflake.Data.Tests.UnitTests +{ + [TestFixture, NonParallelizable] + class ConnectionPoolManagerTest + { + private readonly ConnectionPoolManager _connectionPoolManager = new ConnectionPoolManager(); + private readonly string _connectionString1 = "database=D1;warehouse=W1;account=A1;user=U1;password=P1;role=R1;"; + private readonly string _connectionString2 = "database=D2;warehouse=W2;account=A2;user=U2;password=P2;role=R2;"; + private readonly SecureString _password = new SecureString(); + + [OneTimeSetUp] + public static void BeforeAllTests() + { + // SnowflakeDbConnectionPool.SwapVersion(); // TODO: swap when new version is the default + SessionPool.SessionFactory = new MockSessionFactory(); + } + + [OneTimeTearDown] + public void AfterAllTests() + { + SessionPool.SessionFactory = new SessionFactory(); + } + + [Test] + public void TestPoolManagerReturnsSessionPoolForGivenConnectionString() + { + // Act + var sessionPool = _connectionPoolManager.GetPool(_connectionString1, _password); + + // Assert + Assert.AreEqual(_connectionString1, sessionPool.ConnectionString); + Assert.AreEqual(_password, sessionPool.Password); + } + + [Test] + public void TestPoolManagerReturnsSamePoolForGivenConnectionString() + { + // Arrange + var anotherConnectionString = _connectionString1; + + // Act + var sessionPool1 = _connectionPoolManager.GetPool(_connectionString1, _password); + var sessionPool2 = _connectionPoolManager.GetPool(anotherConnectionString, _password); + + // Assert + Assert.AreEqual(sessionPool1, sessionPool2); + } + + [Test] + public void TestDifferentPoolsAreReturnedForDifferentConnectionStrings() + { + // Arrange + Assert.AreNotSame(_connectionString1, _connectionString2); + + // Act + var sessionPool1 = _connectionPoolManager.GetPool(_connectionString1, _password); + var sessionPool2 = _connectionPoolManager.GetPool(_connectionString2, _password); + + // Assert + Assert.AreNotSame(sessionPool1, sessionPool2); + Assert.AreEqual(_connectionString1, sessionPool1.ConnectionString); + Assert.AreEqual(_connectionString2, sessionPool2.ConnectionString); + } + + + [Test] + public void TestGetSessionWorksForSpecifiedConnectionString() + { + // Act + var sfSession = _connectionPoolManager.GetSession(_connectionString1, _password); + + // Assert + Assert.AreEqual(_connectionString1, sfSession.ConnectionString); + Assert.AreEqual(_password, sfSession.Password); + } + + [Test] + public async Task TestGetSessionAsyncWorksForSpecifiedConnectionString() + { + // Act + var sfSession = await _connectionPoolManager.GetSessionAsync(_connectionString1, _password, CancellationToken.None); + + // Assert + Assert.AreEqual(_connectionString1, sfSession.ConnectionString); + Assert.AreEqual(_password, sfSession.Password); + } + + [Test] + [Ignore("Enable after completion of SNOW-937189")] // TODO: + public void TestCountingOfSessionProvidedByPool() + { + // Act + _connectionPoolManager.GetSession(_connectionString1, _password); + + // Assert + var sessionPool = _connectionPoolManager.GetPool(_connectionString1, _password); + Assert.AreEqual(1, sessionPool.GetCurrentPoolSize()); + } + + [Test] + [Ignore("Enable after completion of SNOW-937189")] // TODO: + public void TestCountingOfSessionReturnedBackToPool() + { + // Arrange + var sfSession = _connectionPoolManager.GetSession(_connectionString1, _password); + + // Act + _connectionPoolManager.AddSession(sfSession); + + // Assert + var sessionPool = _connectionPoolManager.GetPool(_connectionString1, _password); + Assert.AreEqual(1, sessionPool.GetCurrentPoolSize()); + } + + [Test] + public void TestSetMaxPoolSizeForAllPools() + { + // Arrange + var sessionPool1 = _connectionPoolManager.GetPool(_connectionString1, _password); + var sessionPool2 = _connectionPoolManager.GetPool(_connectionString2, _password); + + // Act + _connectionPoolManager.SetMaxPoolSize(3); + + // Assert + Assert.AreEqual(3, sessionPool1.GetMaxPoolSize()); + Assert.AreEqual(3, sessionPool2.GetMaxPoolSize()); + } + + [Test] + public void TestSetTimeoutForAllPools() + { + // Arrange + var sessionPool1 = _connectionPoolManager.GetPool(_connectionString1, _password); + var sessionPool2 = _connectionPoolManager.GetPool(_connectionString2, _password); + + // Act + _connectionPoolManager.SetTimeout(3000); + + // Assert + Assert.AreEqual(3000, sessionPool1.GetTimeout()); + Assert.AreEqual(3000, sessionPool2.GetTimeout()); + } + + [Test] + public void TestSetPoolingDisabledForAllPools() + { + // Arrange + var sessionPool1 = _connectionPoolManager.GetPool(_connectionString1, _password); + + // Act + _connectionPoolManager.SetPooling(false); + + // Assert + Assert.AreEqual(false, sessionPool1.GetPooling()); + } + + [Test] + public void TestSetPoolingEnabledBack() + { + // Arrange + var sessionPool1 = _connectionPoolManager.GetPool(_connectionString1, _password); + _connectionPoolManager.SetPooling(false); + + // Act + _connectionPoolManager.SetPooling(true); + + // Assert + Assert.AreEqual(true, sessionPool1.GetPooling()); + } + + [Test] + public void TestGetPoolingOnManagerLevelNotSupported() + { + Assert.Throws(() => _connectionPoolManager.GetPooling()); + } + + [Test] + public void TestGetTimeoutOnManagerLevelNotSupported() + { + Assert.Throws(() => _connectionPoolManager.GetTimeout()); + } + + [Test] + public void TestGetMaxPoolSizeOnManagerLevelNotSupported() + { + Assert.Throws(() => _connectionPoolManager.GetMaxPoolSize()); + } + } + + class MockSessionFactory : ISessionFactory + { + public SFSession NewSession(string connectionString, SecureString password) + { + var mockSfSession = new Mock(connectionString, password); + mockSfSession.Setup(x => x.Open()).Verifiable(); + mockSfSession.Setup(x => x.OpenAsync(default)).Returns(Task.FromResult(this)); + return mockSfSession.Object; + } + } + +} diff --git a/Snowflake.Data.Tests/Util/PoolConfig.cs b/Snowflake.Data.Tests/Util/PoolConfig.cs new file mode 100644 index 000000000..b4fa6bc55 --- /dev/null +++ b/Snowflake.Data.Tests/Util/PoolConfig.cs @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2012-2021 Snowflake Computing Inc. All rights reserved. + */ + +using Snowflake.Data.Client; +using Snowflake.Data.Core.Session; + +namespace Snowflake.Data.Tests.Util +{ + class PoolConfig + { + private readonly bool _pooling; + private readonly long _timeout; + private readonly int _maxPoolSize; + private readonly ConnectionPoolType _connectionPoolType; + + public PoolConfig() + { + _maxPoolSize = SnowflakeDbConnectionPool.GetMaxPoolSize(); + _timeout = SnowflakeDbConnectionPool.GetTimeout(); + _pooling = SnowflakeDbConnectionPool.GetPooling(); + _connectionPoolType = SnowflakeDbConnectionPool.GetConnectionPoolVersion(); + } + + public void Reset() + { + SnowflakeDbConnectionPool.SetMaxPoolSize(_maxPoolSize); + SnowflakeDbConnectionPool.SetTimeout(_timeout); + SnowflakeDbConnectionPool.SetPooling(_pooling); + SnowflakeDbConnectionPool.SetConnectionPoolVersion(_connectionPoolType); + } + } +} diff --git a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs index 81cb3528a..5c5df3476 100644 --- a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs @@ -1,4 +1,9 @@ -using System.Security; +/* + * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. + */ + +using System; +using System.Security; using System.Threading; using System.Threading.Tasks; using Snowflake.Data.Core; @@ -10,63 +15,127 @@ namespace Snowflake.Data.Client public class SnowflakeDbConnectionPool { private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); + private static readonly Object s_connectionManagerInstanceLock = new Object(); + private static IConnectionManager s_connectionManager; + private static readonly ConnectionPoolType s_defaultConnectionPoolType = ConnectionPoolType.SingleConnectionCache; // TODO: set to public once development of entire ConnectionPoolManager epic is complete - internal static SFSession GetSession(string connStr, SecureString password) + private static IConnectionManager ConnectionManager + { + get + { + if (s_connectionManager != null) + return s_connectionManager; + SetConnectionPoolVersion(s_defaultConnectionPoolType); + return s_connectionManager; + } + } + + internal static SFSession GetSession(string connectionString, SecureString password) { - s_logger.Debug("SnowflakeDbConnectionPool::GetSession"); - return SessionPoolSingleton.Instance.GetSession(connStr, password); + s_logger.Debug($"SnowflakeDbConnectionPool::GetSession for {connectionString}"); + return ConnectionManager.GetSession(connectionString, password); } - internal static Task GetSessionAsync(string connStr, SecureString password, CancellationToken cancellationToken) + internal static Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) { - return SessionPoolSingleton.Instance.GetSessionAsync(connStr, password, cancellationToken); + s_logger.Debug($"SnowflakeDbConnectionPool::GetSessionAsync for {connectionString}"); + return ConnectionManager.GetSessionAsync(connectionString, password, cancellationToken); + } + + internal static SessionPool GetPool(string connectionString) + { + s_logger.Debug($"SnowflakeDbConnectionPool::GetPool"); + return ConnectionManager.GetPool(connectionString); } internal static bool AddSession(SFSession session) { s_logger.Debug("SnowflakeDbConnectionPool::AddSession"); - return SessionPoolSingleton.Instance.AddSession(session); + return ConnectionManager.AddSession(session); } public static void ClearAllPools() { s_logger.Debug("SnowflakeDbConnectionPool::ClearAllPools"); - SessionPoolSingleton.Instance.ClearAllPools(); + ConnectionManager.ClearAllPools(); } - public static void SetMaxPoolSize(int size) + public static void SetMaxPoolSize(int maxPoolSize) { - SessionPoolSingleton.Instance.SetMaxPoolSize(size); + s_logger.Debug("SnowflakeDbConnectionPool::SetMaxPoolSize"); + ConnectionManager.SetMaxPoolSize(maxPoolSize); } public static int GetMaxPoolSize() { - return SessionPoolSingleton.Instance.GetMaxPoolSize(); + s_logger.Debug("SnowflakeDbConnectionPool::GetMaxPoolSize"); + return ConnectionManager.GetMaxPoolSize(); } - public static void SetTimeout(long time) + public static void SetTimeout(long connectionTimeout) { - SessionPoolSingleton.Instance.SetTimeout(time); + s_logger.Debug("SnowflakeDbConnectionPool::SetTimeout"); + ConnectionManager.SetTimeout(connectionTimeout); } public static long GetTimeout() { - return SessionPoolSingleton.Instance.GetTimeout(); + s_logger.Debug("SnowflakeDbConnectionPool::GetTimeout"); + return ConnectionManager.GetTimeout(); } public static int GetCurrentPoolSize() { - return SessionPoolSingleton.Instance.GetCurrentPoolSize(); + s_logger.Debug("SnowflakeDbConnectionPool::GetCurrentPoolSize"); + return ConnectionManager.GetCurrentPoolSize(); } public static bool SetPooling(bool isEnable) { - return SessionPoolSingleton.Instance.SetPooling(isEnable); + s_logger.Debug("SnowflakeDbConnectionPool::SetPooling"); + return ConnectionManager.SetPooling(isEnable); } public static bool GetPooling() { - return SessionPoolSingleton.Instance.GetPooling(); + s_logger.Debug("SnowflakeDbConnectionPool::GetPooling"); + return ConnectionManager.GetPooling(); + } + + internal static void SetOldConnectionPoolVersion() // TODO: set to public once development of entire ConnectionPoolManager epic is complete + { + SetConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); + } + + internal static void SetConnectionPoolVersion(ConnectionPoolType requestedPoolType) + { + lock (s_connectionManagerInstanceLock) + { + s_connectionManager?.ClearAllPools(); + if (ConnectionPoolType.MultipleConnectionPool.Equals(requestedPoolType)) + { + s_connectionManager = new ConnectionPoolManager(); + s_logger.Info("SnowflakeDbConnectionPool - multiple connection pools enabled"); + } + if (ConnectionPoolType.SingleConnectionCache.Equals(requestedPoolType)) + { + s_connectionManager = new ConnectionManagerV1(); + s_logger.Warn("SnowflakeDbConnectionPool - connection cache enabled"); + } + } + } + + internal static ConnectionPoolType GetConnectionPoolVersion() + { + if (ConnectionManager != null) + { + switch (ConnectionManager) + { + case ConnectionManagerV1 _: return ConnectionPoolType.SingleConnectionCache; + case ConnectionPoolManager _: return ConnectionPoolType.MultipleConnectionPool; + } + } + return s_defaultConnectionPoolType; } } } diff --git a/Snowflake.Data/Core/Session/ConnectionManagerV1.cs b/Snowflake.Data/Core/Session/ConnectionManagerV1.cs new file mode 100644 index 000000000..f06caea6f --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionManagerV1.cs @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2012-2021 Snowflake Computing Inc. All rights reserved. + */ + +using System.Security; +using System.Threading; +using System.Threading.Tasks; + +namespace Snowflake.Data.Core.Session +{ + internal sealed class ConnectionManagerV1 : IConnectionManager + { + private readonly SessionPool _sessionPool = SessionPool.CreateSessionCache(); + public SFSession GetSession(string connectionString, SecureString password) => _sessionPool.GetSession(connectionString, password); + public Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + => _sessionPool.GetSessionAsync(connectionString, password, cancellationToken); + public bool AddSession(SFSession session) => _sessionPool.AddSession(session); + public void ClearAllPools() => _sessionPool.ClearAllPools(); + public void SetMaxPoolSize(int maxPoolSize) => _sessionPool.SetMaxPoolSize(maxPoolSize); + public int GetMaxPoolSize() => _sessionPool.GetMaxPoolSize(); + public void SetTimeout(long connectionTimeout) => _sessionPool.SetTimeout(connectionTimeout); + public long GetTimeout() => _sessionPool.GetTimeout(); + public int GetCurrentPoolSize() => _sessionPool.GetCurrentPoolSize(); + public bool SetPooling(bool poolingEnabled) => _sessionPool.SetPooling(poolingEnabled); + public bool GetPooling() => _sessionPool.GetPooling(); + public SessionPool GetPool(string _) => _sessionPool; + } +} diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManager.cs b/Snowflake.Data/Core/Session/ConnectionPoolManager.cs new file mode 100644 index 000000000..6f4bd5063 --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionPoolManager.cs @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security; +using System.Threading; +using System.Threading.Tasks; +using Snowflake.Data.Client; +using Snowflake.Data.Log; + +namespace Snowflake.Data.Core.Session +{ + internal sealed class ConnectionPoolManager : IConnectionManager + { + private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); + private static readonly Object s_poolsLock = new Object(); + private readonly Dictionary _pools; + + internal ConnectionPoolManager() + { + lock (s_poolsLock) + { + _pools = new Dictionary(); + } + } + + public SFSession GetSession(string connectionString, SecureString password) + { + s_logger.Debug($"ConnectionPoolManager::GetSession for {connectionString}"); + return GetPool(connectionString, password).GetSession(); + } + + public Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + s_logger.Debug($"ConnectionPoolManager::GetSessionAsync for {connectionString}"); + return GetPool(connectionString, password).GetSessionAsync(cancellationToken); + } + + public bool AddSession(SFSession session) + { + s_logger.Debug($"ConnectionPoolManager::AddSession for {session.ConnectionString}"); + return GetPool(session.ConnectionString, session.Password).AddSession(session); + } + + public void ClearAllPools() + { + s_logger.Debug("ConnectionPoolManager::ClearAllPools"); + foreach (var sessionPool in _pools.Values) + { + sessionPool.ClearAllPools(); + } + } + + public void SetMaxPoolSize(int maxPoolSize) + { + s_logger.Debug("ConnectionPoolManager::SetMaxPoolSize for all pools"); + foreach (var pool in _pools.Values) + { + pool.SetMaxPoolSize(maxPoolSize); + } + } + + public int GetMaxPoolSize() + { + s_logger.Debug("ConnectionPoolManager::GetMaxPoolSize"); + var values = _pools.Values.Select(it => it.GetMaxPoolSize()).Distinct().ToList(); + return values.Count == 1 + ? values.First() + : throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Multiple pools have different Max Pool Size values"); + } + + public void SetTimeout(long connectionTimeout) + { + s_logger.Debug("ConnectionPoolManager::SetTimeout for all pools"); + foreach (var pool in _pools.Values) + { + pool.SetTimeout(connectionTimeout); + } + } + + public long GetTimeout() + { + s_logger.Debug("ConnectionPoolManager::GetTimeout"); + var values = _pools.Values.Select(it => it.GetTimeout()).Distinct().ToList(); + return values.Count == 1 + ? values.First() + : throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Multiple pools have different Timeout values"); + } + + public int GetCurrentPoolSize() => throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Multiple pools have different Current Pool Size values"); + + public bool SetPooling(bool poolingEnabled) + { + s_logger.Debug("ConnectionPoolManager::SetPooling for all pools"); + bool switched = true; + foreach (var pool in _pools.Values) + { + if (!pool.SetPooling(poolingEnabled)) + switched = false; + } + return switched; + } + + public bool GetPooling() + { + s_logger.Debug("ConnectionPoolManager::GetPooling"); + var values = _pools.Values.Select(it => it.GetPooling()).Distinct().ToList(); + return values.Count == 1 + ? values.First() + : throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Multiple pools have different Pooling values"); + } + + internal SessionPool GetPool(string connectionString, SecureString password) + { + s_logger.Debug($"ConnectionPoolManager::GetPool for {connectionString}"); + var poolKey = GetPoolKey(connectionString); + + if (_pools.TryGetValue(poolKey, out var item)) + return item; + lock (s_poolsLock) + { + if (_pools.TryGetValue(poolKey, out var poolCreatedWhileWaitingOnLock)) + return poolCreatedWhileWaitingOnLock; + s_logger.Info($"Creating pool for connections to: {connectionString}"); + var pool = SessionPool.CreateSessionPool(connectionString, password); + _pools.Add(poolKey, pool); + return pool; + } + } + + public SessionPool GetPool(string connectionString) + { + s_logger.Debug($"ConnectionPoolManager::GetPool for {connectionString}"); + return GetPool(connectionString, null); + } + + // TODO: SNOW-937188 + private string GetPoolKey(string connectionString) + { + return connectionString; + } + } +} diff --git a/Snowflake.Data/Core/Session/ConnectionPoolType.cs b/Snowflake.Data/Core/Session/ConnectionPoolType.cs new file mode 100644 index 000000000..5844878fc --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionPoolType.cs @@ -0,0 +1,8 @@ +namespace Snowflake.Data.Core.Session +{ + internal enum ConnectionPoolType + { + SingleConnectionCache, + MultipleConnectionPool + } +} diff --git a/Snowflake.Data/Core/Session/IConnectionManager.cs b/Snowflake.Data/Core/Session/IConnectionManager.cs new file mode 100644 index 000000000..c64699d54 --- /dev/null +++ b/Snowflake.Data/Core/Session/IConnectionManager.cs @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. + */ + +using System.Security; +using System.Threading; +using System.Threading.Tasks; + +namespace Snowflake.Data.Core.Session +{ + internal interface IConnectionManager + { + SFSession GetSession(string connectionString, SecureString password); + Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken); + bool AddSession(SFSession session); + void ClearAllPools(); + void SetMaxPoolSize(int maxPoolSize); + int GetMaxPoolSize(); + void SetTimeout(long connectionTimeout); + long GetTimeout(); + int GetCurrentPoolSize(); + bool SetPooling(bool poolingEnabled); + bool GetPooling(); + SessionPool GetPool(string connectionString); + } +} diff --git a/Snowflake.Data/Core/Session/ISessionFactory.cs b/Snowflake.Data/Core/Session/ISessionFactory.cs new file mode 100644 index 000000000..f9416de8d --- /dev/null +++ b/Snowflake.Data/Core/Session/ISessionFactory.cs @@ -0,0 +1,9 @@ +using System.Security; + +namespace Snowflake.Data.Core.Session +{ + internal interface ISessionFactory + { + SFSession NewSession(string connectionString, SecureString password); + } +} diff --git a/Snowflake.Data/Core/Session/SFSession.cs b/Snowflake.Data/Core/Session/SFSession.cs index ad9aa07cd..99b7e66de 100755 --- a/Snowflake.Data/Core/Session/SFSession.cs +++ b/Snowflake.Data/Core/Session/SFSession.cs @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. */ using System; @@ -15,7 +15,6 @@ using System.Threading.Tasks; using System.Net.Http; using System.Text.RegularExpressions; -using Snowflake.Data.Configuration; namespace Snowflake.Data.Core { @@ -69,7 +68,8 @@ public class SFSession private readonly EasyLoggingStarter _easyLoggingStarter = EasyLoggingStarter.Instance; private long _startTime = 0; - internal string connStr = null; + internal readonly string ConnectionString; + internal readonly SecureString Password; private QueryContextCache _queryContextCache = new QueryContextCache(_defaultQueryContextCacheSize); @@ -145,8 +145,9 @@ internal SFSession( EasyLoggingStarter easyLoggingStarter) { _easyLoggingStarter = easyLoggingStarter; - connStr = connectionString; - properties = SFSessionProperties.parseConnectionString(connectionString, password); + ConnectionString = connectionString; + Password = password; + properties = SFSessionProperties.parseConnectionString(ConnectionString, Password); _disableQueryContextCache = bool.Parse(properties[SFSessionProperty.DISABLEQUERYCONTEXTCACHE]); ValidateApplicationName(properties); try @@ -215,7 +216,7 @@ internal Uri BuildUri(string path, Dictionary queryParams = null return uriBuilder.Uri; } - internal void Open() + internal virtual void Open() { logger.Debug("Open Session"); @@ -227,7 +228,7 @@ internal void Open() authenticator.Authenticate(); } - internal async Task OpenAsync(CancellationToken cancellationToken) + internal virtual async Task OpenAsync(CancellationToken cancellationToken) { logger.Debug("Open Session Async"); diff --git a/Snowflake.Data/Core/Session/SessionFactory.cs b/Snowflake.Data/Core/Session/SessionFactory.cs new file mode 100644 index 000000000..2eb0ba6df --- /dev/null +++ b/Snowflake.Data/Core/Session/SessionFactory.cs @@ -0,0 +1,12 @@ +using System.Security; + +namespace Snowflake.Data.Core.Session +{ + internal class SessionFactory : ISessionFactory + { + public SFSession NewSession(string connectionString, SecureString password) + { + return new SFSession(connectionString, password); + } + } +} diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index 6f9dd58dd..a4e477b84 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. + */ + using System; using System.Collections.Generic; using System.Linq; @@ -9,51 +13,51 @@ namespace Snowflake.Data.Core.Session { - sealed class SessionPoolSingleton : IDisposable + sealed class SessionPool : IDisposable { - private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); - private static SessionPoolSingleton s_instance = null; + private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); private static readonly object s_sessionPoolLock = new object(); - - private readonly List _sessionPool; + private readonly List _idleSessions; private int _maxPoolSize; private long _timeout; private const int MaxPoolSize = 10; private const long Timeout = 3600; + internal string ConnectionString; + internal SecureString Password; private bool _pooling = true; + private bool _allowExceedMaxPoolSize = true; + internal static ISessionFactory SessionFactory = new SessionFactory(); - SessionPoolSingleton() + private SessionPool() { lock (s_sessionPoolLock) { - _sessionPool = new List(); + _idleSessions = new List(); _maxPoolSize = MaxPoolSize; _timeout = Timeout; } } - ~SessionPoolSingleton() + + private SessionPool(string connectionString, SecureString password) : this() { - ClearAllPools(); + ConnectionString = connectionString; + Password = password; + _allowExceedMaxPoolSize = false; // TODO: SNOW-937190 } - public void Dispose() + internal static SessionPool CreateSessionCache() => new SessionPool(); + + internal static SessionPool CreateSessionPool(string connectionString, SecureString password) => + new SessionPool(connectionString, password); + + ~SessionPool() { ClearAllPools(); } - public static SessionPoolSingleton Instance + public void Dispose() { - get - { - lock (s_sessionPoolLock) - { - if(s_instance == null) - { - s_instance = new SessionPoolSingleton(); - } - return s_instance; - } - } + ClearAllPools(); } private void CleanExpiredSessions() @@ -63,11 +67,11 @@ private void CleanExpiredSessions() { long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); - foreach (var item in _sessionPool.ToList()) + foreach (var item in _idleSessions.ToList()) { if (item.IsExpired(_timeout, timeNow)) { - _sessionPool.Remove(item); + _idleSessions.Remove(item); item.close(); } } @@ -92,17 +96,22 @@ internal Task GetSessionAsync(string connStr, SecureString password, return session != null ? Task.FromResult(session) : NewSessionAsync(connStr, password, cancellationToken); } + internal SFSession GetSession() => GetSession(ConnectionString, Password); + + internal Task GetSessionAsync(CancellationToken cancellationToken) => + GetSessionAsync(ConnectionString, Password, cancellationToken); + private SFSession GetIdleSession(string connStr) { s_logger.Debug("SessionPool::GetIdleSession"); lock (s_sessionPoolLock) { - for (int i = 0; i < _sessionPool.Count; i++) + for (int i = 0; i < _idleSessions.Count; i++) { - if (_sessionPool[i].connStr.Equals(connStr)) + if (_idleSessions[i].ConnectionString.Equals(connStr)) { - SFSession session = _sessionPool[i]; - _sessionPool.RemoveAt(i); + SFSession session = _idleSessions[i]; + _idleSessions.RemoveAt(i); long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); if (session.IsExpired(_timeout, timeNow)) { @@ -125,7 +134,7 @@ private SFSession NewSession(String connectionString, SecureString password) s_logger.Debug("SessionPool::NewSession"); try { - var session = new SFSession(connectionString, password); + var session = SessionFactory.NewSession(connectionString, password); session.Open(); return session; } @@ -145,7 +154,7 @@ private SFSession NewSession(String connectionString, SecureString password) private Task NewSessionAsync(String connectionString, SecureString password, CancellationToken cancellationToken) { s_logger.Debug("SessionPool::NewSessionAsync"); - var session = new SFSession(connectionString, password); + var session = SessionFactory.NewSession(connectionString, password); return session .OpenAsync(cancellationToken) .ContinueWith(previousTask => @@ -174,18 +183,18 @@ internal bool AddSession(SFSession session) lock (s_sessionPoolLock) { - if (_sessionPool.Count >= _maxPoolSize) + if (_idleSessions.Count >= _maxPoolSize) { CleanExpiredSessions(); } - if (_sessionPool.Count >= _maxPoolSize) + if (_idleSessions.Count >= _maxPoolSize) { - // pool is full + s_logger.Warn($"Pool is full - unable to add session with sid {session.sessionId}"); return false; } s_logger.Debug($"pool connection with sid {session.sessionId}"); - _sessionPool.Add(session); + _idleSessions.Add(session); return true; } } @@ -195,11 +204,11 @@ internal void ClearAllPools() s_logger.Debug("SessionPool::ClearAllPools"); lock (s_sessionPoolLock) { - foreach (SFSession session in _sessionPool) + foreach (SFSession session in _idleSessions) { session.close(); } - _sessionPool.Clear(); + _idleSessions.Clear(); } } @@ -225,7 +234,7 @@ public long GetTimeout() public int GetCurrentPoolSize() { - return _sessionPool.Count; + return _idleSessions.Count; } public bool SetPooling(bool isEnable) @@ -246,5 +255,4 @@ public bool GetPooling() return _pooling; } } - -} \ No newline at end of file +}