diff --git a/README.md b/README.md index 43845773a..7e748c8a5 100644 --- a/README.md +++ b/README.md @@ -667,6 +667,12 @@ CancellationTokenSource cancellationTokenSource = new CancellationTokenSource() ((SnowflakeDbConnection)conn).CloseAsync(cancellationTokenSource.Token); ``` +Evict the Connection +-------------------- + +For the open connection, call the `PreventPooling()` to mark the connection to be removed on close instead being still pooled. +The busy sessions counter will be decreased when the connection is closed. + Logging ------- The Snowflake Connector for .NET uses [log4net](http://logging.apache.org/log4net/) as the logging framework. diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs index 56814a99f..05e05ebbc 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs @@ -1,8 +1,12 @@ +using System.Data; +using System.Data.Common; using System.Threading; using System.Threading.Tasks; +using Moq; using NUnit.Framework; using Snowflake.Data.Client; using Snowflake.Data.Core.Session; +using Snowflake.Data.Tests.Mock; using Snowflake.Data.Tests.Util; namespace Snowflake.Data.Tests.IntegrationTests @@ -44,5 +48,46 @@ public async Task TestMinPoolSizeAsync() // cleanup await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false); } + + [Test] + public async Task TestPreventConnectionFromReturningToPool() + { + // arrange + var connectionString = ConnectionString + "minPoolSize=0"; + var connection = new SnowflakeDbConnection(connectionString); + await connection.OpenAsync().ConfigureAwait(false); + var pool = SnowflakeDbConnectionPool.GetPool(connectionString); + Assert.AreEqual(1, pool.GetCurrentPoolSize()); + + // act + connection.PreventPooling(); + await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false); + + // assert + Assert.AreEqual(0, pool.GetCurrentPoolSize()); + } + + [Test] + public async Task TestReleaseConnectionWhenRollbackFailsAsync() + { + // arrange + var connectionString = ConnectionString + "minPoolSize=0"; + var pool = SnowflakeDbConnectionPool.GetPool(connectionString); + var commandThrowingExceptionOnlyForRollback = MockHelper.CommandThrowingExceptionOnlyForRollback(); + var mockDbProviderFactory = new Mock(); + mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object); + Assert.AreEqual(0, pool.GetCurrentPoolSize()); + var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object); + connection.ConnectionString = connectionString; + await connection.OpenAsync().ConfigureAwait(false); + connection.BeginTransaction(); // not using async version because it is not available on .net framework + Assert.AreEqual(true, connection.HasActiveExplicitTransaction()); + + // act + await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false); + + // assert + Assert.AreEqual(0, pool.GetCurrentPoolSize(), "Should not return connection to the pool"); + } } } diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs index 28f2181c1..9adfc6b4a 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs @@ -1,11 +1,14 @@ using System; using System.Data; +using System.Data.Common; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Moq; using NUnit.Framework; using Snowflake.Data.Client; using Snowflake.Data.Core.Session; +using Snowflake.Data.Tests.Mock; using Snowflake.Data.Tests.Util; namespace Snowflake.Data.Tests.IntegrationTests @@ -374,12 +377,52 @@ public void TestMinPoolSize() connection.Close(); } + [Test] + public void TestPreventConnectionFromReturningToPool() + { + // arrange + var connectionString = ConnectionString + "minPoolSize=0"; + var connection = OpenConnection(connectionString); + var pool = SnowflakeDbConnectionPool.GetPool(connectionString); + Assert.AreEqual(1, pool.GetCurrentPoolSize()); + + // act + connection.PreventPooling(); + connection.Close(); + + // assert + Assert.AreEqual(0, pool.GetCurrentPoolSize()); + } + + [Test] + public void TestReleaseConnectionWhenRollbackFails() + { + // arrange + var connectionString = ConnectionString + "minPoolSize=0"; + var pool = SnowflakeDbConnectionPool.GetPool(connectionString); + var commandThrowingExceptionOnlyForRollback = MockHelper.CommandThrowingExceptionOnlyForRollback(); + var mockDbProviderFactory = new Mock(); + mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object); + Assert.AreEqual(0, pool.GetCurrentPoolSize()); + var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object); + connection.ConnectionString = connectionString; + connection.Open(); + connection.BeginTransaction(); + Assert.AreEqual(true, connection.HasActiveExplicitTransaction()); + + // act + connection.Close(); + + // assert + Assert.AreEqual(0, pool.GetCurrentPoolSize(), "Should not return connection to the pool"); + } + private void WaitUntilAllSessionsCreatedOrTimeout(SessionPool pool) { var expectingToWaitAtMostForSessionCreations = TimeSpan.FromSeconds(15); Awaiter.WaitUntilConditionOrTimeout(() => pool.OngoingSessionCreationsCount() == 0, expectingToWaitAtMostForSessionCreations); } - + private SnowflakeDbConnection OpenConnection(string connectionString) { var connection = new SnowflakeDbConnection(); diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs index c3d69d97d..796779883 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs @@ -2,6 +2,7 @@ * Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. */ +using System; using System.Data; using System.Threading; using NUnit.Framework; @@ -107,5 +108,18 @@ public void TestConnectionPoolWithDispose() Assert.AreEqual(ConnectionState.Closed, conn1.State); Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(conn1.ConnectionString).GetCurrentPoolSize()); } + + [Test] + public void TestFailWhenPreventingFromReturningToPoolNotOpenedConnection() + { + // arrange + var connection = new SnowflakeDbConnection(ConnectionString); + + // act + var thrown = Assert.Throws(() => connection.PreventPooling()); + + // assert + Assert.That(thrown.Message, Does.Contain("Session not yet created for this connection. Unable to prevent the session from pooling")); + } } } diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs index acf4f49e5..3ec344f63 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs @@ -1,12 +1,13 @@ using System; using System.Data; using System.Data.Common; -using System.Threading; using System.Threading.Tasks; using NUnit.Framework; using Snowflake.Data.Client; using Snowflake.Data.Core.Session; +using Snowflake.Data.Tests.Mock; using Snowflake.Data.Tests.Util; +using Moq; namespace Snowflake.Data.Tests.IntegrationTests { @@ -300,5 +301,45 @@ public void TestConnectionPoolExpirationWorks() // so expected result should be 0 Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); } + + [Test] + public void TestPreventConnectionFromReturningToPool() + { + // arrange + var connection = new SnowflakeDbConnection(ConnectionString); + connection.Open(); + var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString); + Assert.AreEqual(0, pool.GetCurrentPoolSize()); + + // act + connection.PreventPooling(); + connection.Close(); + + // assert + Assert.AreEqual(0, pool.GetCurrentPoolSize()); + } + + [Test] + public void TestReleaseConnectionWhenRollbackFails() + { + // arrange + SnowflakeDbConnectionPool.SetMaxPoolSize(10); + var commandThrowingExceptionOnlyForRollback = MockHelper.CommandThrowingExceptionOnlyForRollback(); + var mockDbProviderFactory = new Mock(); + mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object); + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object); + connection.ConnectionString = ConnectionString; + connection.Open(); + connection.BeginTransaction(); + Assert.AreEqual(true, connection.HasActiveExplicitTransaction()); + // no Rollback or Commit; during internal Rollback while closing a connection a mocked exception will be thrown + + // act + connection.Close(); + + // assert + Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Should not return connection to the pool"); + } } } diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolAsyncIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolAsyncIT.cs index 05f7ed17f..4ba5ca83d 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolAsyncIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolAsyncIT.cs @@ -9,10 +9,7 @@ using System.Threading; using System.Threading.Tasks; using Snowflake.Data.Client; -using Snowflake.Data.Core; -using Snowflake.Data.Log; using Snowflake.Data.Tests.Mock; -using Moq; using NUnit.Framework; namespace Snowflake.Data.Tests.IntegrationTests @@ -230,30 +227,6 @@ public void TestRollbackTransactionOnPooledWhenConnectionClose() Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool"); } - [Test] - public void TestFailureOfTransactionRollbackOnConnectionClosePreventsAddingToPool() - { - SnowflakeDbConnectionPool.SetMaxPoolSize(10); - var commandThrowingExceptionOnlyForRollback = new Mock(); - commandThrowingExceptionOnlyForRollback.CallBase = true; - commandThrowingExceptionOnlyForRollback.SetupSet(it => it.CommandText = "ROLLBACK") - .Throws(new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unexpected failure on transaction rollback when connection is returned to the pool with pending transaction")); - var mockDbProviderFactory = new Mock(); - mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object); - - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize()); - using (var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object)) - { - connection.ConnectionString = ConnectionString; - connection.Open(); - connection.BeginTransaction(); - Assert.AreEqual(true, connection.HasActiveExplicitTransaction()); - // no Rollback or Commit; during internal Rollback while closing a connection a mocked exception will be thrown - } - - Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Should not return connection to the pool"); - } - [Test] // test connection pooling with concurrent connection and using async calls no close // call for connection. Connection is closed when Dispose() is called @@ -346,15 +319,5 @@ static async Task InvalidConnectionTaskAsync(string connectionString) await Task.Delay(100); } } - - private class TestSnowflakeDbConnection : SnowflakeDbConnection - { - public TestSnowflakeDbConnection(DbProviderFactory dbProviderFactory) - { - DbProviderFactory = dbProviderFactory; - } - - protected override DbProviderFactory DbProviderFactory { get; } - } } } diff --git a/Snowflake.Data.Tests/Mock/MockHelper.cs b/Snowflake.Data.Tests/Mock/MockHelper.cs new file mode 100644 index 000000000..e4cf1d218 --- /dev/null +++ b/Snowflake.Data.Tests/Mock/MockHelper.cs @@ -0,0 +1,18 @@ +using Moq; +using Snowflake.Data.Client; +using Snowflake.Data.Core; + +namespace Snowflake.Data.Tests.Mock +{ + public static class MockHelper + { + public static Mock CommandThrowingExceptionOnlyForRollback() + { + var command = new Mock(); + command.CallBase = true; + command.SetupSet(it => it.CommandText = "ROLLBACK") + .Throws(new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unexpected failure on transaction rollback when connection is returned to the pool with pending transaction")); + return command; + } + } +} diff --git a/Snowflake.Data.Tests/Mock/TestSnowflakeDbConnection.cs b/Snowflake.Data.Tests/Mock/TestSnowflakeDbConnection.cs new file mode 100644 index 000000000..621ca5dd9 --- /dev/null +++ b/Snowflake.Data.Tests/Mock/TestSnowflakeDbConnection.cs @@ -0,0 +1,15 @@ +using System.Data.Common; +using Snowflake.Data.Client; + +namespace Snowflake.Data.Tests.Mock +{ + public class TestSnowflakeDbConnection : SnowflakeDbConnection + { + public TestSnowflakeDbConnection(DbProviderFactory dbProviderFactory) + { + DbProviderFactory = dbProviderFactory; + } + + protected override DbProviderFactory DbProviderFactory { get; } + } +} diff --git a/Snowflake.Data/Client/SnowflakeDbConnection.cs b/Snowflake.Data/Client/SnowflakeDbConnection.cs index b773a0150..6d0fdfc17 100755 --- a/Snowflake.Data/Client/SnowflakeDbConnection.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnection.cs @@ -105,9 +105,37 @@ public override string DataSource public override ConnectionState State => _connectionState; internal SnowflakeDbTransaction ExplicitTransaction { get; set; } // tracks only explicit transaction operations + + public void PreventPooling() + { + if (SfSession == null) + { + throw new Exception("Session not yet created for this connection. Unable to prevent the session from pooling"); + } + SfSession.SetPooling(false); + logger.Debug($"Session {SfSession.sessionId} marked not to be pooled any more"); + } internal bool HasActiveExplicitTransaction() => ExplicitTransaction != null && ExplicitTransaction.IsActive; + private bool TryToReturnSessionToPool() + { + var pooling = SnowflakeDbConnectionPool.GetPooling() && SfSession.GetPooling(); + var transactionRollbackStatus = pooling ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; + var canReuseSession = CanReuseSession(transactionRollbackStatus); + if (!canReuseSession) + { + SnowflakeDbConnectionPool.ReleaseBusySession(SfSession); + return false; + } + var sessionReturnedToPool = SnowflakeDbConnectionPool.AddSession(SfSession); + if (sessionReturnedToPool) + { + logger.Debug($"Session pooled: {SfSession.sessionId}"); + } + return sessionReturnedToPool; + } + private TransactionRollbackStatus TerminateTransactionForDirtyConnectionReturningToPool() { if (!HasActiveExplicitTransaction()) @@ -125,7 +153,7 @@ private TransactionRollbackStatus TerminateTransactionForDirtyConnectionReturnin return TransactionRollbackStatus.Success; } } - catch (SnowflakeDbException exception) + catch (Exception exception) { // error to indicate a problem with rollback of an active transaction and inability to return dirty connection to the pool logger.Error("Closing dirty connection: rollback transaction in session: " + SfSession.sessionId + " failed, exception: " + exception.Message); @@ -151,19 +179,13 @@ public override void Close() logger.Debug("Close Connection."); if (IsNonClosedWithSession()) { - var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; - - if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession)) - { - logger.Debug($"Session pooled: {SfSession.sessionId}"); - } - else + var returnedToPool = TryToReturnSessionToPool(); + if (!returnedToPool) { SfSession.close(); } SfSession = null; } - _connectionState = ConnectionState.Closed; } @@ -189,11 +211,9 @@ public virtual Task CloseAsync(CancellationToken cancellationToken) { if (IsNonClosedWithSession()) { - var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; - - if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession)) + var returnedToPool = TryToReturnSessionToPool(); + if (returnedToPool) { - logger.Debug($"Session pooled: {SfSession.sessionId}"); _connectionState = ConnectionState.Closed; taskCompletionSource.SetResult(null); } diff --git a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs index 46348cf61..0355ddd92 100644 --- a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs @@ -54,6 +54,12 @@ internal static bool AddSession(SFSession session) return ConnectionManager.AddSession(session); } + internal static void ReleaseBusySession(SFSession session) + { + s_logger.Debug("SnowflakeDbConnectionPool::ReleaseBusySession"); + ConnectionManager.ReleaseBusySession(session); + } + public static void ClearAllPools() { s_logger.Debug("SnowflakeDbConnectionPool::ClearAllPools"); diff --git a/Snowflake.Data/Core/Session/ConnectionCacheManager.cs b/Snowflake.Data/Core/Session/ConnectionCacheManager.cs index 01f32df4a..b7c885234 100644 --- a/Snowflake.Data/Core/Session/ConnectionCacheManager.cs +++ b/Snowflake.Data/Core/Session/ConnectionCacheManager.cs @@ -15,6 +15,7 @@ internal sealed class ConnectionCacheManager : IConnectionManager public Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) => _sessionPool.GetSessionAsync(connectionString, password, cancellationToken); public bool AddSession(SFSession session) => _sessionPool.AddSession(session, false); + public void ReleaseBusySession(SFSession session) => _sessionPool.ReleaseBusySession(session); public void ClearAllPools() => _sessionPool.ClearSessions(); public void SetMaxPoolSize(int maxPoolSize) => _sessionPool.SetMaxPoolSize(maxPoolSize); public int GetMaxPoolSize() => _sessionPool.GetMaxPoolSize(); diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManager.cs b/Snowflake.Data/Core/Session/ConnectionPoolManager.cs index 71bb58adb..fae78a014 100644 --- a/Snowflake.Data/Core/Session/ConnectionPoolManager.cs +++ b/Snowflake.Data/Core/Session/ConnectionPoolManager.cs @@ -46,6 +46,12 @@ public bool AddSession(SFSession session) return GetPool(session.ConnectionString, session.Password).AddSession(session, true); } + public void ReleaseBusySession(SFSession session) + { + s_logger.Debug($"ConnectionPoolManager::ReleaseBusySession for {session.ConnectionString}"); + GetPool(session.ConnectionString, session.Password).ReleaseBusySession(session); + } + public void ClearAllPools() { s_logger.Debug("ConnectionPoolManager::ClearAllPools"); diff --git a/Snowflake.Data/Core/Session/IConnectionManager.cs b/Snowflake.Data/Core/Session/IConnectionManager.cs index c64699d54..247cbe2e6 100644 --- a/Snowflake.Data/Core/Session/IConnectionManager.cs +++ b/Snowflake.Data/Core/Session/IConnectionManager.cs @@ -13,6 +13,7 @@ internal interface IConnectionManager SFSession GetSession(string connectionString, SecureString password); Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken); bool AddSession(SFSession session); + void ReleaseBusySession(SFSession session); void ClearAllPools(); void SetMaxPoolSize(int maxPoolSize); int GetMaxPoolSize(); diff --git a/Snowflake.Data/Core/Session/SFSession.cs b/Snowflake.Data/Core/Session/SFSession.cs index b310e4676..f079412f6 100755 --- a/Snowflake.Data/Core/Session/SFSession.cs +++ b/Snowflake.Data/Core/Session/SFSession.cs @@ -77,6 +77,13 @@ public class SFSession private bool _disableQueryContextCache = false; + public bool GetPooling() => _poolConfig.PoolingEnabled; + + public void SetPooling(bool isEnabled) + { + _poolConfig.PoolingEnabled = isEnabled; + } + internal void ProcessLoginResponse(LoginResponse authnResponse) { if (authnResponse.success) diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index 6816a9d98..6f1fdd18b 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -388,18 +388,36 @@ private Task NewSessionAsync(String connectionString, SecureString pa }, TaskContinuationOptions.NotOnCanceled); } + internal void ReleaseBusySession(SFSession session) + { + s_logger.Debug("SessionPool::ReleaseBusySession"); + int currentPoolSize; + lock (_sessionPoolLock) + { + _busySessionsCounter.Decrease(); + currentPoolSize = GetCurrentPoolSize(); + } + var currentSizeMessageOldPool = $"After releasing a busy session from the pool, the pool size is: {currentPoolSize}"; + var poolSizeMessage = IsMultiplePoolsVersion() + ? $"{currentSizeMessageOldPool} - pool identified by: {ConnectionString}" + : currentSizeMessageOldPool; + s_logger.Debug(poolSizeMessage); + } + internal bool AddSession(SFSession session, bool ensureMinPoolSize) { if (!GetPooling()) return false; - if (IsMultiplePoolsVersion()) + if (!session.GetPooling()) { - s_logger.Debug($"SessionPool::AddSession - returning session to pool identified by connection string: {ConnectionString}"); - } - else - { - s_logger.Debug("SessionPool::AddSession"); + ReleaseBusySession(session); + return false; } + const string AddSessionMessage = "SessionPool::AddSession"; + var addSessionMessage = IsMultiplePoolsVersion() + ? $"{AddSessionMessage} - returning session to pool identified by connection string: {ConnectionString}" + : AddSessionMessage; + s_logger.Debug(addSessionMessage); var result = ReturnSessionToPool(session, ensureMinPoolSize); var wasSessionReturnedToPool = result.Item1; var sessionCreationTokens = result.Item2; @@ -549,13 +567,5 @@ internal List GetIdleSessionsStartTimes() return _idleSessions.Select(s => s.GetStartTime()).ToList(); } } - - - internal void Describe() - { - Console.WriteLine($"idle sessions: {_idleSessions.Count}"); - Console.WriteLine($"busy sessions: {_busySessionsCounter.Count()}"); - Console.WriteLine($"session creations : {_sessionCreationTokenCounter.Count()}"); - } } }