Skip to content

Commit

Permalink
use events for better tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-knozderko committed Jan 3, 2024
1 parent 74a0f0c commit a949e35
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 29 deletions.
43 changes: 22 additions & 21 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,34 +175,35 @@ public void TestWaitInAQueueForAnIdleSession()
pool.SetWaitingTimeout(3000);
var threads = new ConnectingThreads(connectionString)
.NewThread("A", 0, 2000, true)
.NewThread("B", 50, 2000, true)
.NewThread("B", 0, 2000, true)
.NewThread("C", 100, 0, true)
.NewThread("D", 150, 0, true);
var watch = new StopWatch();
.NewThread("D", 100, 0, true);
pool.SetSessionPoolEventHandler(new SessionPoolThreadEventHandler(threads));

// act
watch.Start();
threads.StartAll().JoinAll();
watch.Stop();

// assert
var events = threads.Events().ToList();
Assert.AreEqual(4, events.Count);
CollectionAssert.AreEqual(
new[]
{
Tuple.Create("A", "CONNECTED"),
Tuple.Create("B", "CONNECTED"),
Tuple.Create("C", "CONNECTED"),
Tuple.Create("D", "CONNECTED")
},
events.Select(e => Tuple.Create(e.ThreadName, e.EventName)));
Assert.LessOrEqual(events[0].Duration, 1000);
Assert.LessOrEqual(events[1].Duration, 1000);
Assert.GreaterOrEqual(events[2].Duration, 2000);
Assert.LessOrEqual(events[2].Duration, 3100);
Assert.GreaterOrEqual(events[3].Duration, 2000);
Assert.LessOrEqual(events[3].Duration, 3100);
Assert.AreEqual(6, events.Count);
var waitingEvents = events.Where(e => e.IsWaitingEvent()).ToList();
Assert.AreEqual(2, waitingEvents.Count);
CollectionAssert.AreEquivalent(new[] { "C", "D" }, waitingEvents.Select(e => e.ThreadName)); // equivalent = in any order
var connectedEvents = events.Where(e => e.IsConnectedEvent()).ToList();
Assert.AreEqual(4, connectedEvents.Count);
var firstConnectedEventsGroup = connectedEvents.GetRange(0, 2);
CollectionAssert.AreEquivalent(new[] { "A", "B" }, firstConnectedEventsGroup.Select(e => e.ThreadName));
var lastConnectingEventsGroup = connectedEvents.GetRange(2, 2);
CollectionAssert.AreEquivalent(new[] { "C", "D" }, lastConnectingEventsGroup.Select(e => e.ThreadName));
Assert.LessOrEqual(firstConnectedEventsGroup[0].Duration, 1000);
Assert.LessOrEqual(firstConnectedEventsGroup[1].Duration, 1000);
// first to wait from C and D should first to connect, because we won't create a new session, we just reuse sessions returned by A and B threads
Assert.AreEqual(waitingEvents[0].ThreadName, lastConnectingEventsGroup[0].ThreadName);
Assert.AreEqual(waitingEvents[1].ThreadName, lastConnectingEventsGroup[1].ThreadName);
Assert.GreaterOrEqual(lastConnectingEventsGroup[0].Duration, 1900);
Assert.LessOrEqual(lastConnectingEventsGroup[0].Duration, 3100);
Assert.GreaterOrEqual(lastConnectingEventsGroup[1].Duration, 2000);
Assert.LessOrEqual(lastConnectingEventsGroup[1].Duration, 3100);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void TestWaitForTheResourceUntilCancellation()

// assert
Assert.IsFalse(result);
Assert.GreaterOrEqual(watch.ElapsedMilliseconds, 50);
Assert.GreaterOrEqual(watch.ElapsedMilliseconds, 45); // sometimes Wait takes a bit smaller amount of time than it should. Thus we expect it to be greater than 45, not just 50.
Assert.LessOrEqual(watch.ElapsedMilliseconds, 120);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
using Snowflake.Data.Tests.Util;

namespace Snowflake.Data.Tests.IntegrationTests
{
Expand Down Expand Up @@ -50,6 +51,8 @@ public ConnectingThreads JoinAll()
}

public IEnumerable<ThreadEvent> Events() => _events.ToArray().OfType<ThreadEvent>();

public void Enqueue(ThreadEvent threadEvent) => _events.Enqueue(threadEvent);
}

class ConnectingThread
Expand All @@ -66,6 +69,8 @@ class ConnectingThread

private bool _closeOnExit;

internal const string NamePrefix = "thread_";

public ConnectingThread(
string name,
ConcurrentQueue<ThreadEvent> events,
Expand All @@ -85,7 +90,7 @@ public ConnectingThread(
public Thread Build()
{
var thread = new Thread(Execute);
thread.Name = "thread_" + _name;
thread.Name = NamePrefix + _name;
return thread;
}

Expand All @@ -94,7 +99,7 @@ private void Execute()
var connection = new SnowflakeDbConnection();
connection.ConnectionString = _connectionString;
Sleep(_waitBeforeConnectMillis);
var watch = new Stopwatch();
var watch = new StopWatch();
watch.Start();
var connected = false;
try
Expand Down Expand Up @@ -125,7 +130,7 @@ private void Sleep(long millis)
{
return;
}
System.Threading.Thread.Sleep((int) millis);
Thread.Sleep((int) millis);
}
}

Expand All @@ -141,6 +146,10 @@ class ThreadEvent

public long Duration { get; set; }

private const string Connected = "CONNECTED";
private const string WaitingForSession = "WAITING_FOR_SESSION";
private const string FailedToConnect = "FAILED_TO_CONNECT";

public ThreadEvent(string threadName, string eventName, Exception error, long duration)
{
ThreadName = threadName;
Expand All @@ -149,11 +158,37 @@ public ThreadEvent(string threadName, string eventName, Exception error, long du
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
Duration = duration;
}

public bool IsConnectedEvent() => EventName.Equals(Connected);

public bool IsWaitingEvent() => EventName.Equals(WaitingForSession);

public static ThreadEvent EventConnected(string threadName, long duration) =>
new ThreadEvent(threadName, "CONNECTED", null, duration);
new ThreadEvent(threadName, Connected, null, duration);

public static ThreadEvent EventConnectingFailed(string threadName, Exception exception, long duration) =>
new ThreadEvent(threadName, "FAILED_TO_CONNECT", exception, duration);
new ThreadEvent(threadName, FailedToConnect, exception, duration);

public static ThreadEvent EventWaitingForSessionStarted(string threadName) =>
new ThreadEvent(threadName, WaitingForSession, null, 0);
}

class SessionPoolThreadEventHandler: SessionPoolEventHandler
{
private readonly ConnectingThreads _connectingThreads;

public SessionPoolThreadEventHandler(ConnectingThreads connectingThreads)
{
_connectingThreads = connectingThreads;
}

public override void OnWaitingForSessionStarted(SessionPool sessionPool)
{
var threadName = Thread.CurrentThread.Name;
var realThreadName = threadName.StartsWith(ConnectingThread.NamePrefix)
? threadName.Substring(ConnectingThread.NamePrefix.Length) : threadName;
var waitingStartedEvent = ThreadEvent.EventWaitingForSessionStarted(realThreadName);
_connectingThreads.Enqueue(waitingStartedEvent);
}
}
}
11 changes: 11 additions & 0 deletions Snowflake.Data/Core/Session/ISessionPoolEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Snowflake.Data.Core.Session
{
internal interface ISessionPoolEventHandler
{
void OnNewSessionCreated(SessionPool sessionPool);

void OnWaitingForSessionStarted(SessionPool sessionPool);

void OnSessionProvided(SessionPool sessionPool);
}
}
20 changes: 19 additions & 1 deletion Snowflake.Data/Core/Session/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ sealed class SessionPool : IDisposable
internal SecureString Password { get; }
private bool _pooling = true;
private readonly ICounter _busySessionsCounter;
private ISessionPoolEventHandler _sessionPoolEventHandler = new SessionPoolEventHandler(); // a way to inject some additional behaviour after certain events. Can be used for example to measure time of given steps.

private SessionPool()
{
Expand Down Expand Up @@ -103,6 +104,10 @@ internal SFSession GetSession(string connStr, SecureString password)
if (!_pooling)
return NewSession(connStr, password, _noPoolingCreateSessionTokens.BeginCreate());
var sessionOrCreateToken = GetIdleSession(connStr);
if (sessionOrCreateToken.Session != null)
{
_sessionPoolEventHandler.OnSessionProvided(this);
}
return sessionOrCreateToken.Session ?? NewSession(connStr, password, sessionOrCreateToken.CreateToken);
}

Expand All @@ -112,6 +117,10 @@ internal async Task<SFSession> GetSessionAsync(string connStr, SecureString pass
if (!_pooling)
return await NewSessionAsync(connStr, password, _noPoolingCreateSessionTokens.BeginCreate(), cancellationToken).ConfigureAwait(false);

Check warning on line 118 in Snowflake.Data/Core/Session/SessionPool.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/Session/SessionPool.cs#L118

Added line #L118 was not covered by tests
var sessionOrCreateToken = GetIdleSession(connStr);
if (sessionOrCreateToken.Session != null)
{
_sessionPoolEventHandler.OnSessionProvided(this);
}
return sessionOrCreateToken.Session ?? await NewSessionAsync(connStr, password, sessionOrCreateToken.CreateToken, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -120,6 +129,11 @@ internal async Task<SFSession> GetSessionAsync(string connStr, SecureString pass
internal Task<SFSession> GetSessionAsync(CancellationToken cancellationToken) =>
GetSessionAsync(ConnectionString, Password, cancellationToken);

internal void SetSessionPoolEventHandler(ISessionPoolEventHandler sessionPoolEventHandler)
{
_sessionPoolEventHandler = sessionPoolEventHandler;
}

private SessionOrCreateToken GetIdleSession(string connStr)
{
s_logger.Debug("SessionPool::GetIdleSession");
Expand Down Expand Up @@ -170,6 +184,7 @@ private SFSession WaitForSession(string connStr)
{
var timeout = _waitingQueue.GetWaitingTimeoutMillis();
s_logger.Warn($"SessionPool::WaitForSession for {timeout} millis timeout");
_sessionPoolEventHandler.OnWaitingForSessionStarted(this);
var beforeWaitingTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
string debugApplicationName = "";
if (connStr.EndsWith("application=TestWaitForMaxSize1"))
Expand Down Expand Up @@ -251,6 +266,8 @@ private SFSession NewSession(String connectionString, SecureString password, Cre
_busySessionsCounter.Increase();
}
}
_sessionPoolEventHandler.OnNewSessionCreated(this);
_sessionPoolEventHandler.OnSessionProvided(this);
s_logger.Warn($"SessionPool::NewSession - finish: {DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}"); // TODO: remove !!!
return session;
}
Expand Down Expand Up @@ -298,7 +315,8 @@ private Task<SFSession> NewSessionAsync(String connectionString, SecureString pa
_busySessionsCounter.Increase();
}
}

_sessionPoolEventHandler.OnNewSessionCreated(this);
_sessionPoolEventHandler.OnSessionProvided(this);

Check warning on line 319 in Snowflake.Data/Core/Session/SessionPool.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/Session/SessionPool.cs#L311-L319

Added lines #L311 - L319 were not covered by tests
return session;
}, TaskContinuationOptions.None); // previously it was NotOnCanceled but we would like to execute it even in case of cancellation to properly update counters

Check warning on line 321 in Snowflake.Data/Core/Session/SessionPool.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/Session/SessionPool.cs#L321

Added line #L321 was not covered by tests
}
Expand Down
17 changes: 17 additions & 0 deletions Snowflake.Data/Core/Session/SessionPoolEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Snowflake.Data.Core.Session
{
internal class SessionPoolEventHandler: ISessionPoolEventHandler
{
public virtual void OnNewSessionCreated(SessionPool sessionPool)
{
}

public virtual void OnWaitingForSessionStarted(SessionPool sessionPool)
{
}

public virtual void OnSessionProvided(SessionPool sessionPool)
{
}
}
}

0 comments on commit a949e35

Please sign in to comment.