diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs index 45148f066..32d9e035d 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs @@ -23,6 +23,7 @@ class PoolConfig { public PoolConfig() { + var connectionPoolManagerBase = SnowflakeDbConnectionPool.Instance; // TODO: check why necessary _maxPoolSize = SnowflakeDbConnectionPool.GetMaxPoolSize(); _timeout = SnowflakeDbConnectionPool.GetTimeout(); _pooling = SnowflakeDbConnectionPool.GetPooling(); diff --git a/Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs b/Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs index aa342fe68..6a4996a81 100644 --- a/Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs +++ b/Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs @@ -49,7 +49,7 @@ public override void Open() public override Task OpenAsync(CancellationToken cancellationToken) { - registerConnectionCancellationCallback(cancellationToken); + RegisterConnectionCancellationCallback(cancellationToken); SetMockSession(); diff --git a/Snowflake.Data/Client/SnowflakeDbConnection.cs b/Snowflake.Data/Client/SnowflakeDbConnection.cs index 615bcc879..2737f2838 100755 --- a/Snowflake.Data/Client/SnowflakeDbConnection.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnection.cs @@ -26,7 +26,7 @@ public class SnowflakeDbConnection : DbConnection internal int _connectionTimeout; - private bool disposed = false; + private bool _disposed = false; private static Mutex _arraybindingMutex = new Mutex(); @@ -151,9 +151,9 @@ public override void Close() logger.Debug("Close Connection."); if (IsNonClosedWithSession()) { - var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; + var transactionRollbackStatus = GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; - if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession)) + if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(ConnectionString, Password, SfSession)) { logger.Debug($"Session pooled: {SfSession.sessionId}"); } @@ -172,7 +172,7 @@ public override void Close() // Adding an override for CloseAsync will prevent the need for casting to SnowflakeDbConnection to call CloseAsync(CancellationToken). public override async Task CloseAsync() { - await CloseAsync(CancellationToken.None); + await CloseAsync(CancellationToken.None).ConfigureAwait(false); } #endif @@ -189,9 +189,9 @@ public Task CloseAsync(CancellationToken cancellationToken) { if (IsNonClosedWithSession()) { - var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; + var transactionRollbackStatus = GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined; - if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession)) + if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(ConnectionString, Password, SfSession)) { logger.Debug($"Session pooled: {SfSession.sessionId}"); _connectionState = ConnectionState.Closed; @@ -234,51 +234,34 @@ public Task CloseAsync(CancellationToken cancellationToken) private bool CanReuseSession(TransactionRollbackStatus transactionRollbackStatus) { - return SnowflakeDbConnectionPool.GetPooling() && + return GetPooling() && transactionRollbackStatus == TransactionRollbackStatus.Success; } + private bool GetPooling() + { + return SnowflakeDbConnectionPool.GetPool(ConnectionString, Password).GetPooling(); + } + public override void Open() { logger.Debug("Open Connection."); if (_connectionState != ConnectionState.Closed) { - logger.Debug($"Open with a connection already opened: {_connectionState}"); + logger.Warn($"Opening a connection already opened: {_connectionState}"); return; } - SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString); - if (SfSession != null) + + try { - logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); + OnSessionOpen(); + SfSession = SnowflakeDbConnectionPool.GetSession(ConnectionString, Password); + OnSessionEstablished(); } - else + catch (Exception e) { - SetSession(); - try - { - SfSession.Open(); - } - catch (Exception e) - { - // Otherwise when Dispose() is called, the close request would timeout. - _connectionState = ConnectionState.Closed; - logger.Error("Unable to connect", e); - if (!(e.GetType() == typeof(SnowflakeDbException))) - { - throw - new SnowflakeDbException( - e, - SnowflakeDbException.CONNECTION_FAILURE_SSTATE, - SFError.INTERNAL_ERROR, - "Unable to connect. " + e.Message); - } - else - { - throw; - } - } + RethrowOnSessionOpenFailure(e); } - OnSessionEstablished(); } public override Task OpenAsync(CancellationToken cancellationToken) @@ -286,34 +269,18 @@ public override Task OpenAsync(CancellationToken cancellationToken) logger.Debug("Open Connection Async."); if (_connectionState != ConnectionState.Closed) { - logger.Debug($"Open with a connection already opened: {_connectionState}"); - return Task.CompletedTask; - } - SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString); - if (SfSession != null) - { - logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); - OnSessionEstablished(); + logger.Warn($"Opening a connection already opened: {_connectionState}"); return Task.CompletedTask; } - registerConnectionCancellationCallback(cancellationToken); - SetSession(); - - return SfSession.OpenAsync(cancellationToken).ContinueWith( - previousTask => + OnSessionOpen(); + return SnowflakeDbConnectionPool.GetSessionAsync(ConnectionString, Password, cancellationToken) + .ContinueWith(previousTask => { if (previousTask.IsFaulted) { // Exception from SfSession.OpenAsync - Exception sfSessionEx = previousTask.Exception; - _connectionState = ConnectionState.Closed; - logger.Error("Unable to connect", sfSessionEx); - throw new SnowflakeDbException( - sfSessionEx, - SnowflakeDbException.CONNECTION_FAILURE_SSTATE, - SFError.INTERNAL_ERROR, - "Unable to connect"); + RethrowOnSessionOpenFailure(previousTask.Exception); } else if (previousTask.IsCanceled) { @@ -322,7 +289,7 @@ public override Task OpenAsync(CancellationToken cancellationToken) } else { - logger.Debug("All good"); + logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); // Only continue if the session was opened successfully OnSessionEstablished(); } @@ -344,23 +311,42 @@ public void SetArrayBindStageCreated() { _isArrayBindStageCreated = true; } - - /// - /// Create a new SFsession with the connection string settings. - /// - /// If the connection string can't be processed - private void SetSession() + + private void OnSessionOpen() { - SfSession = new SFSession(ConnectionString, Password); - _connectionTimeout = (int)SfSession.connectionTimeout.TotalSeconds; + logger.Debug("Opening session"); _connectionState = ConnectionState.Connecting; } private void OnSessionEstablished() { + if (SfSession == null) + { + logger.Error("Error during opening session"); + throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unable to establish a session"); + } + logger.Debug("Session established"); _connectionState = ConnectionState.Open; + _connectionTimeout = (int)SfSession.connectionTimeout.TotalSeconds; + logger.Debug($"Connection open with pooled session: {SfSession.sessionId}"); } + private void RethrowOnSessionOpenFailure(Exception exception) + { + // Otherwise when Dispose() is called, the close request would timeout. + _connectionState = ConnectionState.Closed; + logger.Error("Unable to connect: ", exception); + if (exception != null && exception is SnowflakeDbException dbException) + throw dbException; + + var errorMessage = "Unable to connect. " + (exception != null ? exception.Message : ""); + throw new SnowflakeDbException( + exception, + SnowflakeDbException.CONNECTION_FAILURE_SSTATE, + SFError.INTERNAL_ERROR, + errorMessage); + } + protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) { // Parameterless BeginTransaction() method of the super class calls this method with IsolationLevel.Unspecified, @@ -382,12 +368,12 @@ protected override DbCommand CreateDbCommand() protected override void Dispose(bool disposing) { - if (disposed) + if (_disposed) return; try { - this.Close(); + Close(); } catch (Exception ex) { @@ -395,7 +381,7 @@ protected override void Dispose(bool disposing) logger.Error("Unable to close connection", ex); } - disposed = true; + _disposed = true; base.Dispose(disposing); } @@ -406,7 +392,7 @@ protected override void Dispose(bool disposing) /// layer or timeout reached. Whichever comes first would trigger query cancellation. /// /// cancellation token from upper layer - internal void registerConnectionCancellationCallback(CancellationToken externalCancellationToken) + internal void RegisterConnectionCancellationCallback(CancellationToken externalCancellationToken) { if (!externalCancellationToken.IsCancellationRequested) { diff --git a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs index 8e0093262..11241c7c2 100644 --- a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs @@ -1,4 +1,8 @@ -using Snowflake.Data.Core; +using System; +using System.Security; +using System.Threading; +using System.Threading.Tasks; +using Snowflake.Data.Core; using Snowflake.Data.Core.Session; using Snowflake.Data.Log; @@ -7,58 +11,104 @@ namespace Snowflake.Data.Client public class SnowflakeDbConnectionPool { private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); - - internal static SFSession GetSession(string connStr) + private static readonly Object s_instanceLock = new Object(); + private static ConnectionPoolManagerBase s_connectionPoolManager; + private static readonly PoolManagerVersion s_poolVersion = PoolManagerVersion.Version1; + + public static ConnectionPoolManagerBase Instance { - s_logger.Debug("SnowflakeDbConnectionPool::GetSession"); - return SessionPoolSingleton.Instance.GetSession(connStr); + get + { + if (s_connectionPoolManager != null) + return s_connectionPoolManager; + lock (s_instanceLock) + { + s_connectionPoolManager = ProvideConnectionPoolManager(); + } + return s_connectionPoolManager; + } } - - internal static bool AddSession(SFSession session) + + public static SessionPool GetPool(string connectionString, SecureString password) { - s_logger.Debug("SnowflakeDbConnectionPool::AddSession"); - return SessionPoolSingleton.Instance.AddSession(session); + s_logger.Debug("SnowflakeDbConnectionPool::GetSession"); + return Instance.GetPool(connectionString, password); } public static void ClearAllPools() { s_logger.Debug("SnowflakeDbConnectionPool::ClearAllPools"); - SessionPoolSingleton.Instance.ClearAllPools(); + Instance.ClearAllPools(); } public static void SetMaxPoolSize(int size) { - SessionPoolSingleton.Instance.SetMaxPoolSize(size); + s_logger.Debug("SnowflakeDbConnectionPool::SetMaxPoolSize"); + Instance.SetMaxPoolSize(size); } public static int GetMaxPoolSize() { - return SessionPoolSingleton.Instance.GetMaxPoolSize(); + s_logger.Debug("SnowflakeDbConnectionPool::GetMaxPoolSize"); + return Instance.GetMaxPoolSize(); } public static void SetTimeout(long time) { - SessionPoolSingleton.Instance.SetTimeout(time); + s_logger.Debug("SnowflakeDbConnectionPool::SetTimeout"); + Instance.SetTimeout(time); } public static long GetTimeout() { - return SessionPoolSingleton.Instance.GetTimeout(); + s_logger.Debug("SnowflakeDbConnectionPool::GetTimeout"); + return Instance.GetTimeout(); } public static int GetCurrentPoolSize() { - return SessionPoolSingleton.Instance.GetCurrentPoolSize(); + s_logger.Debug("SnowflakeDbConnectionPool::GetCurrentPoolSize"); + return Instance.GetCurrentPoolSize(); } public static bool SetPooling(bool isEnable) { - return SessionPoolSingleton.Instance.SetPooling(isEnable); + s_logger.Debug("SnowflakeDbConnectionPool::SetPooling"); + return Instance.SetPooling(isEnable); } public static bool GetPooling() { - return SessionPoolSingleton.Instance.GetPooling(); + s_logger.Debug("SnowflakeDbConnectionPool::GetPooling"); + return Instance.GetPooling(); + } + + private static ConnectionPoolManagerBase ProvideConnectionPoolManager() + { + if (s_poolVersion == PoolManagerVersion.Version1) + return new ConnectionPoolManagerV1(); + + throw new NotSupportedException("Pool version not supported"); + } + + internal static SFSession GetSession(string connectionString, SecureString password) + { + s_logger.Debug("SnowflakeDbConnectionPool::GetSession"); + return Instance.GetSession(connectionString, password); + } + + internal static Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + s_logger.Debug("SnowflakeDbConnectionPool::GetSessionAsync"); + return Instance.GetSessionAsync(connectionString, password, cancellationToken); } + + internal static bool AddSession(string connectionString, SecureString password, SFSession session) + { + s_logger.Debug("SnowflakeDbConnectionPool::AddSession"); + return Instance.AddSession(connectionString, password, session); + } + + } } diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs b/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs new file mode 100644 index 000000000..741153c51 --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security; +using System.Threading; +using System.Threading.Tasks; + +namespace Snowflake.Data.Core.Session +{ + public enum PoolManagerVersion + { + Version1, + Version2 + } + + public abstract class ConnectionPoolManagerBase + { + protected static readonly Object PoolsLock = new Object(); + protected readonly Dictionary Pools; + protected ConnectionPoolManagerBase() + { + lock (PoolsLock) + { + Pools = new Dictionary(); + } + } + + protected abstract PoolManagerVersion GetVersion(); + + protected abstract void ApplyPoolDefaults(SessionPool pool); + + protected virtual string GetPoolKey(string connectionString) => connectionString; + + public void ClearAllPools() + { + Pools.Values.ToList().ForEach(pool => pool.ClearAllPools()); + } + + public virtual SessionPool GetPool(string connectionString, SecureString password) + { + string poolKey = GetPoolKey(connectionString); + if (Pools.TryGetValue(poolKey, out var pool)) + return pool; + + pool = CreateSessionPool(connectionString, password); + ApplyPoolDefaults(pool); + Pools.Add(poolKey, pool); + return pool; + } + + public virtual void SetMaxPoolSize(int size) => throw FeatureNotAvailableForPoolVersion(); + public virtual int GetMaxPoolSize() => throw FeatureNotAvailableForPoolVersion(); + public virtual void SetTimeout(long time) => throw FeatureNotAvailableForPoolVersion(); + public virtual long GetTimeout() => throw FeatureNotAvailableForPoolVersion(); + public virtual int GetCurrentPoolSize() => throw FeatureNotAvailableForPoolVersion(); + public virtual bool SetPooling(bool isEnable) => throw FeatureNotAvailableForPoolVersion(); + public virtual bool GetPooling() => throw FeatureNotAvailableForPoolVersion(); + + internal SFSession GetSession(string connectionString, SecureString password) + { + // TODO: + // pool ver.1 is the same for any connection string so there's still need to pass params to GetSession + // pool ver.2 is different for each connection strings so in theory no need to pass params to GetSession + var sessionPool = GetPool(connectionString, password); + return sessionPool.GetSession(connectionString, password); + } + + internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + return GetPool(connectionString, password).GetSessionAsync(connectionString, password, cancellationToken); + } + + internal bool AddSession(string connectionString, SecureString password, SFSession session) + { + return GetPool(connectionString, password).AddSession(session); + } + + private SessionPool CreateSessionPool(string connectionString, SecureString password) => new SessionPool(connectionString, password); + + private NotSupportedException FeatureNotAvailableForPoolVersion() + { + return new NotSupportedException("API not available for selected connection pool version selected: " + + GetVersion()); + } + } +} \ No newline at end of file diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManagerV1.cs b/Snowflake.Data/Core/Session/ConnectionPoolManagerV1.cs new file mode 100644 index 000000000..51df93914 --- /dev/null +++ b/Snowflake.Data/Core/Session/ConnectionPoolManagerV1.cs @@ -0,0 +1,35 @@ +using System.Linq; +using System.Security; + +namespace Snowflake.Data.Core.Session +{ + sealed class ConnectionPoolManagerV1 : ConnectionPoolManagerBase + { + private const bool AllowExceedMaxPoolSizeDefault = true; + private const int MaxPoolSizeDefault = 10; + private const string SinglePoolKeyForAllDataSources = "CONNECTION_CACHE"; + + protected override void ApplyPoolDefaults(SessionPool pool) + { + pool.SetAllowExceedMaxPoolSize(AllowExceedMaxPoolSizeDefault); + pool.SetMaxPoolSize(MaxPoolSizeDefault); + } + protected override PoolManagerVersion GetVersion() => PoolManagerVersion.Version1; + // Same pool for any connection string (backward compatible solution) + protected override string GetPoolKey(string connectionString) => SinglePoolKeyForAllDataSources; + public override void SetMaxPoolSize(int size) => GetPool().SetMaxPoolSize(size); + public override int GetMaxPoolSize() => GetPool().GetMaxPoolSize(); + public override void SetTimeout(long time) => Pools.Values.First().SetTimeout(time); + public override long GetTimeout() => Pools.Values.First().GetTimeout(); + public override int GetCurrentPoolSize() => Pools.Values.First().GetCurrentPoolSize(); + public override bool GetPooling() => Pools.Values.First().GetPooling(); + public override bool SetPooling(bool isEnable) + { + if (GetPooling() == isEnable) + return false; + Pools.Values.First().SetPooling(isEnable); + return isEnable; + } + private SessionPool GetPool() => GetPool(null, null); + } +} \ No newline at end of file diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index 092135e47..6fe2fa757 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -1,33 +1,51 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Security; +using System.Threading; +using System.Threading.Tasks; +using Snowflake.Data.Client; using Snowflake.Data.Log; namespace Snowflake.Data.Core.Session { - sealed class SessionPoolSingleton : IDisposable + public class SessionPool : IDisposable { - private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); - private static SessionPoolSingleton s_instance = null; + private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); private static readonly object s_sessionPoolLock = new object(); + // private const int MinPoolSize = 0; // TODO: + private const int MaxPoolSize = 10; + private const long Timeout = 3600; + private const long OpenSessionTimeout = 60 * 2; + private readonly List _sessionPool; + private int _busySessions; + // private int _minPoolSize; // TODO: private int _maxPoolSize; private long _timeout; - private const int MaxPoolSize = 10; - private const long Timeout = 3600; - private bool _pooling = true; + private long _openSessionTimeout; + private int _openSessionSleepTimeout = 250; + private bool _pooling = true; // TODO: Get rid of that! + private string _connectionString; + private SecureString _password; + private bool _allowExceedMaxPoolSize; // backward compatibility flag - SessionPoolSingleton() + internal SessionPool(string connectionString, SecureString password) { lock (s_sessionPoolLock) { _sessionPool = new List(); _maxPoolSize = MaxPoolSize; _timeout = Timeout; + // _minPoolSize = MinPoolSize; // TODO: + _openSessionTimeout = OpenSessionTimeout; + _connectionString = connectionString; + _password = password; } } - ~SessionPoolSingleton() + + ~SessionPool() { ClearAllPools(); } @@ -36,69 +54,266 @@ public void Dispose() { ClearAllPools(); } - - public static SessionPoolSingleton Instance + + private void CleanExpiredSessions() { - get + s_logger.Debug("SessionPool::CleanExpiredSessions"); + lock (s_sessionPoolLock) { - lock (s_sessionPoolLock) + long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + + foreach (var item in _sessionPool.ToList()) { - if(s_instance == null) + if (item.IsExpired(_timeout, timeNow)) { - s_instance = new SessionPoolSingleton(); + _sessionPool.Remove(item); + item.close(); } - return s_instance; } } } - private void CleanExpiredSessions() + internal SFSession GetSession(string connectionString, SecureString password) { - s_logger.Debug("SessionPool::cleanExpiredSessions"); - lock (s_sessionPoolLock) + if (!_pooling) + return OpenNewSession(connectionString, password); + CleanExpiredSessions(); + // AsyncEnsureMinPoolSize(); + return ProvidePooledSession(connectionString, password); + } + + internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + if (!_pooling) + return OpenNewSessionAsync(connectionString, password, cancellationToken); + CleanExpiredSessions(); + // AsyncEnsureMinPoolSizeAsync(); + return ProvidePooledSessionAsync(connectionString, password, cancellationToken); + } + + /* + /// + /// Creates and opens a new SFSession with the connection string settings of the pool. + /// + /// If the session cannot be established + internal SFSession GetSession(string connectionString, SecureString password) + { + s_logger.Debug("SessionPool::GetSession"); + SFSession session = null; + if (!_pooling) { - long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + session = OpenNewSession(connectionString, password); + s_logger.Info($"create non-poolable session with sid {session.sessionId}"); + return session; + } + + CleanExpiredSessions(); + // EnsureMinPoolSizeAsync(); // TODO: + if (session == null && !_allowExceedMaxPoolSize) + session = WaitForPooledSession(CancellationToken.None); + return session; + } - foreach (var item in _sessionPool.ToList()) + /// + /// Creates a request to open a new SFSession with the connection string settings of the pool. + /// + /// If the connection string can't be processed + internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + s_logger.Debug("SessionPool::GetSessionAsync"); + var session = GetSessionInternal(connectionString, password); + if (session == null && !_allowExceedMaxPoolSize) + session = WaitForPooledSessionAsync(cancellationToken); + return session; + } + + internal SFSession GetSessionInternal(string connectionString, SecureString password) + { + if (!_pooling) + { + var session = OpenNewSession(connectionString, password); + s_logger.Info($"create non-poolable session with sid {session.sessionId}"); + return session; + } + + CleanExpiredSessions(); + // EnsureMinPoolSizeAsync(); // TODO: + return ProvidePooledSession(connectionString, password); + } +*/ + private SFSession ProvidePooledSession(string connectionString, SecureString password) + { + if (!_pooling) + return null; + + lock (s_sessionPoolLock) + { + if (GetCurrentPoolSize() < _maxPoolSize) { - if (item.IsExpired(_timeout, timeNow)) + if (GetIdleSessionsSize() == 0) { - _sessionPool.Remove(item); - item.close(); + var newSession = OpenNewSession(connectionString, password); + _busySessions++; + s_logger.Info($"Created new pooled session with sid {newSession.sessionId}"); + return newSession; } + + var sessionFromPool = _sessionPool[0]; // oldest idle + _sessionPool.Remove(sessionFromPool); + _busySessions++; + s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}"); + return sessionFromPool; } + + s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections"); + } + + if (_allowExceedMaxPoolSize) // backward compatibility + { + s_logger.Warn($"Exceeding Max Pool Size enabled, providing new session"); + return OpenNewSession(connectionString, password); } + + return WaitForPooledSession(CancellationToken.None); } - internal SFSession GetSession(string connStr) + + private Task ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) { - s_logger.Debug("SessionPool::GetSession"); if (!_pooling) return null; + lock (s_sessionPoolLock) { - for (int i = 0; i < _sessionPool.Count; i++) + if (GetCurrentPoolSize() < _maxPoolSize) { - if (_sessionPool[i].connStr.Equals(connStr)) + if (GetIdleSessionsSize() == 0) { - SFSession session = _sessionPool[i]; - _sessionPool.RemoveAt(i); - long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); - if (session.IsExpired(_timeout, timeNow)) - { - session.close(); - i--; - } - else - { - s_logger.Debug($"reuse pooled session with sid {session.sessionId}"); - return session; - } + var session = OpenNewSessionAsync(connectionString, password, cancellationToken); + _busySessions++; + s_logger.Info($"Creating new pooled session"); + return session; } + + var sessionFromPool = _sessionPool[0]; // oldest idle + _sessionPool.Remove(sessionFromPool); + _busySessions++; + s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}"); + return Task.FromResult(sessionFromPool); } + + s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections"); } - return null; + + if (_allowExceedMaxPoolSize) // backward compatibility + { + s_logger.Warn($"Exceeding Max Pool Size enabled, providing new session"); + return OpenNewSessionAsync(connectionString, password, cancellationToken); + } + + return Task.Run(()=>WaitForPooledSession(cancellationToken)); } + + private SFSession WaitForPooledSession(CancellationToken cancellationToken) + { + if (!_pooling) + return null; + + SFSession session = null; + long start = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + + while (session == null) + { + if (cancellationToken.IsCancellationRequested) + cancellationToken.ThrowIfCancellationRequested(); + + if (GetIdleSessionsSize() > 0) + { + lock (s_sessionPoolLock) + { + session = _sessionPool[0]; + _sessionPool.Remove(session); + _busySessions++; + return session; + } + } + + Thread.Sleep(_openSessionSleepTimeout); + + long now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (start + _openSessionTimeout < now) + throw new SnowflakeDbException( + new TimeoutException("No free connections in the pool."), // TODO: + SnowflakeDbException.CONNECTION_FAILURE_SSTATE, + SFError.INTERNAL_ERROR, + "Unable to connect."); + } + + return session; + } + + // TODO: + // private void EnsureMinPoolSize() + // { + // if (!_pooling || _minPoolSize == 0) + // return; + // + // lock (s_sessionPoolLock) + // { + // s_logger.Debug($"Filling up connection pool to {_minPoolSize}"); + // + // while (GetCurrentPoolSize() < _minPoolSize) + // { + // var newSession = OpenNewSession(); + // _sessionPool.Add(newSession); + // } + // } + // } + + internal SFSession OpenNewSession(string connectionString, SecureString password) + { + SFSession session = new SFSession(connectionString, password); + try + { + session.Open(); + s_logger.Debug($"session opened {session.sessionId}"); + } + catch (Exception e) + { + // Otherwise when Dispose() is called, the close request would timeout. + // _connectionState = ConnectionState.Closed; // TODO: + s_logger.Error("Unable to connect", e); + if (!(e is SnowflakeDbException)) + { + throw + new SnowflakeDbException( + e, + SnowflakeDbException.CONNECTION_FAILURE_SSTATE, + SFError.INTERNAL_ERROR, + "Unable to connect. " + e.Message); + } + else + { + throw; + } + } + return session; + } + + internal Task OpenNewSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken) + { + SFSession session = new SFSession(connectionString, password); + try + { + return session.OpenAsync(cancellationToken); + } + catch (Exception e) + { + s_logger.Error("Unable to connect", e); + return Task.FromException(e); + } + } + internal bool AddSession(SFSession session) { s_logger.Debug("SessionPool::AddSession"); @@ -106,7 +321,7 @@ internal bool AddSession(SFSession session) return false; long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); if (session.IsNotOpen() || session.IsExpired(_timeout, timeNow)) - return false; + return false; // TODO: fix because it is counted in the pool lock (s_sessionPoolLock) { @@ -161,7 +376,7 @@ public long GetTimeout() public int GetCurrentPoolSize() { - return _sessionPool.Count; + return _sessionPool.Count + _busySessions; } public bool SetPooling(bool isEnable) @@ -180,6 +395,21 @@ public bool GetPooling() { return _pooling; } + + private int GetIdleSessionsSize() + { + return _sessionPool.Count; + } + + public bool GetAllowExceedMaxPoolSize() + { + return _allowExceedMaxPoolSize; + } + + public void SetAllowExceedMaxPoolSize(bool allowExceedMaxPoolSize) + { + _allowExceedMaxPoolSize = allowExceedMaxPoolSize; + } } } \ No newline at end of file