From a949e359b1d298a486f5d2d842d5c7ed32a0050c Mon Sep 17 00:00:00 2001 From: Krzysztof Nozderko Date: Wed, 3 Jan 2024 14:09:51 +0100 Subject: [PATCH] use events for better tests --- .../ConnectionMultiplePoolsIT.cs | 43 ++++++++--------- .../Session/SemaphoreBasedQueueTest.cs | 2 +- .../ConnectingThreads.cs | 47 ++++++++++++++++--- .../Core/Session/ISessionPoolEventHandler.cs | 11 +++++ Snowflake.Data/Core/Session/SessionPool.cs | 20 +++++++- .../Core/Session/SessionPoolEventHandler.cs | 17 +++++++ 6 files changed, 111 insertions(+), 29 deletions(-) rename Snowflake.Data.Tests/{IntegrationTests => Util}/ConnectingThreads.cs (70%) create mode 100644 Snowflake.Data/Core/Session/ISessionPoolEventHandler.cs create mode 100644 Snowflake.Data/Core/Session/SessionPoolEventHandler.cs diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs index f25ed353b..ac504fc2a 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs @@ -175,34 +175,35 @@ public void TestWaitInAQueueForAnIdleSession() pool.SetWaitingTimeout(3000); var threads = new ConnectingThreads(connectionString) .NewThread("A", 0, 2000, true) - .NewThread("B", 50, 2000, true) + .NewThread("B", 0, 2000, true) .NewThread("C", 100, 0, true) - .NewThread("D", 150, 0, true); - var watch = new StopWatch(); + .NewThread("D", 100, 0, true); + pool.SetSessionPoolEventHandler(new SessionPoolThreadEventHandler(threads)); // act - watch.Start(); threads.StartAll().JoinAll(); - watch.Stop(); // assert var events = threads.Events().ToList(); - Assert.AreEqual(4, events.Count); - CollectionAssert.AreEqual( - new[] - { - Tuple.Create("A", "CONNECTED"), - Tuple.Create("B", "CONNECTED"), - Tuple.Create("C", "CONNECTED"), - Tuple.Create("D", "CONNECTED") - }, - events.Select(e => Tuple.Create(e.ThreadName, e.EventName))); - Assert.LessOrEqual(events[0].Duration, 1000); - Assert.LessOrEqual(events[1].Duration, 1000); - Assert.GreaterOrEqual(events[2].Duration, 2000); - Assert.LessOrEqual(events[2].Duration, 3100); - Assert.GreaterOrEqual(events[3].Duration, 2000); - Assert.LessOrEqual(events[3].Duration, 3100); + Assert.AreEqual(6, events.Count); + var waitingEvents = events.Where(e => e.IsWaitingEvent()).ToList(); + Assert.AreEqual(2, waitingEvents.Count); + CollectionAssert.AreEquivalent(new[] { "C", "D" }, waitingEvents.Select(e => e.ThreadName)); // equivalent = in any order + var connectedEvents = events.Where(e => e.IsConnectedEvent()).ToList(); + Assert.AreEqual(4, connectedEvents.Count); + var firstConnectedEventsGroup = connectedEvents.GetRange(0, 2); + CollectionAssert.AreEquivalent(new[] { "A", "B" }, firstConnectedEventsGroup.Select(e => e.ThreadName)); + var lastConnectingEventsGroup = connectedEvents.GetRange(2, 2); + CollectionAssert.AreEquivalent(new[] { "C", "D" }, lastConnectingEventsGroup.Select(e => e.ThreadName)); + Assert.LessOrEqual(firstConnectedEventsGroup[0].Duration, 1000); + Assert.LessOrEqual(firstConnectedEventsGroup[1].Duration, 1000); + // first to wait from C and D should first to connect, because we won't create a new session, we just reuse sessions returned by A and B threads + Assert.AreEqual(waitingEvents[0].ThreadName, lastConnectingEventsGroup[0].ThreadName); + Assert.AreEqual(waitingEvents[1].ThreadName, lastConnectingEventsGroup[1].ThreadName); + Assert.GreaterOrEqual(lastConnectingEventsGroup[0].Duration, 1900); + Assert.LessOrEqual(lastConnectingEventsGroup[0].Duration, 3100); + Assert.GreaterOrEqual(lastConnectingEventsGroup[1].Duration, 2000); + Assert.LessOrEqual(lastConnectingEventsGroup[1].Duration, 3100); } [Test] diff --git a/Snowflake.Data.Tests/UnitTests/Session/SemaphoreBasedQueueTest.cs b/Snowflake.Data.Tests/UnitTests/Session/SemaphoreBasedQueueTest.cs index 2ec2c698b..c39ae2a7a 100644 --- a/Snowflake.Data.Tests/UnitTests/Session/SemaphoreBasedQueueTest.cs +++ b/Snowflake.Data.Tests/UnitTests/Session/SemaphoreBasedQueueTest.cs @@ -42,7 +42,7 @@ public void TestWaitForTheResourceUntilCancellation() // assert Assert.IsFalse(result); - Assert.GreaterOrEqual(watch.ElapsedMilliseconds, 50); + Assert.GreaterOrEqual(watch.ElapsedMilliseconds, 45); // sometimes Wait takes a bit smaller amount of time than it should. Thus we expect it to be greater than 45, not just 50. Assert.LessOrEqual(watch.ElapsedMilliseconds, 120); } diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectingThreads.cs b/Snowflake.Data.Tests/Util/ConnectingThreads.cs similarity index 70% rename from Snowflake.Data.Tests/IntegrationTests/ConnectingThreads.cs rename to Snowflake.Data.Tests/Util/ConnectingThreads.cs index be33f1ce5..297f744c1 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectingThreads.cs +++ b/Snowflake.Data.Tests/Util/ConnectingThreads.cs @@ -1,10 +1,11 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Threading; using Snowflake.Data.Client; +using Snowflake.Data.Core.Session; +using Snowflake.Data.Tests.Util; namespace Snowflake.Data.Tests.IntegrationTests { @@ -50,6 +51,8 @@ public ConnectingThreads JoinAll() } public IEnumerable Events() => _events.ToArray().OfType(); + + public void Enqueue(ThreadEvent threadEvent) => _events.Enqueue(threadEvent); } class ConnectingThread @@ -66,6 +69,8 @@ class ConnectingThread private bool _closeOnExit; + internal const string NamePrefix = "thread_"; + public ConnectingThread( string name, ConcurrentQueue events, @@ -85,7 +90,7 @@ public ConnectingThread( public Thread Build() { var thread = new Thread(Execute); - thread.Name = "thread_" + _name; + thread.Name = NamePrefix + _name; return thread; } @@ -94,7 +99,7 @@ private void Execute() var connection = new SnowflakeDbConnection(); connection.ConnectionString = _connectionString; Sleep(_waitBeforeConnectMillis); - var watch = new Stopwatch(); + var watch = new StopWatch(); watch.Start(); var connected = false; try @@ -125,7 +130,7 @@ private void Sleep(long millis) { return; } - System.Threading.Thread.Sleep((int) millis); + Thread.Sleep((int) millis); } } @@ -141,6 +146,10 @@ class ThreadEvent public long Duration { get; set; } + private const string Connected = "CONNECTED"; + private const string WaitingForSession = "WAITING_FOR_SESSION"; + private const string FailedToConnect = "FAILED_TO_CONNECT"; + public ThreadEvent(string threadName, string eventName, Exception error, long duration) { ThreadName = threadName; @@ -149,11 +158,37 @@ public ThreadEvent(string threadName, string eventName, Exception error, long du Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); Duration = duration; } + + public bool IsConnectedEvent() => EventName.Equals(Connected); + + public bool IsWaitingEvent() => EventName.Equals(WaitingForSession); public static ThreadEvent EventConnected(string threadName, long duration) => - new ThreadEvent(threadName, "CONNECTED", null, duration); + new ThreadEvent(threadName, Connected, null, duration); public static ThreadEvent EventConnectingFailed(string threadName, Exception exception, long duration) => - new ThreadEvent(threadName, "FAILED_TO_CONNECT", exception, duration); + new ThreadEvent(threadName, FailedToConnect, exception, duration); + + public static ThreadEvent EventWaitingForSessionStarted(string threadName) => + new ThreadEvent(threadName, WaitingForSession, null, 0); + } + + class SessionPoolThreadEventHandler: SessionPoolEventHandler + { + private readonly ConnectingThreads _connectingThreads; + + public SessionPoolThreadEventHandler(ConnectingThreads connectingThreads) + { + _connectingThreads = connectingThreads; + } + + public override void OnWaitingForSessionStarted(SessionPool sessionPool) + { + var threadName = Thread.CurrentThread.Name; + var realThreadName = threadName.StartsWith(ConnectingThread.NamePrefix) + ? threadName.Substring(ConnectingThread.NamePrefix.Length) : threadName; + var waitingStartedEvent = ThreadEvent.EventWaitingForSessionStarted(realThreadName); + _connectingThreads.Enqueue(waitingStartedEvent); + } } } diff --git a/Snowflake.Data/Core/Session/ISessionPoolEventHandler.cs b/Snowflake.Data/Core/Session/ISessionPoolEventHandler.cs new file mode 100644 index 000000000..59c18d4c3 --- /dev/null +++ b/Snowflake.Data/Core/Session/ISessionPoolEventHandler.cs @@ -0,0 +1,11 @@ +namespace Snowflake.Data.Core.Session +{ + internal interface ISessionPoolEventHandler + { + void OnNewSessionCreated(SessionPool sessionPool); + + void OnWaitingForSessionStarted(SessionPool sessionPool); + + void OnSessionProvided(SessionPool sessionPool); + } +} diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index 0b9a3b47a..743d1f867 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -31,6 +31,7 @@ sealed class SessionPool : IDisposable internal SecureString Password { get; } private bool _pooling = true; private readonly ICounter _busySessionsCounter; + private ISessionPoolEventHandler _sessionPoolEventHandler = new SessionPoolEventHandler(); // a way to inject some additional behaviour after certain events. Can be used for example to measure time of given steps. private SessionPool() { @@ -103,6 +104,10 @@ internal SFSession GetSession(string connStr, SecureString password) if (!_pooling) return NewSession(connStr, password, _noPoolingCreateSessionTokens.BeginCreate()); var sessionOrCreateToken = GetIdleSession(connStr); + if (sessionOrCreateToken.Session != null) + { + _sessionPoolEventHandler.OnSessionProvided(this); + } return sessionOrCreateToken.Session ?? NewSession(connStr, password, sessionOrCreateToken.CreateToken); } @@ -112,6 +117,10 @@ internal async Task GetSessionAsync(string connStr, SecureString pass if (!_pooling) return await NewSessionAsync(connStr, password, _noPoolingCreateSessionTokens.BeginCreate(), cancellationToken).ConfigureAwait(false); var sessionOrCreateToken = GetIdleSession(connStr); + if (sessionOrCreateToken.Session != null) + { + _sessionPoolEventHandler.OnSessionProvided(this); + } return sessionOrCreateToken.Session ?? await NewSessionAsync(connStr, password, sessionOrCreateToken.CreateToken, cancellationToken).ConfigureAwait(false); } @@ -120,6 +129,11 @@ internal async Task GetSessionAsync(string connStr, SecureString pass internal Task GetSessionAsync(CancellationToken cancellationToken) => GetSessionAsync(ConnectionString, Password, cancellationToken); + internal void SetSessionPoolEventHandler(ISessionPoolEventHandler sessionPoolEventHandler) + { + _sessionPoolEventHandler = sessionPoolEventHandler; + } + private SessionOrCreateToken GetIdleSession(string connStr) { s_logger.Debug("SessionPool::GetIdleSession"); @@ -170,6 +184,7 @@ private SFSession WaitForSession(string connStr) { var timeout = _waitingQueue.GetWaitingTimeoutMillis(); s_logger.Warn($"SessionPool::WaitForSession for {timeout} millis timeout"); + _sessionPoolEventHandler.OnWaitingForSessionStarted(this); var beforeWaitingTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); string debugApplicationName = ""; if (connStr.EndsWith("application=TestWaitForMaxSize1")) @@ -251,6 +266,8 @@ private SFSession NewSession(String connectionString, SecureString password, Cre _busySessionsCounter.Increase(); } } + _sessionPoolEventHandler.OnNewSessionCreated(this); + _sessionPoolEventHandler.OnSessionProvided(this); s_logger.Warn($"SessionPool::NewSession - finish: {DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}"); // TODO: remove !!! return session; } @@ -298,7 +315,8 @@ private Task NewSessionAsync(String connectionString, SecureString pa _busySessionsCounter.Increase(); } } - + _sessionPoolEventHandler.OnNewSessionCreated(this); + _sessionPoolEventHandler.OnSessionProvided(this); return session; }, TaskContinuationOptions.None); // previously it was NotOnCanceled but we would like to execute it even in case of cancellation to properly update counters } diff --git a/Snowflake.Data/Core/Session/SessionPoolEventHandler.cs b/Snowflake.Data/Core/Session/SessionPoolEventHandler.cs new file mode 100644 index 000000000..4ebc8d429 --- /dev/null +++ b/Snowflake.Data/Core/Session/SessionPoolEventHandler.cs @@ -0,0 +1,17 @@ +namespace Snowflake.Data.Core.Session +{ + internal class SessionPoolEventHandler: ISessionPoolEventHandler + { + public virtual void OnNewSessionCreated(SessionPool sessionPool) + { + } + + public virtual void OnWaitingForSessionStarted(SessionPool sessionPool) + { + } + + public virtual void OnSessionProvided(SessionPool sessionPool) + { + } + } +}