diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs index ed1969900..c70eed995 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs @@ -1898,7 +1898,7 @@ public void TestCloseAsyncFailure() { using (var conn = new MockSnowflakeDbConnection(new MockCloseSessionException())) { - SnowflakeDbConnectionPool.SetPooling(false); + SnowflakeDbConnectionPool.GetPool(ConnectionString, null).SetPooling(false); conn.ConnectionString = ConnectionString; Assert.AreEqual(conn.State, ConnectionState.Closed); Task task = null; diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs index 32d9e035d..8ac5c3004 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs @@ -215,7 +215,7 @@ public void TestConnectionPoolIsFull() conn3.ConnectionString = ConnectionString + " retryCount=2"; conn3.Open(); Assert.AreEqual(ConnectionState.Open, conn3.State); - SnowflakeDbConnectionPool.ClearAllPools(); + // SnowflakeDbConnectionPool.ClearAllPools(); // TODO: check it! conn1.Close(); Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); diff --git a/Snowflake.Data/Client/SnowflakeDbConnection.cs b/Snowflake.Data/Client/SnowflakeDbConnection.cs index 2737f2838..5139fccd3 100755 --- a/Snowflake.Data/Client/SnowflakeDbConnection.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnection.cs @@ -280,6 +280,7 @@ public override Task OpenAsync(CancellationToken cancellationToken) if (previousTask.IsFaulted) { // Exception from SfSession.OpenAsync + _connectionState = ConnectionState.Closed; RethrowOnSessionOpenFailure(previousTask.Exception); } else if (previousTask.IsCanceled) @@ -289,6 +290,7 @@ public override Task OpenAsync(CancellationToken cancellationToken) } else { + SfSession = previousTask.Result; logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); // Only continue if the session was opened successfully OnSessionEstablished(); diff --git a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs index 11241c7c2..cc2085ce8 100644 --- a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs @@ -3,6 +3,7 @@ using System.Threading; using System.Threading.Tasks; using Snowflake.Data.Core; +using Snowflake.Data.Core.ConnectionPool; using Snowflake.Data.Core.Session; using Snowflake.Data.Log; @@ -13,7 +14,7 @@ public class SnowflakeDbConnectionPool private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); private static readonly Object s_instanceLock = new Object(); private static ConnectionPoolManagerBase s_connectionPoolManager; - private static readonly PoolManagerVersion s_poolVersion = PoolManagerVersion.Version1; + private static PoolManagerVersion s_poolVersion = PoolManagerVersion.Version2; public static ConnectionPoolManagerBase Instance { @@ -85,10 +86,12 @@ public static bool GetPooling() private static ConnectionPoolManagerBase ProvideConnectionPoolManager() { - if (s_poolVersion == PoolManagerVersion.Version1) - return new ConnectionPoolManagerV1(); - - throw new NotSupportedException("Pool version not supported"); + switch (s_poolVersion) + { + case PoolManagerVersion.Version1: return new ConnectionPoolManagerV1(); + case PoolManagerVersion.Version2: return new ConnectionPoolManagerV2(); + default: throw new NotSupportedException("Pool version not supported"); + } } internal static SFSession GetSession(string connectionString, SecureString password) @@ -97,7 +100,7 @@ internal static SFSession GetSession(string connectionString, SecureString passw return Instance.GetSession(connectionString, password); } - internal static Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + internal static Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) { s_logger.Debug("SnowflakeDbConnectionPool::GetSessionAsync"); return Instance.GetSessionAsync(connectionString, password, cancellationToken); @@ -109,6 +112,17 @@ internal static bool AddSession(string connectionString, SecureString password, return Instance.AddSession(connectionString, password, session); } - + public static void InternalTogglePreviousPool() + { + s_logger.Debug("ClearAllPools"); + if (Instance.GetCurrentPoolSize() > 0) + throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Switch pool version before connections are established!"); + ClearAllPools(); + lock (s_instanceLock) + { + s_poolVersion = PoolManagerVersion.Version1; + s_connectionPoolManager = ProvideConnectionPoolManager(); + } + } } } diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs b/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs index 741153c51..0ebca2022 100644 --- a/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs +++ b/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs @@ -65,7 +65,7 @@ internal SFSession GetSession(string connectionString, SecureString password) return sessionPool.GetSession(connectionString, password); } - internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) { return GetPool(connectionString, password).GetSessionAsync(connectionString, password, cancellationToken); } diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs b/Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs new file mode 100644 index 000000000..572704113 --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs @@ -0,0 +1,26 @@ +using System.Linq; +using Snowflake.Data.Core.Session; + +namespace Snowflake.Data.Core.ConnectionPool +{ + class ConnectionPoolManagerV2 : ConnectionPoolManagerBase + { + private const bool AllowExceedMaxPoolSizeDefault = false; + // private const int MinPoolSizeDefault = 0; // TODO: SNOW-902610 + private const int MaxPoolSizeDefault = 10; + + protected override PoolManagerVersion GetVersion() => PoolManagerVersion.Version2; + + protected override void ApplyPoolDefaults(SessionPool pool) + { + pool.SetAllowExceedMaxPoolSize(AllowExceedMaxPoolSizeDefault); + // pool.SetMinPoolSize(MinPoolSizeDefault); // TODO: SNOW-902610 + pool.SetMaxPoolSize(MaxPoolSizeDefault); + } + + public new int GetCurrentPoolSize() + { + return Pools.Values.Sum(sessionPool => sessionPool.GetCurrentPoolSize()); + } + } +} \ No newline at end of file diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index 6fe2fa757..9ea4ccb9e 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Security; using System.Threading; @@ -82,7 +83,7 @@ internal SFSession GetSession(string connectionString, SecureString password) return ProvidePooledSession(connectionString, password); } - internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) { if (!_pooling) return OpenNewSessionAsync(connectionString, password, cancellationToken); @@ -148,16 +149,8 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas lock (s_sessionPoolLock) { - if (GetCurrentPoolSize() < _maxPoolSize) + if (GetIdleSessionsSize() > 0) { - if (GetIdleSessionsSize() == 0) - { - var newSession = OpenNewSession(connectionString, password); - _busySessions++; - s_logger.Info($"Created new pooled session with sid {newSession.sessionId}"); - return newSession; - } - var sessionFromPool = _sessionPool[0]; // oldest idle _sessionPool.Remove(sessionFromPool); _busySessions++; @@ -165,6 +158,13 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas return sessionFromPool; } + if (GetCurrentPoolSize() < MaxPoolSize) + { + var newSession = OpenNewSession(connectionString, password); + _busySessions++; + s_logger.Info($"Created new pooled session with sid {newSession.sessionId}"); + return newSession; + } s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections"); } @@ -178,23 +178,15 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas } - private Task ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + private Task ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) { if (!_pooling) return null; lock (s_sessionPoolLock) { - if (GetCurrentPoolSize() < _maxPoolSize) + if (GetIdleSessionsSize() > 0) { - if (GetIdleSessionsSize() == 0) - { - var session = OpenNewSessionAsync(connectionString, password, cancellationToken); - _busySessions++; - s_logger.Info($"Creating new pooled session"); - return session; - } - var sessionFromPool = _sessionPool[0]; // oldest idle _sessionPool.Remove(sessionFromPool); _busySessions++; @@ -202,6 +194,14 @@ private Task ProvidePooledSessionAsync(string connectionString, SecureString pas return Task.FromResult(sessionFromPool); } + if (GetCurrentPoolSize() < MaxPoolSize) + { + var session = OpenNewSessionAsync(connectionString, password, cancellationToken); + _busySessions++; + s_logger.Info($"Creating new pooled session"); + return session; + } + s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections"); } @@ -300,18 +300,26 @@ internal SFSession OpenNewSession(string connectionString, SecureString password return session; } - internal Task OpenNewSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + internal Task OpenNewSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) { SFSession session = new SFSession(connectionString, password); - try - { - return session.OpenAsync(cancellationToken); - } - catch (Exception e) - { - s_logger.Error("Unable to connect", e); - return Task.FromException(e); - } + return session + .OpenAsync(cancellationToken) + .ContinueWith(previousTask => + { + if (previousTask.IsFaulted) + { + Debug.Assert(previousTask.Exception != null, "previousTask.Exception != null"); + throw previousTask.Exception; + } + + if (cancellationToken.IsCancellationRequested) + { + cancellationToken.ThrowIfCancellationRequested(); + } + + return session; + }, cancellationToken); } internal bool AddSession(SFSession session)