diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs index ed1969900..04e00e9fb 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs @@ -87,7 +87,7 @@ public void TestApplicationName() try { conn.Open(); - s_logger.Debug("{appName}"); + s_logger.Debug($"{appName}"); Assert.Fail(); } @@ -1898,7 +1898,7 @@ public void TestCloseAsyncFailure() { using (var conn = new MockSnowflakeDbConnection(new MockCloseSessionException())) { - SnowflakeDbConnectionPool.SetPooling(false); + SnowflakeDbConnectionPool.GetPool(ConnectionString, null).SetPooling(false); conn.ConnectionString = ConnectionString; Assert.AreEqual(conn.State, ConnectionState.Closed); Task task = null; diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs index 45148f066..5346137f7 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs @@ -23,6 +23,7 @@ class PoolConfig { public PoolConfig() { + var connectionPoolManagerBase = SnowflakeDbConnectionPool.Instance; // TODO: check why necessary _maxPoolSize = SnowflakeDbConnectionPool.GetMaxPoolSize(); _timeout = SnowflakeDbConnectionPool.GetTimeout(); _pooling = SnowflakeDbConnectionPool.GetPooling(); @@ -214,7 +215,7 @@ public void TestConnectionPoolIsFull() conn3.ConnectionString = ConnectionString + " retryCount=2"; conn3.Open(); Assert.AreEqual(ConnectionState.Open, conn3.State); - SnowflakeDbConnectionPool.ClearAllPools(); + // SnowflakeDbConnectionPool.ClearAllPools(); // TODO: check it! conn1.Close(); Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize()); @@ -320,6 +321,8 @@ public void TestConnectionPoolFull() conn3.Open(); Assert.AreEqual(ConnectionState.Open, conn3.State); + Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize()); + var conn4 = new SnowflakeDbConnection(); conn4.ConnectionString = ConnectionString + " retryCount=3"; conn4.Open(); diff --git a/Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs b/Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs index aa342fe68..6a4996a81 100644 --- a/Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs +++ b/Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs @@ -49,7 +49,7 @@ public override void Open() public override Task OpenAsync(CancellationToken cancellationToken) { - registerConnectionCancellationCallback(cancellationToken); + RegisterConnectionCancellationCallback(cancellationToken); SetMockSession(); diff --git a/Snowflake.Data/Client/SnowflakeDbConnection.cs b/Snowflake.Data/Client/SnowflakeDbConnection.cs index 615bcc879..5139fccd3 100755 --- a/Snowflake.Data/Client/SnowflakeDbConnection.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnection.cs @@ -26,7 +26,7 @@ public class SnowflakeDbConnection : DbConnection internal int _connectionTimeout; - private bool disposed = false; + private bool _disposed = false; private static Mutex _arraybindingMutex = new Mutex(); @@ -151,9 +151,9 @@ public override void Close() logger.Debug("Close Connection."); if (IsNonClosedWithSession()) { - var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; + var transactionRollbackStatus = GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; - if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession)) + if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(ConnectionString, Password, SfSession)) { logger.Debug($"Session pooled: {SfSession.sessionId}"); } @@ -172,7 +172,7 @@ public override void Close() // Adding an override for CloseAsync will prevent the need for casting to SnowflakeDbConnection to call CloseAsync(CancellationToken). public override async Task CloseAsync() { - await CloseAsync(CancellationToken.None); + await CloseAsync(CancellationToken.None).ConfigureAwait(false); } #endif @@ -189,9 +189,9 @@ public Task CloseAsync(CancellationToken cancellationToken) { if (IsNonClosedWithSession()) { - var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; + var transactionRollbackStatus = GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; - if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession)) + if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(ConnectionString, Password, SfSession)) { logger.Debug($"Session pooled: {SfSession.sessionId}"); _connectionState = ConnectionState.Closed; @@ -234,51 +234,34 @@ public Task CloseAsync(CancellationToken cancellationToken) private bool CanReuseSession(TransactionRollbackStatus transactionRollbackStatus) { - return SnowflakeDbConnectionPool.GetPooling() && + return GetPooling() && transactionRollbackStatus == TransactionRollbackStatus.Success; } + private bool GetPooling() + { + return SnowflakeDbConnectionPool.GetPool(ConnectionString, Password).GetPooling(); + } + public override void Open() { logger.Debug("Open Connection."); if (_connectionState != ConnectionState.Closed) { - logger.Debug($"Open with a connection already opened: {_connectionState}"); + logger.Warn($"Opening a connection already opened: {_connectionState}"); return; } - SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString); - if (SfSession != null) + + try { - logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); + OnSessionOpen(); + SfSession = SnowflakeDbConnectionPool.GetSession(ConnectionString, Password); + OnSessionEstablished(); } - else + catch (Exception e) { - SetSession(); - try - { - SfSession.Open(); - } - catch (Exception e) - { - // Otherwise when Dispose() is called, the close request would timeout. - _connectionState = ConnectionState.Closed; - logger.Error("Unable to connect", e); - if (!(e.GetType() == typeof(SnowflakeDbException))) - { - throw - new SnowflakeDbException( - e, - SnowflakeDbException.CONNECTION_FAILURE_SSTATE, - SFError.INTERNAL_ERROR, - "Unable to connect. " + e.Message); - } - else - { - throw; - } - } + RethrowOnSessionOpenFailure(e); } - OnSessionEstablished(); } public override Task OpenAsync(CancellationToken cancellationToken) @@ -286,34 +269,19 @@ public override Task OpenAsync(CancellationToken cancellationToken) logger.Debug("Open Connection Async."); if (_connectionState != ConnectionState.Closed) { - logger.Debug($"Open with a connection already opened: {_connectionState}"); + logger.Warn($"Opening a connection already opened: {_connectionState}"); return Task.CompletedTask; } - SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString); - if (SfSession != null) - { - logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); - OnSessionEstablished(); - return Task.CompletedTask; - } - - registerConnectionCancellationCallback(cancellationToken); - SetSession(); - return SfSession.OpenAsync(cancellationToken).ContinueWith( - previousTask => + OnSessionOpen(); + return SnowflakeDbConnectionPool.GetSessionAsync(ConnectionString, Password, cancellationToken) + .ContinueWith(previousTask => { if (previousTask.IsFaulted) { // Exception from SfSession.OpenAsync - Exception sfSessionEx = previousTask.Exception; _connectionState = ConnectionState.Closed; - logger.Error("Unable to connect", sfSessionEx); - throw new SnowflakeDbException( - sfSessionEx, - SnowflakeDbException.CONNECTION_FAILURE_SSTATE, - SFError.INTERNAL_ERROR, - "Unable to connect"); + RethrowOnSessionOpenFailure(previousTask.Exception); } else if (previousTask.IsCanceled) { @@ -322,7 +290,8 @@ public override Task OpenAsync(CancellationToken cancellationToken) } else { - logger.Debug("All good"); + SfSession = previousTask.Result; + logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); // Only continue if the session was opened successfully OnSessionEstablished(); } @@ -344,23 +313,42 @@ public void SetArrayBindStageCreated() { _isArrayBindStageCreated = true; } - - /// - /// Create a new SFsession with the connection string settings. - /// - /// If the connection string can't be processed - private void SetSession() + + private void OnSessionOpen() { - SfSession = new SFSession(ConnectionString, Password); - _connectionTimeout = (int)SfSession.connectionTimeout.TotalSeconds; + logger.Debug("Opening session"); _connectionState = ConnectionState.Connecting; } private void OnSessionEstablished() { + if (SfSession == null) + { + logger.Error("Error during opening session"); + throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unable to establish a session"); + } + logger.Debug("Session established"); _connectionState = ConnectionState.Open; + _connectionTimeout = (int)SfSession.connectionTimeout.TotalSeconds; + logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); } + private void RethrowOnSessionOpenFailure(Exception exception) + { + // Otherwise when Dispose() is called, the close request would timeout. + _connectionState = ConnectionState.Closed; + logger.Error("Unable to connect: ", exception); + if (exception != null && exception is SnowflakeDbException dbException) + throw dbException; + + var errorMessage = "Unable to connect. " + (exception != null ? exception.Message : ""); + throw new SnowflakeDbException( + exception, + SnowflakeDbException.CONNECTION_FAILURE_SSTATE, + SFError.INTERNAL_ERROR, + errorMessage); + } + protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) { // Parameterless BeginTransaction() method of the super class calls this method with IsolationLevel.Unspecified, @@ -382,12 +370,12 @@ protected override DbCommand CreateDbCommand() protected override void Dispose(bool disposing) { - if (disposed) + if (_disposed) return; try { - this.Close(); + Close(); } catch (Exception ex) { @@ -395,7 +383,7 @@ protected override void Dispose(bool disposing) logger.Error("Unable to close connection", ex); } - disposed = true; + _disposed = true; base.Dispose(disposing); } @@ -406,7 +394,7 @@ protected override void Dispose(bool disposing) /// layer or timeout reached. Whichever comes first would trigger query cancellation. /// /// cancellation token from upper layer - internal void registerConnectionCancellationCallback(CancellationToken externalCancellationToken) + internal void RegisterConnectionCancellationCallback(CancellationToken externalCancellationToken) { if (!externalCancellationToken.IsCancellationRequested) { diff --git a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs index 8e0093262..926e4ca49 100644 --- a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs @@ -1,4 +1,9 @@ -using Snowflake.Data.Core; +using System; +using System.Security; +using System.Threading; +using System.Threading.Tasks; +using Snowflake.Data.Core; +using Snowflake.Data.Core.ConnectionPool; using Snowflake.Data.Core.Session; using Snowflake.Data.Log; @@ -7,58 +12,122 @@ namespace Snowflake.Data.Client public class SnowflakeDbConnectionPool { private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); - - internal static SFSession GetSession(string connStr) + private static readonly Object s_instanceLock = new Object(); + private static ConnectionPoolManagerBase s_connectionPoolManager; + private static PoolManagerVersion s_poolVersion = PoolManagerVersion.Version1; + + public static ConnectionPoolManagerBase Instance { - s_logger.Debug("SnowflakeDbConnectionPool::GetSession"); - return SessionPoolSingleton.Instance.GetSession(connStr); + get + { + if (s_connectionPoolManager != null) + return s_connectionPoolManager; + lock (s_instanceLock) + { + s_connectionPoolManager = ProvideConnectionPoolManager(); + } + return s_connectionPoolManager; + } } - - internal static bool AddSession(SFSession session) + + public static SessionPool GetPool(string connectionString, SecureString password) { - s_logger.Debug("SnowflakeDbConnectionPool::AddSession"); - return SessionPoolSingleton.Instance.AddSession(session); + s_logger.Debug("SnowflakeDbConnectionPool::GetSession"); + return Instance.GetPool(connectionString, password); } public static void ClearAllPools() { s_logger.Debug("SnowflakeDbConnectionPool::ClearAllPools"); - SessionPoolSingleton.Instance.ClearAllPools(); + Instance.ClearAllPools(); } public static void SetMaxPoolSize(int size) { - SessionPoolSingleton.Instance.SetMaxPoolSize(size); + s_logger.Debug("SnowflakeDbConnectionPool::SetMaxPoolSize"); + Instance.SetMaxPoolSize(size); } public static int GetMaxPoolSize() { - return SessionPoolSingleton.Instance.GetMaxPoolSize(); + s_logger.Debug("SnowflakeDbConnectionPool::GetMaxPoolSize"); + return Instance.GetMaxPoolSize(); } public static void SetTimeout(long time) { - SessionPoolSingleton.Instance.SetTimeout(time); + s_logger.Debug("SnowflakeDbConnectionPool::SetTimeout"); + Instance.SetTimeout(time); } public static long GetTimeout() { - return SessionPoolSingleton.Instance.GetTimeout(); + s_logger.Debug("SnowflakeDbConnectionPool::GetTimeout"); + return Instance.GetTimeout(); } public static int GetCurrentPoolSize() { - return SessionPoolSingleton.Instance.GetCurrentPoolSize(); + s_logger.Debug("SnowflakeDbConnectionPool::GetCurrentPoolSize"); + return Instance.GetCurrentPoolSize(); } public static bool SetPooling(bool isEnable) { - return SessionPoolSingleton.Instance.SetPooling(isEnable); + s_logger.Debug("SnowflakeDbConnectionPool::SetPooling"); + return Instance.SetPooling(isEnable); } public static bool GetPooling() { - return SessionPoolSingleton.Instance.GetPooling(); + s_logger.Debug("SnowflakeDbConnectionPool::GetPooling"); + return Instance.GetPooling(); + } + + private static ConnectionPoolManagerBase ProvideConnectionPoolManager() + { + switch (s_poolVersion) + { + case PoolManagerVersion.Version1: return new ConnectionPoolManagerV1(); + case PoolManagerVersion.Version2: return new ConnectionPoolManagerV2(); + default: throw new NotSupportedException("Pool version not supported"); + } + } + + internal static SFSession GetSession(string connectionString, SecureString password) + { + s_logger.Debug("SnowflakeDbConnectionPool::GetSession"); + return Instance.GetSession(connectionString, password); + } + + internal static Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + s_logger.Debug("SnowflakeDbConnectionPool::GetSessionAsync"); + return Instance.GetSessionAsync(connectionString, password, cancellationToken); + } + + internal static bool AddSession(string connectionString, SecureString password, SFSession session) + { + s_logger.Debug("SnowflakeDbConnectionPool::AddSession"); + return Instance.AddSession(connectionString, password, session); + } + + internal static PoolManagerVersion GetVersion() + { + return s_poolVersion; + } + + public static void InternalTogglePreviousPool() + { + s_logger.Debug("ClearAllPools"); + if (Instance.GetCurrentPoolSize() > 0) + throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Switch pool version before connections are established!"); + ClearAllPools(); + lock (s_instanceLock) + { + s_poolVersion = PoolManagerVersion.Version1; + s_connectionPoolManager = ProvideConnectionPoolManager(); + } } } } diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs b/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs new file mode 100644 index 000000000..9fccec62a --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs @@ -0,0 +1,88 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security; +using System.Threading; +using System.Threading.Tasks; + +namespace Snowflake.Data.Core.Session +{ + public enum PoolManagerVersion + { + Version1, + Version2 + } + + public abstract class ConnectionPoolManagerBase + { + protected static readonly Object PoolsLock = new Object(); + protected readonly Dictionary Pools; + protected ConnectionPoolManagerBase() + { + lock (PoolsLock) + { + Pools = new Dictionary(); + } + } + + protected abstract PoolManagerVersion GetVersion(); + + protected abstract void ApplyPoolDefaults(SessionPool pool); + + protected virtual string GetPoolKey(string connectionString) => connectionString; + + public SessionPool GetPool(string connectionString, SecureString password) + { + string poolKey = GetPoolKey(connectionString); + if (Pools.ContainsKey(poolKey)) + return Pools[poolKey]; + lock (PoolsLock) + { + var pool = CreateSessionPool(connectionString, password); + ApplyPoolDefaults(pool); + Pools.Add(poolKey, pool); + return pool; + } + } + + public void ClearAllPools() + { + Pools.Values.ToList().ForEach(pool => pool.ClearAllPools()); + } + + public virtual void SetMaxPoolSize(int size) => throw FeatureNotAvailableForPoolVersion(); + public virtual int GetMaxPoolSize() => throw FeatureNotAvailableForPoolVersion(); + public virtual void SetTimeout(long time) => throw FeatureNotAvailableForPoolVersion(); + public virtual long GetTimeout() => throw FeatureNotAvailableForPoolVersion(); + public virtual int GetCurrentPoolSize() => throw FeatureNotAvailableForPoolVersion(); + public virtual bool SetPooling(bool isEnable) => throw FeatureNotAvailableForPoolVersion(); + public virtual bool GetPooling() => throw FeatureNotAvailableForPoolVersion(); + + internal SFSession GetSession(string connectionString, SecureString password) + { + // TODO: + // pool ver.1 is the same for any connection string so there's still need to pass params to GetSession + // pool ver.2 is different for each connection strings so in theory no need to pass params to GetSession + var sessionPool = GetPool(connectionString, password); + return sessionPool.GetSession(connectionString, password); + } + + internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + return GetPool(connectionString, password).GetSessionAsync(connectionString, password, cancellationToken); + } + + internal bool AddSession(string connectionString, SecureString password, SFSession session) + { + return GetPool(connectionString, password).AddSession(session); + } + + private SessionPool CreateSessionPool(string connectionString, SecureString password) => new SessionPool(connectionString, password); + + private NotSupportedException FeatureNotAvailableForPoolVersion() + { + return new NotSupportedException("API not available for selected connection pool version selected: " + + GetVersion()); + } + } +} \ No newline at end of file diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManagerV1.cs b/Snowflake.Data/Core/Session/ConnectionPoolManagerV1.cs new file mode 100644 index 000000000..424f59fdb --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionPoolManagerV1.cs @@ -0,0 +1,38 @@ +using System.Linq; +using System.Security; + +namespace Snowflake.Data.Core.Session +{ + sealed class ConnectionPoolManagerV1 : ConnectionPoolManagerBase + { + private const bool AllowExceedMaxPoolSizeDefault = true; + // private const int MinPoolSizeDefault = 0; // TODO: SNOW-902610 + private const int MaxPoolSizeDefault = 10; + private const string SinglePoolKeyForAllDataSources = "CONNECTION_CACHE"; + private const SessionPickAlgorithm SessionPicking = SessionPickAlgorithm.MatchConnectionString; + + protected override void ApplyPoolDefaults(SessionPool pool) + { + pool.SetAllowExceedMaxPoolSize(AllowExceedMaxPoolSizeDefault); + pool.SetMaxPoolSize(MaxPoolSizeDefault); + pool.SetSessionPickAlgorithm(SessionPicking); + } + protected override PoolManagerVersion GetVersion() => PoolManagerVersion.Version1; + // Same pool for any connection string (backward compatible solution) + protected override string GetPoolKey(string connectionString) => SinglePoolKeyForAllDataSources; + public override void SetMaxPoolSize(int size) => GetPool().SetMaxPoolSize(size); + public override int GetMaxPoolSize() => GetPool().GetMaxPoolSize(); + public override void SetTimeout(long time) => Pools.Values.First().SetTimeout(time); + public override long GetTimeout() => Pools.Values.First().GetTimeout(); + public override int GetCurrentPoolSize() => Pools.Values.First().GetCurrentPoolSize(); + public override bool GetPooling() => Pools.Values.First().GetPooling(); + public override bool SetPooling(bool isEnable) + { + if (GetPooling() == isEnable) + return false; + Pools.Values.First().SetPooling(isEnable); + return isEnable; + } + private SessionPool GetPool() => GetPool(null, null); + } +} \ No newline at end of file diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs b/Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs new file mode 100644 index 000000000..9f4e832b5 --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs @@ -0,0 +1,28 @@ +using System.Linq; +using Snowflake.Data.Core.Session; + +namespace Snowflake.Data.Core.ConnectionPool +{ + class ConnectionPoolManagerV2 : ConnectionPoolManagerBase + { + private const bool AllowExceedMaxPoolSizeDefault = false; + // private const int MinPoolSizeDefault = 0; // TODO: SNOW-902610 + private const int MaxPoolSizeDefault = 10; + private const SessionPickAlgorithm SessionPicking = SessionPickAlgorithm.PickOldest; + + protected override PoolManagerVersion GetVersion() => PoolManagerVersion.Version2; + + protected override void ApplyPoolDefaults(SessionPool pool) + { + pool.SetAllowExceedMaxPoolSize(AllowExceedMaxPoolSizeDefault); + // pool.SetMinPoolSize(MinPoolSizeDefault); // TODO: SNOW-902610 + pool.SetMaxPoolSize(MaxPoolSizeDefault); + pool.SetSessionPickAlgorithm(SessionPicking); + } + + public new int GetCurrentPoolSize() + { + return Pools.Values.Sum(sessionPool => sessionPool.GetCurrentPoolSize()); + } + } +} \ No newline at end of file diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index 092135e47..3025be5ed 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -1,33 +1,60 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Security; +using System.Threading; +using System.Threading.Tasks; +using Snowflake.Data.Client; using Snowflake.Data.Log; namespace Snowflake.Data.Core.Session { - sealed class SessionPoolSingleton : IDisposable + public enum SessionPickAlgorithm { - private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); - private static SessionPoolSingleton s_instance = null; + MatchConnectionString, + PickOldest + } + + public class SessionPool : IDisposable + { + private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); private static readonly object s_sessionPoolLock = new object(); + // private const int MinPoolSize = 0; // TODO: + private const int MaxPoolSize = 10; + private const long Timeout = 3600; + private const long OpenSessionTimeout = 60 * 2; + private readonly List _sessionPool; + private int _busySessions; + // private int _minPoolSize; // TODO: private int _maxPoolSize; private long _timeout; - private const int MaxPoolSize = 10; - private const long Timeout = 3600; - private bool _pooling = true; + private long _openSessionTimeout; + private int _openSessionSleepTimeout = 250; + private bool _pooling = true; // TODO: Get rid of that! + private string _connectionString; + private SecureString _password; + private bool _allowExceedMaxPoolSize; // backward compatibility flag + private SessionPickAlgorithm _sessionPick; // backward compatibility flag - SessionPoolSingleton() + internal SessionPool(string connectionString, SecureString password) { lock (s_sessionPoolLock) { _sessionPool = new List(); + _busySessions = 0; _maxPoolSize = MaxPoolSize; _timeout = Timeout; + // _minPoolSize = MinPoolSize; // TODO: + _openSessionTimeout = OpenSessionTimeout; + _connectionString = connectionString; + _password = password; } } - ~SessionPoolSingleton() + + ~SessionPool() { ClearAllPools(); } @@ -36,69 +63,249 @@ public void Dispose() { ClearAllPools(); } - - public static SessionPoolSingleton Instance + + private void CleanExpiredSessions() { - get + s_logger.Debug("SessionPool::CleanExpiredSessions"); + lock (s_sessionPoolLock) { - lock (s_sessionPoolLock) + long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + + foreach (var item in _sessionPool.ToList()) { - if(s_instance == null) + if (item.IsExpired(_timeout, timeNow)) { - s_instance = new SessionPoolSingleton(); + _sessionPool.Remove(item); + item.close(); } - return s_instance; } } } - private void CleanExpiredSessions() + internal SFSession GetSession(string connectionString, SecureString password) + { + if (!_pooling) + return OpenNewSession(connectionString, password); + CleanExpiredSessions(); + // EnsureMinPoolSizeAsync(); + return ProvidePooledSession(connectionString, password); + } + + internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) { - s_logger.Debug("SessionPool::cleanExpiredSessions"); + if (!_pooling) + return OpenNewSessionAsync(connectionString, password, cancellationToken); + CleanExpiredSessions(); + // EnsureMinPoolSizeAsync(); + return ProvidePooledSessionAsync(connectionString, password, cancellationToken); + } + + private SFSession ProvidePooledSession(string connectionString, SecureString password) + { + if (!_pooling) + return null; + lock (s_sessionPoolLock) { - long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (GetIdleSessionsSize() > 0) + { + var sessionFromPool = PickSession(connectionString); // oldest idle // TODO: + if (sessionFromPool != null) + { + _sessionPool.Remove(sessionFromPool); + _busySessions++; + s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}"); + return sessionFromPool; + } + } - foreach (var item in _sessionPool.ToList()) + if (GetCurrentPoolSize() < MaxPoolSize) { - if (item.IsExpired(_timeout, timeNow)) + var newSession = OpenNewSession(connectionString, password); + _busySessions++; + s_logger.Info($"Created new pooled session with sid {newSession.sessionId}"); + return newSession; + } + s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections"); + } + + if (_allowExceedMaxPoolSize) // backward compatibility + { + s_logger.Warn($"Exceeding Max Pool Size enabled, providing new session"); + return OpenNewSession(connectionString, password); + } + + return WaitForPooledSession(CancellationToken.None); + } + + private Task ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + if (!_pooling) + return null; + + lock (s_sessionPoolLock) + { + if (GetIdleSessionsSize() > 0) + { + var sessionFromPool = PickSession(connectionString); + if (sessionFromPool != null) { - _sessionPool.Remove(item); - item.close(); + _sessionPool.Remove(sessionFromPool); + _busySessions++; + s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}"); + return Task.FromResult(sessionFromPool); } } + + if (GetCurrentPoolSize() < MaxPoolSize) + { + var session = OpenNewSessionAsync(connectionString, password, cancellationToken); + _busySessions++; + s_logger.Info($"Creating new pooled session"); + return session; + } + + s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections"); } + + if (_allowExceedMaxPoolSize) // backward compatibility + { + s_logger.Warn($"Exceeding Max Pool Size enabled, providing new session"); + return OpenNewSessionAsync(connectionString, password, cancellationToken); + } + + return Task.Run(()=>WaitForPooledSession(cancellationToken)); } - internal SFSession GetSession(string connStr) + private SFSession WaitForPooledSession(CancellationToken cancellationToken) { - s_logger.Debug("SessionPool::GetSession"); if (!_pooling) return null; - lock (s_sessionPoolLock) + + SFSession session = null; + long start = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + + while (session == null) { - for (int i = 0; i < _sessionPool.Count; i++) + if (cancellationToken.IsCancellationRequested) + cancellationToken.ThrowIfCancellationRequested(); + + lock (s_sessionPoolLock) { - if (_sessionPool[i].connStr.Equals(connStr)) + session = PickSession(_connectionString); + if (session != null) { - SFSession session = _sessionPool[i]; - _sessionPool.RemoveAt(i); - long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); - if (session.IsExpired(_timeout, timeNow)) - { - session.close(); - i--; - } - else - { - s_logger.Debug($"reuse pooled session with sid {session.sessionId}"); - return session; - } + _sessionPool.Remove(session); + _busySessions++; + return session; } } + + Thread.Sleep(_openSessionSleepTimeout); + long now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (start + _openSessionTimeout < now) + throw new SnowflakeDbException( + new TimeoutException("No free connections in the pool."), // TODO: + SnowflakeDbException.CONNECTION_FAILURE_SSTATE, + SFError.INTERNAL_ERROR, + "Unable to connect."); + } + + return session; + } + + // TODO: inject + SFSession PickSession(string connectionString) + { + SFSession session = null; + lock (s_sessionPoolLock) + { + switch (_sessionPick) + { + case SessionPickAlgorithm.MatchConnectionString: + session = _sessionPool.FirstOrDefault(it => it.connStr.Equals(connectionString)); + break; + case SessionPickAlgorithm.PickOldest: + session = _sessionPool.Any() ? _sessionPool[0] : null; + break; + } + if (session != null) + _sessionPool.Remove(session); + } + + return session; + } + + // TODO: + // private void EnsureMinPoolSize() + // { + // if (!_pooling || _minPoolSize == 0) + // return; + // + // lock (s_sessionPoolLock) + // { + // s_logger.Debug($"Filling up connection pool to {_minPoolSize}"); + // + // while (GetCurrentPoolSize() < _minPoolSize) + // { + // var newSession = OpenNewSession(); + // _sessionPool.Add(newSession); + // } + // } + // } + + internal SFSession OpenNewSession(string connectionString, SecureString password) + { + SFSession session = new SFSession(connectionString, password); + try + { + session.Open(); + s_logger.Debug($"session opened {session.sessionId}"); + } + catch (Exception e) + { + // Otherwise when Dispose() is called, the close request would timeout. + // _connectionState = ConnectionState.Closed; // TODO: + s_logger.Error("Unable to connect", e); + if (!(e is SnowflakeDbException)) + { + throw + new SnowflakeDbException( + e, + SnowflakeDbException.CONNECTION_FAILURE_SSTATE, + SFError.INTERNAL_ERROR, + "Unable to connect. " + e.Message); + } + else + { + throw; + } } - return null; + return session; } + + internal Task OpenNewSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + SFSession session = new SFSession(connectionString, password); + return session + .OpenAsync(cancellationToken) + .ContinueWith(previousTask => + { + if (previousTask.IsFaulted) + { + Debug.Assert(previousTask.Exception != null, "previousTask.Exception != null"); + throw previousTask.Exception; + } + + if (cancellationToken.IsCancellationRequested) + { + cancellationToken.ThrowIfCancellationRequested(); + } + + return session; + }, cancellationToken); + } + internal bool AddSession(SFSession session) { s_logger.Debug("SessionPool::AddSession"); @@ -106,23 +313,34 @@ internal bool AddSession(SFSession session) return false; long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); if (session.IsNotOpen() || session.IsExpired(_timeout, timeNow)) - return false; + { + s_logger.Warn($"Session returning to the pool in an undesired state: {session.sessionId}"); + // TODO: fix because it is counted in the pool + // TODO: lock + if (_busySessions > 0) + _busySessions--; + return false; + } + if (_sessionPool.Count >= _maxPoolSize) + CleanExpiredSessions(); + lock (s_sessionPoolLock) { if (_sessionPool.Count >= _maxPoolSize) { - CleanExpiredSessions(); - } - if (_sessionPool.Count >= _maxPoolSize) - { - // pool is full + s_logger.Warn($"Pool is full, cannot add session with sid {session.sessionId}"); return false; } - s_logger.Debug($"pool connection with sid {session.sessionId}"); + if (_busySessions > 0) + _busySessions--; + s_logger.Debug($"Connection returned to the pool with sid {session.sessionId}"); _sessionPool.Add(session); return true; + + // s_logger.Warn($"Unexpected session with sid {session.sessionId} was not returned to the pool"); // or clear pool was called and session was created before + return false; } } @@ -136,6 +354,7 @@ internal void ClearAllPools() session.close(); } _sessionPool.Clear(); + _busySessions = 0; // TODO: check test TestConnectionPoolIsFull } } @@ -161,7 +380,13 @@ public long GetTimeout() public int GetCurrentPoolSize() { - return _sessionPool.Count; + switch (SnowflakeDbConnectionPool.GetVersion()) + { + case PoolManagerVersion.Version1: return _sessionPool.Count; + case PoolManagerVersion.Version2: return _sessionPool.Count + _busySessions; + } + throw new NotSupportedException("Unknown pool version"); + } public bool SetPooling(bool isEnable) @@ -180,6 +405,26 @@ public bool GetPooling() { return _pooling; } + + private int GetIdleSessionsSize() + { + return _sessionPool.Count; + } + + public bool GetAllowExceedMaxPoolSize() + { + return _allowExceedMaxPoolSize; + } + + public void SetAllowExceedMaxPoolSize(bool allowExceedMaxPoolSize) + { + _allowExceedMaxPoolSize = allowExceedMaxPoolSize; + } + + public void SetSessionPickAlgorithm(SessionPickAlgorithm sessionPicking) + { + _sessionPick = sessionPicking; + } } } \ No newline at end of file