From 59efbbf4ecfdfb5fa6d49a2830cbc60832d9d638 Mon Sep 17 00:00:00 2001 From: Krzysztof Nozderko Date: Mon, 20 May 2024 14:57:37 +0000 Subject: [PATCH 1/2] SNOW-968914 Break waiting on pool destroy --- .../UnitTests/Session/FixedZeroCounterTest.cs | 23 ++++++++-- .../Session/NonNegativeCounterTest.cs | 15 ++++++ .../UnitTests/Session/NonWaitingQueueTest.cs | 22 ++++++--- .../SessionCreationTokenCounterTest.cs | 37 ++++++++++----- .../UnitTests/Session/WaitingQueueTest.cs | 39 ++++++++++++---- .../Core/Session/ConnectionPoolManager.cs | 2 +- .../Session/ISessionCreationTokenCounter.cs | 2 + Snowflake.Data/Core/Session/IWaitingQueue.cs | 2 + .../NonCountingSessionCreationTokenCounter.cs | 8 +++- .../Core/Session/NonWaitingQueue.cs | 4 ++ Snowflake.Data/Core/Session/SFSession.cs | 4 +- .../Session/SessionCreationTokenCounter.cs | 13 ++++++ Snowflake.Data/Core/Session/SessionPool.cs | 42 ++++++++++++++--- Snowflake.Data/Core/Session/WaitingQueue.cs | 46 ++++++++++++++++++- 14 files changed, 216 insertions(+), 43 deletions(-) diff --git a/Snowflake.Data.Tests/UnitTests/Session/FixedZeroCounterTest.cs b/Snowflake.Data.Tests/UnitTests/Session/FixedZeroCounterTest.cs index 79653ef63..fd04be9af 100644 --- a/Snowflake.Data.Tests/UnitTests/Session/FixedZeroCounterTest.cs +++ b/Snowflake.Data.Tests/UnitTests/Session/FixedZeroCounterTest.cs @@ -11,10 +11,10 @@ public void TestInitialZero() { // arrange var counter = new FixedZeroCounter(); - + // act var count = counter.Count(); - + // assert Assert.AreEqual(0, count); } @@ -24,25 +24,38 @@ public void TestZeroAfterIncrease() { // arrange var counter = new FixedZeroCounter(); - + // act counter.Increase(); // assert Assert.AreEqual(0, counter.Count()); } - + [Test] public void TestZeroAfterDecrease() { // arrange var counter = new FixedZeroCounter(); - + // act counter.Decrease(); // assert Assert.AreEqual(0, counter.Count()); } + + [Test] + public void TestZeroAfterReset() + { + // arrange + var counter = new FixedZeroCounter(); + + // act + counter.Reset(); + + // assert + Assert.AreEqual(0, counter.Count()); + } } } diff --git a/Snowflake.Data.Tests/UnitTests/Session/NonNegativeCounterTest.cs b/Snowflake.Data.Tests/UnitTests/Session/NonNegativeCounterTest.cs index 7b63126f4..638299532 100644 --- a/Snowflake.Data.Tests/UnitTests/Session/NonNegativeCounterTest.cs +++ b/Snowflake.Data.Tests/UnitTests/Session/NonNegativeCounterTest.cs @@ -72,5 +72,20 @@ public void TestDecreaseDoesNotGoBelowZero() // assert Assert.AreEqual(0, counter.Count()); } + + [Test] + public void TestReset() + { + // arrange + var counter = new NonNegativeCounter(); + counter.Increase(); + counter.Increase(); + + // act + counter.Reset(); + + // assert + Assert.AreEqual(0, counter.Count()); + } } } diff --git a/Snowflake.Data.Tests/UnitTests/Session/NonWaitingQueueTest.cs b/Snowflake.Data.Tests/UnitTests/Session/NonWaitingQueueTest.cs index 1273ca6db..4d519b3a9 100644 --- a/Snowflake.Data.Tests/UnitTests/Session/NonWaitingQueueTest.cs +++ b/Snowflake.Data.Tests/UnitTests/Session/NonWaitingQueueTest.cs @@ -14,12 +14,12 @@ public void TestWaitDoesNotHangAndReturnsFalse() // arrange var nonWaitingQueue = new NonWaitingQueue(); var watch = new Stopwatch(); - + // act watch.Start(); var result = nonWaitingQueue.Wait(10000, CancellationToken.None); watch.Stop(); - + // assert Assert.IsFalse(result); Assert.LessOrEqual(watch.ElapsedMilliseconds, 50); @@ -31,10 +31,10 @@ public void TestNoOneIsWaiting() // arrange var nonWaitingQueue = new NonWaitingQueue(); nonWaitingQueue.Wait(10000, CancellationToken.None); - + // act var isAnyoneWaiting = nonWaitingQueue.IsAnyoneWaiting(); - + // assert Assert.IsFalse(isAnyoneWaiting); } @@ -47,9 +47,19 @@ public void TestWaitingDisabled() // act var isWaitingEnabled = nonWaitingQueue.IsWaitingEnabled(); - + // assert Assert.IsFalse(isWaitingEnabled); } + + [Test] + public void TestReset() + { + // arrange + var nonWaitingQueue = new NonWaitingQueue(); + + // act/assert + Assert.DoesNotThrow(() => nonWaitingQueue.Reset()); + } } -} \ No newline at end of file +} diff --git a/Snowflake.Data.Tests/UnitTests/Session/SessionCreationTokenCounterTest.cs b/Snowflake.Data.Tests/UnitTests/Session/SessionCreationTokenCounterTest.cs index 4cbb7dcb4..ee825b44a 100644 --- a/Snowflake.Data.Tests/UnitTests/Session/SessionCreationTokenCounterTest.cs +++ b/Snowflake.Data.Tests/UnitTests/Session/SessionCreationTokenCounterTest.cs @@ -11,22 +11,22 @@ public class SessionCreationTokenCounterTest { private static readonly TimeSpan s_longTime = TimeSpan.FromSeconds(30); private static readonly TimeSpan s_shortTime = TimeSpan.FromMilliseconds(50); - + [Test] public void TestGrantSessionCreation() { // arrange var tokens = new SessionCreationTokenCounter(s_longTime); - + // act tokens.NewToken(); - + // assert Assert.AreEqual(1, tokens.Count()); - + // act tokens.NewToken(); - + // assert Assert.AreEqual(2, tokens.Count()); } @@ -38,16 +38,16 @@ public void TestCompleteSessionCreation() var tokens = new SessionCreationTokenCounter(s_longTime); var token1 = tokens.NewToken(); var token2 = tokens.NewToken(); - + // act tokens.RemoveToken(token1); - + // assert Assert.AreEqual(1, tokens.Count()); - + // act tokens.RemoveToken(token2); - + // assert Assert.AreEqual(0, tokens.Count()); } @@ -59,7 +59,7 @@ public void TestCompleteUnknownTokenDoesNotThrowExceptions() var tokens = new SessionCreationTokenCounter(s_longTime); tokens.NewToken(); var unknownToken = new SessionCreationToken(SFSessionHttpClientProperties.DefaultConnectionTimeout); - + // act tokens.RemoveToken(unknownToken); @@ -80,7 +80,22 @@ public void TestCompleteCleansExpiredTokens() // act tokens.RemoveToken(token); - + + // assert + Assert.AreEqual(0, tokens.Count()); + } + + [Test] + public void TestResetTokens() + { + // arrange + var tokens = new SessionCreationTokenCounter(s_longTime); + tokens.NewToken(); + tokens.NewToken(); + + // act + tokens.Reset(); + // assert Assert.AreEqual(0, tokens.Count()); } diff --git a/Snowflake.Data.Tests/UnitTests/Session/WaitingQueueTest.cs b/Snowflake.Data.Tests/UnitTests/Session/WaitingQueueTest.cs index 922bfea30..530eae133 100644 --- a/Snowflake.Data.Tests/UnitTests/Session/WaitingQueueTest.cs +++ b/Snowflake.Data.Tests/UnitTests/Session/WaitingQueueTest.cs @@ -15,7 +15,7 @@ public void TestWaitForTheResourceUntilTimeout() // arrange var queue = new WaitingQueue(); var watch = new Stopwatch(); - + // act watch.Start(); var result = queue.Wait(50, CancellationToken.None); @@ -38,7 +38,7 @@ public void TestWaitForTheResourceUntilCancellation() watch.Start(); var result = queue.Wait(30000, cancellationSource.Token); watch.Stop(); - + // assert Assert.IsFalse(result); Assert.That(watch.ElapsedMilliseconds, Is.InRange(45, 1500)); // sometimes Wait takes a bit smaller amount of time than it should. Thus we expect it to be greater than 45, not just 50. @@ -56,7 +56,7 @@ public void TestWaitUntilResourceAvailable() Thread.Sleep(50); queue.OnResourceIncrease(); }); - + // act watch.Start(); var result = queue.Wait(30000, CancellationToken.None); @@ -72,10 +72,10 @@ public void TestWaitingEnabled() { // arrange var queue = new WaitingQueue(); - + // act var isWaitingEnabled = queue.IsWaitingEnabled(); - + // assert Assert.IsTrue(isWaitingEnabled); } @@ -85,10 +85,10 @@ public void TestNoOneIsWaiting() { // arrange var queue = new WaitingQueue(); - + // act var isAnyoneWaiting = queue.IsAnyoneWaiting(); - + // assert Assert.IsFalse(isAnyoneWaiting); } @@ -109,9 +109,32 @@ public void TestSomeoneIsWaiting() // act var isAnyoneWaiting = queue.IsAnyoneWaiting(); - + // assert Assert.IsTrue(isAnyoneWaiting); } + + [Test] + [Retry(2)] + public void TestReturnUnsuccessfulOnResetWhileWaiting() + { + // arrange + var queue = new WaitingQueue(); + var watch = new Stopwatch(); + Task.Run(() => + { + Thread.Sleep(50); + queue.Reset(); + }); + + // act + watch.Start(); + var result = queue.Wait(30000, CancellationToken.None); + watch.Stop(); + + // assert + Assert.IsFalse(result); + Assert.That(watch.ElapsedMilliseconds, Is.InRange(50, 1500)); + } } } diff --git a/Snowflake.Data/Core/Session/ConnectionPoolManager.cs b/Snowflake.Data/Core/Session/ConnectionPoolManager.cs index aa0271952..fa4dde5c1 100644 --- a/Snowflake.Data/Core/Session/ConnectionPoolManager.cs +++ b/Snowflake.Data/Core/Session/ConnectionPoolManager.cs @@ -58,7 +58,7 @@ public void ClearAllPools() s_logger.Debug("ConnectionPoolManager::ClearAllPools"); foreach (var sessionPool in _pools.Values) { - sessionPool.ClearSessions(); + sessionPool.DestroyPool(); } _pools.Clear(); } diff --git a/Snowflake.Data/Core/Session/ISessionCreationTokenCounter.cs b/Snowflake.Data/Core/Session/ISessionCreationTokenCounter.cs index f4d6786c6..9b98c01e5 100644 --- a/Snowflake.Data/Core/Session/ISessionCreationTokenCounter.cs +++ b/Snowflake.Data/Core/Session/ISessionCreationTokenCounter.cs @@ -7,5 +7,7 @@ internal interface ISessionCreationTokenCounter void RemoveToken(SessionCreationToken creationToken); int Count(); + + void Reset(); } } diff --git a/Snowflake.Data/Core/Session/IWaitingQueue.cs b/Snowflake.Data/Core/Session/IWaitingQueue.cs index 5cc895026..26bc45d0d 100644 --- a/Snowflake.Data/Core/Session/IWaitingQueue.cs +++ b/Snowflake.Data/Core/Session/IWaitingQueue.cs @@ -13,5 +13,7 @@ internal interface IWaitingQueue int WaitingCount(); bool IsWaitingEnabled(); + + void Reset(); } } diff --git a/Snowflake.Data/Core/Session/NonCountingSessionCreationTokenCounter.cs b/Snowflake.Data/Core/Session/NonCountingSessionCreationTokenCounter.cs index cf6671d17..44292d755 100644 --- a/Snowflake.Data/Core/Session/NonCountingSessionCreationTokenCounter.cs +++ b/Snowflake.Data/Core/Session/NonCountingSessionCreationTokenCounter.cs @@ -4,8 +4,8 @@ namespace Snowflake.Data.Core.Session { internal class NonCountingSessionCreationTokenCounter: ISessionCreationTokenCounter { - private static readonly TimeSpan s_irrelevantCreateSessionTimeout = SFSessionHttpClientProperties.DefaultConnectionTimeout; // in case of old caching pool or pooling disabled we do not remove expired ones nor even store them - + private static readonly TimeSpan s_irrelevantCreateSessionTimeout = SFSessionHttpClientProperties.DefaultConnectionTimeout; // in case of old caching pool or pooling disabled we do not remove expired ones nor even store them + public SessionCreationToken NewToken() => new SessionCreationToken(s_irrelevantCreateSessionTimeout); public void RemoveToken(SessionCreationToken creationToken) @@ -13,5 +13,9 @@ public void RemoveToken(SessionCreationToken creationToken) } public int Count() => 0; + + public void Reset() + { + } } } diff --git a/Snowflake.Data/Core/Session/NonWaitingQueue.cs b/Snowflake.Data/Core/Session/NonWaitingQueue.cs index 1e7f9ee64..46ec84677 100644 --- a/Snowflake.Data/Core/Session/NonWaitingQueue.cs +++ b/Snowflake.Data/Core/Session/NonWaitingQueue.cs @@ -27,5 +27,9 @@ public bool IsWaitingEnabled() { return false; } + + public void Reset() + { + } } } diff --git a/Snowflake.Data/Core/Session/SFSession.cs b/Snowflake.Data/Core/Session/SFSession.cs index 94805b412..af8b1e55b 100755 --- a/Snowflake.Data/Core/Session/SFSession.cs +++ b/Snowflake.Data/Core/Session/SFSession.cs @@ -274,7 +274,7 @@ internal void close() { // Nothing to do if the session is not open if (!IsEstablished()) return; - logger.Debug($"Closing session with id: {sessionId}, user: {_user ?? string.Empty}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}"); + logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}"); stopHeartBeatForThisSession(); // Send a close session request @@ -306,7 +306,7 @@ internal async Task CloseAsync(CancellationToken cancellationToken) { // Nothing to do if the session is not open if (!IsEstablished()) return; - logger.Debug($"Closing session with id: {sessionId}, user: {_user ?? string.Empty}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}"); + logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}"); stopHeartBeatForThisSession(); // Send a close session request diff --git a/Snowflake.Data/Core/Session/SessionCreationTokenCounter.cs b/Snowflake.Data/Core/Session/SessionCreationTokenCounter.cs index 1bb9b0493..32ba7e55b 100644 --- a/Snowflake.Data/Core/Session/SessionCreationTokenCounter.cs +++ b/Snowflake.Data/Core/Session/SessionCreationTokenCounter.cs @@ -58,5 +58,18 @@ public int Count() _tokenLock.ExitReadLock(); } } + + public void Reset() + { + _tokenLock.EnterWriteLock(); + try + { + _tokens.Clear(); + } + finally + { + _tokenLock.ExitWriteLock(); + } + } } } diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index c28db40c0..b37808d08 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -32,6 +32,7 @@ sealed class SessionPool : IDisposable private ISessionPoolEventHandler _sessionPoolEventHandler = new SessionPoolEventHandler(); // a way to inject some additional behaviour after certain events. Can be used for example to measure time of given steps. private readonly ConnectionPoolConfig _poolConfig; private bool _configOverriden = false; + private bool _underDestruction = false; private static readonly InvalidOperationException s_notSupportedInCachePoolException = new InvalidOperationException("Feature not supported in a Connection Cache"); @@ -72,12 +73,12 @@ internal static SessionPool CreateSessionPool(string connectionString, SecureStr { // Use async for the finalizer due to possible deadlock // when waiting for the CloseResponse task while closing the session - ClearAllPoolsAsync(); + DestroyPoolAsync(); } public void Dispose() { - ClearIdleSessions(); + DestroyPool(); } internal static ISessionFactory SessionFactory @@ -269,7 +270,7 @@ private SFSession WaitForSession(string connStr) _sessionPoolEventHandler.OnWaitingForSessionStarted(this); var beforeWaitingTimeMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); long nowTimeMillis = beforeWaitingTimeMillis; - while (!TimeoutHelper.IsExpired(beforeWaitingTimeMillis, nowTimeMillis, _poolConfig.WaitingForIdleSessionTimeout)) // we loop to handle the case if someone overtook us after being woken or session which we were promised has just expired + while (GetPooling() && !_underDestruction && !TimeoutHelper.IsExpired(beforeWaitingTimeMillis, nowTimeMillis, _poolConfig.WaitingForIdleSessionTimeout)) // we loop to handle the case if someone overtook us after being woken or session which we were promised has just expired { var timeoutLeftMillis = TimeoutHelper.FiniteTimeoutLeftMillis(beforeWaitingTimeMillis, nowTimeMillis, _poolConfig.WaitingForIdleSessionTimeout); _sessionPoolEventHandler.OnWaitingForSessionStarted(this, timeoutLeftMillis); @@ -336,7 +337,7 @@ private SFSession NewSession(String connectionString, SecureString password, Ses var session = s_sessionFactory.NewSession(connectionString, password); session.Open(); s_logger.Debug("SessionPool::NewSession - opened" + PoolIdentification()); - if (GetPooling()) + if (GetPooling() && !_underDestruction) { lock (_sessionPoolLock) { @@ -407,7 +408,7 @@ private Task NewSessionAsync(String connectionString, SecureString pa if (!previousTask.IsCanceled) { - if (GetPooling()) + if (GetPooling() && !_underDestruction) { lock (_sessionPoolLock) { @@ -440,7 +441,7 @@ internal bool AddSession(SFSession session, bool ensureMinPoolSize) { s_logger.Debug("SessionPool::AddSession" + PoolIdentification()); - if (!GetPooling()) + if (!GetPooling() || _underDestruction) return false; if (IsMultiplePoolsVersion() && @@ -515,13 +516,40 @@ private Tuple> ReturnSessionToPool(SFSession se } } + internal void DestroyPool() + { + s_logger.Debug("SessionPool::DestroyPool" + PoolIdentification()); + lock (_sessionPoolLock) + { + _underDestruction = true; + ClearIdleSessions(); + _busySessionsCounter.Reset(); + _waitingForIdleSessionQueue.Reset(); + _sessionCreationTokenCounter.Reset(); + } + } + + internal void DestroyPoolAsync() + { + s_logger.Debug("SessionPool::DestroyPoolAsync" + PoolIdentification()); + lock (_sessionPoolLock) + { + _underDestruction = true; + ClearAllPoolsAsync(); + _busySessionsCounter.Reset(); + _waitingForIdleSessionQueue.Reset(); + _sessionCreationTokenCounter.Reset(); + } + } + internal void ClearSessions() { - s_logger.Debug("SessionPool::ClearSessions" + PoolIdentification()); + s_logger.Debug($"SessionPool::ClearSessions" + PoolIdentification()); lock (_sessionPoolLock) { _busySessionsCounter.Reset(); ClearIdleSessions(); + _waitingForIdleSessionQueue.Reset(); } } diff --git a/Snowflake.Data/Core/Session/WaitingQueue.cs b/Snowflake.Data/Core/Session/WaitingQueue.cs index bdd64b9f5..8eeab2282 100644 --- a/Snowflake.Data/Core/Session/WaitingQueue.cs +++ b/Snowflake.Data/Core/Session/WaitingQueue.cs @@ -8,6 +8,7 @@ internal class WaitingQueue: IWaitingQueue { private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); private readonly List _queue = new List(); + private readonly HashSet _notSuccessfulCollection = new HashSet(); public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken) { @@ -23,7 +24,31 @@ public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken) } try { - return semaphore.Wait(millisecondsTimeout, cancellationToken); + var waitingResult = semaphore.Wait(millisecondsTimeout, cancellationToken); + bool shouldFail; + _lock.EnterReadLock(); + try + { + shouldFail = _notSuccessfulCollection.Contains(semaphore); + } + finally + { + _lock.ExitReadLock(); + } + if (shouldFail) + { + _lock.EnterWriteLock(); + try + { + _notSuccessfulCollection.Remove(semaphore); + } + finally + { + _lock.ExitWriteLock(); + } + return false; + } + return waitingResult; } catch (OperationCanceledException) { @@ -83,5 +108,24 @@ public int WaitingCount() } public bool IsWaitingEnabled() => true; + + public void Reset() + { + _lock.EnterWriteLock(); + try + { + while (_queue.Count > 0) + { + var semaphore = _queue[0]; + _queue.RemoveAt(0); + _notSuccessfulCollection.Add(semaphore); + semaphore?.Release(); + } + } + finally + { + _lock.ExitWriteLock(); + } + } } } From 6d04f60657b8807fa08a19c65e8e1e97f751b6f4 Mon Sep 17 00:00:00 2001 From: Krzysztof Nozderko Date: Tue, 21 May 2024 11:32:30 +0000 Subject: [PATCH 2/2] rename method --- Snowflake.Data/Core/Session/SessionPool.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index b37808d08..de66c2240 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -535,7 +535,7 @@ internal void DestroyPoolAsync() lock (_sessionPoolLock) { _underDestruction = true; - ClearAllPoolsAsync(); + ClearIdleSessionsAsync(); _busySessionsCounter.Reset(); _waitingForIdleSessionQueue.Reset(); _sessionCreationTokenCounter.Reset(); @@ -566,9 +566,9 @@ internal void ClearIdleSessions() } } - internal async void ClearAllPoolsAsync() + internal async void ClearIdleSessionsAsync() { - s_logger.Debug("SessionPool::ClearAllPoolsAsync" + PoolIdentification()); + s_logger.Debug("SessionPool::ClearIdleSessionsAsync" + PoolIdentification()); IEnumerable idleSessionsCopy; lock (_sessionPoolLock) {