Skip to content

Commit

Permalink
SNOW-937190 Wait for idle sessions available
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-knozderko committed Dec 19, 2023
1 parent 56f0b81 commit ee2bff8
Show file tree
Hide file tree
Showing 19 changed files with 1,056 additions and 157 deletions.
159 changes: 159 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/ConnectingThreads.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Snowflake.Data.Client;

namespace Snowflake.Data.Tests.IntegrationTests
{
class ConnectingThreads
{
private string _connectionString;

private ConcurrentQueue<ThreadEvent> _events = new ConcurrentQueue<ThreadEvent>();

private List<Thread> threads = new List<Thread>();

public ConnectingThreads(string connectionString)
{
_connectionString = connectionString;
}

public ConnectingThreads NewThread(string name,
long waitBeforeConnectMillis,
long waitAfterConnectMillis,
bool closeOnExit)
{
var thread = new ConnectingThread(
name,
_events,
_connectionString,
waitBeforeConnectMillis,
waitAfterConnectMillis,
closeOnExit).Build();
threads.Add(thread);
return this;
}

public ConnectingThreads StartAll()
{
threads.ForEach(thread => thread.Start());
return this;
}

public ConnectingThreads JoinAll()
{
threads.ForEach(thread => thread.Join());
return this;
}

public IEnumerable<ThreadEvent> Events() => _events.ToArray().OfType<ThreadEvent>();
}

class ConnectingThread
{
private string _name;

private ConcurrentQueue<ThreadEvent> _events;

private string _connectionString;

private long _waitBeforeConnectMillis;

private long _waitAfterConnectMillis;

private bool _closeOnExit;

public ConnectingThread(
string name,
ConcurrentQueue<ThreadEvent> events,
string connectionString,
long waitBeforeConnectMillis,
long waitAfterConnectMillis,
bool closeOnExit)
{
_name = name;
_events = events;
_connectionString = connectionString;
_waitBeforeConnectMillis = waitBeforeConnectMillis;
_waitAfterConnectMillis = waitAfterConnectMillis;
_closeOnExit = closeOnExit;
}

public Thread Build()
{
var thread = new Thread(Execute);
thread.Name = "thread_" + _name;
return thread;
}

private void Execute()
{
var connection = new SnowflakeDbConnection();
connection.ConnectionString = _connectionString;
Sleep(_waitBeforeConnectMillis);
var watch = new Stopwatch();
watch.Start();
var connected = false;
try
{
connection.Open();
connected = true;
}
catch (Exception exception)
{
watch.Stop();
_events.Enqueue(ThreadEvent.EventConnectingFailed(_name, exception, watch.ElapsedMilliseconds));
}
if (connected)
{
watch.Stop();
_events.Enqueue(ThreadEvent.EventConnected(_name, watch.ElapsedMilliseconds));
}
Sleep(_waitAfterConnectMillis);
if (_closeOnExit)
{
connection.Close();
}
}

private void Sleep(long millis)
{
if (millis <= 0)
{
return;
}
System.Threading.Thread.Sleep((int) millis);
}
}

class ThreadEvent
{
public string ThreadName { get; set; }

public string EventName { get; set; }

public Exception Error { get; set; }

public long Timestamp { get; set; }

public long Duration { get; set; }

public ThreadEvent(string threadName, string eventName, Exception error, long duration)
{
ThreadName = threadName;
EventName = eventName;
Error = error;
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
Duration = duration;
}

public static ThreadEvent EventConnected(string threadName, long duration) =>
new ThreadEvent(threadName, "CONNECTED", null, duration);

public static ThreadEvent EventConnectingFailed(string threadName, Exception exception, long duration) =>
new ThreadEvent(threadName, "FAILED_TO_CONNECT", exception, duration);
}
}
76 changes: 76 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Data;
using System.Diagnostics;
using System.Linq;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
Expand Down Expand Up @@ -95,6 +97,72 @@ public void TestReuseSessionInConnectionPoolReachingMaxConnections() // old name
Assert.AreEqual(ConnectionState.Closed, conn3.State);
Assert.AreEqual(ConnectionState.Closed, conn4.State);
}

[Test]
public void TestWaitForTheIdleConnectionWhenExceedingMaxConnectionsLimit()
{
// arrange
var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString);
pool.SetMaxPoolSize(2);
pool.SetWaitingTimeout(1000);
var conn1 = OpenedConnection();
var conn2 = OpenedConnection();
var watch = new Stopwatch();

// act
watch.Start();
var thrown = Assert.Throws<SnowflakeDbException>(() => OpenedConnection());
watch.Stop();

// assert
Assert.That(thrown.Message, Does.Contain("Unable to connect. Could not obtain a connection from the pool within a given timeout"));
Assert.GreaterOrEqual(watch.ElapsedMilliseconds, 1000);
Assert.LessOrEqual(watch.ElapsedMilliseconds, 1500);
Assert.AreEqual(pool.GetCurrentPoolSize(), 2);

// cleanup
conn1.Close();
conn2.Close();
}

[Test]
public void TestWaitInAQueueForAnIdleSession()
{
// arrange
var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString);
pool.SetMaxPoolSize(2);
pool.SetWaitingTimeout(3000);
var threads = new ConnectingThreads(ConnectionString)
.NewThread("A", 0, 2000, true)
.NewThread("B", 50, 2000, true)
.NewThread("C", 100, 0, true)
.NewThread("D", 150, 0, true);
var watch = new Stopwatch();

// act
watch.Start();
threads.StartAll().JoinAll();
watch.Stop();

// assert
var events = threads.Events().ToList();
Assert.AreEqual(4, events.Count);
CollectionAssert.AreEqual(
new[]
{
Tuple.Create("A", "CONNECTED"),
Tuple.Create("B", "CONNECTED"),
Tuple.Create("C", "CONNECTED"),
Tuple.Create("D", "CONNECTED")
},
events.Select(e => Tuple.Create(e.ThreadName, e.EventName)));
Assert.LessOrEqual(events[0].Duration, 1000);
Assert.LessOrEqual(events[1].Duration, 1000);
Assert.GreaterOrEqual(events[2].Duration, 2000);
Assert.LessOrEqual(events[2].Duration, 3100);
Assert.GreaterOrEqual(events[3].Duration, 2000);
Assert.LessOrEqual(events[3].Duration, 3100);
}

[Test]
public void TestBusyAndIdleConnectionsCountedInPoolSize()
Expand Down Expand Up @@ -186,5 +254,13 @@ public void TestNewConnectionPoolClean()
Assert.AreEqual(ConnectionState.Closed, conn2.State);
Assert.AreEqual(ConnectionState.Closed, conn3.State);
}

private SnowflakeDbConnection OpenedConnection()
{
var connection = new SnowflakeDbConnection();
connection.ConnectionString = ConnectionString;
connection.Open();
return connection;
}
}
}
Loading

0 comments on commit ee2bff8

Please sign in to comment.