From a12dadc9eecd3e77fc68e8e903898a57b6eb6841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Hofman?= Date: Wed, 11 Oct 2023 17:24:09 +0200 Subject: [PATCH] SNOW-937190 max pool size reached with wait support and timeout exception --- Snowflake.Data/Core/Session/SessionPool.cs | 50 +++++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index 8fad6f53f..d9a79bd79 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -28,7 +28,9 @@ sealed class SessionPool : IDisposable internal SecureString Password { get; } private bool _pooling = true; private int _busySessions; - private bool _allowExceedMaxPoolSize = true; + private bool _allowExceedMaxPoolSize = true; // differentiates behavior when max size reached + private int _waitForIdleSessionSleepTimeout = 5; + private int _waitForIdleSessionTimeout = 120; private SessionPool() { @@ -45,7 +47,7 @@ private SessionPool(string connectionString, SecureString password) : this() { ConnectionString = connectionString; Password = password; - _allowExceedMaxPoolSize = false; // TODO: SNOW-937190 + _allowExceedMaxPoolSize = false; } internal static SessionPool CreateSessionCache() => new SessionPool(); @@ -142,6 +144,11 @@ private SFSession GetIdleSession(string connStr) private SFSession NewSession(String connectionString, SecureString password) { s_logger.Debug("SessionPool::NewSession"); + if (!_allowExceedMaxPoolSize && GetCurrentPoolSize() >= MaxPoolSize) + { + s_logger.Warn("Max pool size reached"); + return WaitForPooledSession(CancellationToken.None); + } try { var session = s_sessionFactory.NewSession(connectionString, password); @@ -165,6 +172,11 @@ private SFSession NewSession(String connectionString, SecureString password) private Task NewSessionAsync(String connectionString, SecureString password, CancellationToken cancellationToken) { s_logger.Debug("SessionPool::NewSessionAsync"); + if (!_allowExceedMaxPoolSize && GetCurrentPoolSize() >= MaxPoolSize) + { + s_logger.Warn("Max pool size reached"); + return Task.Run(() => WaitForPooledSession(cancellationToken), cancellationToken); + } var session = s_sessionFactory.NewSession(connectionString, password); _busySessions++; return session @@ -183,12 +195,46 @@ private Task NewSessionAsync(String connectionString, SecureString pa return session; }, TaskContinuationOptions.NotOnCanceled); } + + private SFSession WaitForPooledSession(CancellationToken cancellationToken) + { + if (!_pooling) + return null; + + long start = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + while (true) + { + if (cancellationToken.IsCancellationRequested) + cancellationToken.ThrowIfCancellationRequested(); + + lock (s_sessionPoolLock) + { + var session = GetIdleSession(_connectionString); + if (session != null) + { + _idleSessions.Remove(session); + _busySessions++; + return session; + } + } + + Thread.Sleep(_waitForIdleSessionSleepTimeout); + long now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (start + _waitForIdleSessionTimeout < now) + throw new SnowflakeDbException( + new TimeoutException("No free connections in the pool."), // TODO: + SnowflakeDbException.CONNECTION_FAILURE_SSTATE, + SFError.REQUEST_TIMEOUT, + "Unable to connect."); + } + } internal bool AddSession(SFSession session) { s_logger.Debug("SessionPool::AddSession"); if (!_pooling) return false; + long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); if (session.IsNotOpen() || session.IsExpired(_timeout, timeNow)) {