Skip to content

Commit

Permalink
SNOW-937190 Wait for idle sessions available (#840)
Browse files Browse the repository at this point in the history
### Description
SNOW-937190 Wait for idle sessions available if session count exceeds
max count

### Checklist
- [x] Code compiles correctly
- [x] Code is formatted according to [Coding
Conventions](../CodingConventions.md)
- [x] Created tests which fail without the change (if possible)
- [x] All tests passing (`dotnet test`)
- [x] Extended the README / documentation, if necessary
- [x] Provide JIRA issue id (if possible) or GitHub issue id in PR name
  • Loading branch information
sfc-gh-knozderko authored Jan 29, 2024
1 parent 56f0b81 commit 85204fc
Show file tree
Hide file tree
Showing 22 changed files with 1,304 additions and 156 deletions.
135 changes: 135 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
Expand Down Expand Up @@ -95,6 +97,123 @@ public void TestReuseSessionInConnectionPoolReachingMaxConnections() // old name
Assert.AreEqual(ConnectionState.Closed, conn3.State);
Assert.AreEqual(ConnectionState.Closed, conn4.State);
}

[Test]
public void TestWaitForTheIdleConnectionWhenExceedingMaxConnectionsLimit()
{
// arrange
var connectionString = ConnectionString + "application=TestWaitForMaxSize1";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "expecting pool to be empty");
pool.SetMaxPoolSize(2);
pool.SetWaitingForSessionToReuseTimeout(1000);
var conn1 = OpenedConnection(connectionString);
var conn2 = OpenedConnection(connectionString);
var watch = new StopWatch();

// act
watch.Start();
var start = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
var thrown = Assert.Throws<SnowflakeDbException>(() => OpenedConnection(connectionString));
var stop = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
watch.Stop();

// assert
Assert.That(thrown.Message, Does.Contain("Unable to connect. Could not obtain a connection from the pool within a given timeout"));
Assert.That(watch.ElapsedMilliseconds, Is.InRange(1000, 1500));
Assert.AreEqual(pool.GetCurrentPoolSize(), 2);

// cleanup
conn1.Close();
conn2.Close();
}

[Test]
public void TestWaitForTheIdleConnectionWhenExceedingMaxConnectionsLimitAsync()
{
// arrange
var connectionString = ConnectionString + "application=TestWaitForMaxSize2";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "expecting pool to be empty");
pool.SetMaxPoolSize(2);
pool.SetWaitingForSessionToReuseTimeout(1000);
var conn1 = OpenedConnection(connectionString);
var conn2 = OpenedConnection(connectionString);
var watch = new StopWatch();

// act
watch.Start();
var thrown = Assert.ThrowsAsync<SnowflakeDbException>(() => OpenedConnectionAsync(connectionString));
watch.Stop();

// assert
Assert.That(thrown.Message, Does.Contain("Unable to connect"));
Assert.IsTrue(thrown.InnerException is AggregateException);
var nestedException = ((AggregateException)thrown.InnerException).InnerException;
Assert.That(nestedException.Message, Does.Contain("Could not obtain a connection from the pool within a given timeout"));
Assert.That(watch.ElapsedMilliseconds, Is.InRange(1000, 1500));
Assert.AreEqual(pool.GetCurrentPoolSize(), 2);

// cleanup
conn1.Close();
conn2.Close();
}

[Test]
public void TestWaitInAQueueForAnIdleSession()
{
// arrange
var connectionString = ConnectionString + "application=TestWaitForMaxSize3";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "the pool is expected to be empty");
pool.SetMaxPoolSize(2);
pool.SetWaitingForSessionToReuseTimeout(3000);
const long ADelay = 0;
const long BDelay = 400;
const long CDelay = 2 * BDelay;
const long DDelay = 3 * BDelay;
const long ABDelayAfterConnect = 2000;
const long ConnectPessimisticEstimate = 1300;
const long StartDelayPessimisticEstimate = 350;
const long AMinConnectionReleaseTime = ADelay + ABDelayAfterConnect; // 2000
const long AMaxConnectionReleaseTime = ADelay + StartDelayPessimisticEstimate + ConnectPessimisticEstimate + ABDelayAfterConnect; // 3650
const long BMinConnectionReleaseTime = BDelay + ABDelayAfterConnect; // 2400
const long BMaxConnectionReleaseTime = BDelay + StartDelayPessimisticEstimate + ConnectPessimisticEstimate + ABDelayAfterConnect; // 4050
const long CMinConnectDuration = AMinConnectionReleaseTime - CDelay - StartDelayPessimisticEstimate; // 2000 - 800 - 350 = 850
const long CMaxConnectDuration = AMaxConnectionReleaseTime - CDelay; // 3650 - 800 = 2850
const long DMinConnectDuration = BMinConnectionReleaseTime - DDelay - StartDelayPessimisticEstimate; // 2400 - 1200 - 350 = 850
const long DMaxConnectDuration = BMaxConnectionReleaseTime - DDelay; // 3650 - 800 = 2850

var threads = new ConnectingThreads(connectionString)
.NewThread("A", ADelay, ABDelayAfterConnect, true)
.NewThread("B", BDelay, ABDelayAfterConnect, true)
.NewThread("C", CDelay, 0, true)
.NewThread("D", DDelay, 0, true);
pool.SetSessionPoolEventHandler(new SessionPoolThreadEventHandler(threads));

// act
threads.StartAll().JoinAll();

// assert
var events = threads.Events().ToList();
Assert.AreEqual(6, events.Count);
var waitingEvents = events.Where(e => e.IsWaitingEvent()).ToList();
Assert.AreEqual(2, waitingEvents.Count);
CollectionAssert.AreEquivalent(new[] { "C", "D" }, waitingEvents.Select(e => e.ThreadName)); // equivalent = in any order
var connectedEvents = events.Where(e => e.IsConnectedEvent()).ToList();
Assert.AreEqual(4, connectedEvents.Count);
var firstConnectedEventsGroup = connectedEvents.GetRange(0, 2);
CollectionAssert.AreEquivalent(new[] { "A", "B" }, firstConnectedEventsGroup.Select(e => e.ThreadName));
var lastConnectingEventsGroup = connectedEvents.GetRange(2, 2);
CollectionAssert.AreEquivalent(new[] { "C", "D" }, lastConnectingEventsGroup.Select(e => e.ThreadName));
Assert.LessOrEqual(firstConnectedEventsGroup[0].Duration, ConnectPessimisticEstimate);
Assert.LessOrEqual(firstConnectedEventsGroup[1].Duration, ConnectPessimisticEstimate);
// first to wait from C and D should first to connect, because we won't create a new session, we just reuse sessions returned by A and B threads
Assert.AreEqual(waitingEvents[0].ThreadName, lastConnectingEventsGroup[0].ThreadName);
Assert.AreEqual(waitingEvents[1].ThreadName, lastConnectingEventsGroup[1].ThreadName);
Assert.That(lastConnectingEventsGroup[0].Duration, Is.InRange(CMinConnectDuration, CMaxConnectDuration));
Assert.That(lastConnectingEventsGroup[1].Duration, Is.InRange(DMinConnectDuration, DMaxConnectDuration));
}

[Test]
public void TestBusyAndIdleConnectionsCountedInPoolSize()
Expand Down Expand Up @@ -186,5 +305,21 @@ public void TestNewConnectionPoolClean()
Assert.AreEqual(ConnectionState.Closed, conn2.State);
Assert.AreEqual(ConnectionState.Closed, conn3.State);
}

private SnowflakeDbConnection OpenedConnection(string connectionString)
{
var connection = new SnowflakeDbConnection();
connection.ConnectionString = connectionString;
connection.Open();
return connection;
}

private async Task<SnowflakeDbConnection> OpenedConnectionAsync(string connectionString)
{
var connection = new SnowflakeDbConnection();
connection.ConnectionString = connectionString;
await connection.OpenAsync().ConfigureAwait(false);
return connection;
}
}
}
124 changes: 1 addition & 123 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
* Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
*/

using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Snowflake.Data.Core;
using Snowflake.Data.Client;
Expand Down Expand Up @@ -52,90 +49,7 @@ public static void AfterAllTests()
{
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
// test connection pooling with concurrent connection
public void TestConcurrentConnectionPooling()
{
// add test case name in connection string to make in unique for each test case
string connStr = ConnectionString + ";application=TestConcurrentConnectionPooling";
ConcurrentPoolingHelper(connStr, true);
}

[Test]
// test connection pooling with concurrent connection and no close
// call for connection. Connection is closed when Dispose() is called
// by framework.
public void TestConcurrentConnectionPoolingDispose()
{
// add test case name in connection string to make in unique for each test case
string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingNoClose";
ConcurrentPoolingHelper(connStr, false);
}

static void ConcurrentPoolingHelper(string connectionString, bool closeConnection)
{
// thread number a bit larger than pool size so some connections
// would fail on pooling while some connections could success
const int ThreadNum = 12;
// set short pooling timeout to cover the case that connection expired
const int PoolTimeout = 3;

// reset to default settings in case it changed by other test cases
Assert.AreEqual(true, SnowflakeDbConnectionPool.GetPool(connectionString).GetPooling()); // to instantiate pool
SnowflakeDbConnectionPool.SetMaxPoolSize(10);
SnowflakeDbConnectionPool.SetTimeout(PoolTimeout);

var threads = new Task[ThreadNum];
for (int i = 0; i < ThreadNum; i++)
{
threads[i] = Task.Factory.StartNew(() =>
{
QueryExecutionThread(connectionString, closeConnection);
});
}
Task.WaitAll(threads);
}

// thead to execute query with new connection in a loop
static void QueryExecutionThread(string connectionString, bool closeConnection)
{
for (int i = 0; i < 100; i++)
{
using (DbConnection conn = new SnowflakeDbConnection(connectionString))
{
conn.Open();
using (DbCommand cmd = conn.CreateCommand())
{
cmd.CommandText = "select 1, 2, 3";
try
{
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
for (int j = 0; j < reader.FieldCount; j++)
{
// Process each column as appropriate
reader.GetFieldValue<object>(j);
}
}
}
}
catch (Exception e)
{
Assert.Fail("Caught unexpected exception: " + e);
}
}

if (closeConnection)
{
conn.Close();
}
}
}
}


[Test]
public void TestBasicConnectionPool()
{
Expand All @@ -150,42 +64,6 @@ public void TestBasicConnectionPool()
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize());
}

[Test]
public void TestConnectionPoolIsFull()
{
var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString);
SnowflakeDbConnectionPool.SetMaxPoolSize(2);
var conn1 = new SnowflakeDbConnection();
conn1.ConnectionString = ConnectionString;
conn1.Open();
Assert.AreEqual(ConnectionState.Open, conn1.State);

var conn2 = new SnowflakeDbConnection();
conn2.ConnectionString = ConnectionString;
conn2.Open();
Assert.AreEqual(ConnectionState.Open, conn2.State);

var conn3 = new SnowflakeDbConnection();
conn3.ConnectionString = ConnectionString;
conn3.Open();
Assert.AreEqual(ConnectionState.Open, conn3.State);
SnowflakeDbConnectionPool.ClearAllPools();
pool = SnowflakeDbConnectionPool.GetPool(ConnectionString);
SnowflakeDbConnectionPool.SetMaxPoolSize(2);

conn1.Close();
Assert.AreEqual(1, pool.GetCurrentPoolSize());
conn2.Close();
Assert.AreEqual(2, pool.GetCurrentPoolSize());
conn3.Close();
Assert.AreEqual(2, pool.GetCurrentPoolSize());

Assert.AreEqual(ConnectionState.Closed, conn1.State);
Assert.AreEqual(ConnectionState.Closed, conn2.State);
Assert.AreEqual(ConnectionState.Closed, conn3.State);
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
public void TestConnectionPoolExpirationWorks()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
Expand Down Expand Up @@ -32,6 +35,88 @@ public static void AfterAllTests()
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
public void TestConcurrentConnectionPooling()
{
// add test case name in connection string to make in unique for each test case
string connStr = ConnectionString + ";application=TestConcurrentConnectionPooling";
ConcurrentPoolingHelper(connStr, true);
}

[Test]
// test connection pooling with concurrent connection and no close
// call for connection. Connection is closed when Dispose() is called
// by framework.
public void TestConcurrentConnectionPoolingDispose()
{
// add test case name in connection string to make in unique for each test case
string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingNoClose";
ConcurrentPoolingHelper(connStr, false);
}

static void ConcurrentPoolingHelper(string connectionString, bool closeConnection)
{
// thread number a bit larger than pool size so some connections
// would fail on pooling while some connections could success
const int ThreadNum = 12;
// set short pooling timeout to cover the case that connection expired
const int PoolTimeout = 3;

// reset to default settings in case it changed by other test cases
Assert.AreEqual(true, SnowflakeDbConnectionPool.GetPool(connectionString).GetPooling()); // to instantiate pool
SnowflakeDbConnectionPool.SetMaxPoolSize(10);
SnowflakeDbConnectionPool.SetTimeout(PoolTimeout);

var threads = new Task[ThreadNum];
for (int i = 0; i < ThreadNum; i++)
{
threads[i] = Task.Factory.StartNew(() =>
{
QueryExecutionThread(connectionString, closeConnection);
});
}
Task.WaitAll(threads);
}

// thead to execute query with new connection in a loop
static void QueryExecutionThread(string connectionString, bool closeConnection)
{
for (int i = 0; i < 100; i++)
{
using (DbConnection conn = new SnowflakeDbConnection(connectionString))
{
conn.Open();
using (DbCommand cmd = conn.CreateCommand())
{
cmd.CommandText = "select 1, 2, 3";
try
{
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
for (int j = 0; j < reader.FieldCount; j++)
{
// Process each column as appropriate
reader.GetFieldValue<object>(j);
}
}
}
}
catch (Exception e)
{
Assert.Fail("Caught unexpected exception: " + e);
}
}

if (closeConnection)
{
conn.Close();
}
}
}
}

[Test]
public void TestPoolContainsClosedConnections() // old name: TestConnectionPool
{
Expand Down
Loading

0 comments on commit 85204fc

Please sign in to comment.