Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-937190 Wait for idle sessions available #840

Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ee2bff8
SNOW-937190 Wait for idle sessions available
sfc-gh-knozderko Nov 23, 2023
69d2545
add assert to check if the pool is empty at the beginning
sfc-gh-knozderko Dec 19, 2023
939f9f7
test with unique connection string per test
sfc-gh-knozderko Dec 20, 2023
b071399
fix
sfc-gh-knozderko Dec 20, 2023
2bb889f
some improvements
sfc-gh-knozderko Dec 20, 2023
1b763f5
debug info
sfc-gh-knozderko Jan 2, 2024
8eb34fd
for GH testing
sfc-gh-knozderko Jan 2, 2024
99799bc
fix waiting loop
sfc-gh-knozderko Jan 2, 2024
f5c1c23
fix timeout units
sfc-gh-knozderko Jan 2, 2024
1c3536d
fix TestDecreaseResources
sfc-gh-knozderko Jan 2, 2024
867bb25
more debug
sfc-gh-knozderko Jan 2, 2024
74a0f0c
time based stopwatch
sfc-gh-knozderko Jan 2, 2024
a949e35
use events for better tests
sfc-gh-knozderko Jan 3, 2024
9593408
tiny changes
sfc-gh-knozderko Jan 3, 2024
a567465
tiny change
sfc-gh-knozderko Jan 3, 2024
8e82261
fifo waiting queue
sfc-gh-knozderko Jan 3, 2024
a01a485
thread D delayed to C
sfc-gh-knozderko Jan 4, 2024
ee2f57c
clean debug changes
sfc-gh-knozderko Jan 4, 2024
826bbe7
more cleaning
sfc-gh-knozderko Jan 4, 2024
6c430c5
read write lock
sfc-gh-knozderko Jan 4, 2024
f8934ec
changes after review/improvements
sfc-gh-knozderko Jan 8, 2024
31b3899
rename classes, little refactors
sfc-gh-knozderko Jan 8, 2024
e583906
fix flaky tests
sfc-gh-knozderko Jan 9, 2024
a0b141d
little renaming
sfc-gh-knozderko Jan 9, 2024
f3b9a3e
fix tests
sfc-gh-knozderko Jan 9, 2024
ecf56fc
fix tests
sfc-gh-knozderko Jan 9, 2024
24a6947
tune tests
sfc-gh-knozderko Jan 10, 2024
68d1c68
fix test, more visibility in tests
sfc-gh-knozderko Jan 10, 2024
7bbdb17
rewrite test
sfc-gh-knozderko Jan 10, 2024
c425b2c
fix tests
sfc-gh-knozderko Jan 11, 2024
3ac7b88
fix comments
sfc-gh-knozderko Jan 11, 2024
144de1a
litle rename
sfc-gh-knozderko Jan 11, 2024
9f585a2
changes after review
sfc-gh-knozderko Jan 25, 2024
284b36a
fix cancellation case
sfc-gh-knozderko Jan 26, 2024
d42f4d5
fix cancellation case
sfc-gh-knozderko Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
Expand Down Expand Up @@ -95,6 +97,123 @@ public void TestReuseSessionInConnectionPoolReachingMaxConnections() // old name
Assert.AreEqual(ConnectionState.Closed, conn3.State);
Assert.AreEqual(ConnectionState.Closed, conn4.State);
}

[Test]
public void TestWaitForTheIdleConnectionWhenExceedingMaxConnectionsLimit()
{
// arrange
var connectionString = ConnectionString + "application=TestWaitForMaxSize1";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "expecting pool to be empty");
pool.SetMaxPoolSize(2);
pool.SetWaitingForSessionToReuseTimeout(1000);
var conn1 = OpenedConnection(connectionString);
var conn2 = OpenedConnection(connectionString);
var watch = new StopWatch();

// act
watch.Start();
var start = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
var thrown = Assert.Throws<SnowflakeDbException>(() => OpenedConnection(connectionString));
var stop = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
watch.Stop();

// assert
Assert.That(thrown.Message, Does.Contain("Unable to connect. Could not obtain a connection from the pool within a given timeout"));
Assert.That(watch.ElapsedMilliseconds, Is.InRange(1000, 1500));
Assert.AreEqual(pool.GetCurrentPoolSize(), 2);

// cleanup
conn1.Close();
conn2.Close();
}

[Test]
public void TestWaitForTheIdleConnectionWhenExceedingMaxConnectionsLimitAsync()
{
// arrange
var connectionString = ConnectionString + "application=TestWaitForMaxSize2";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "expecting pool to be empty");
pool.SetMaxPoolSize(2);
pool.SetWaitingForSessionToReuseTimeout(1000);
var conn1 = OpenedConnection(connectionString);
var conn2 = OpenedConnection(connectionString);
var watch = new StopWatch();

// act
watch.Start();
var thrown = Assert.ThrowsAsync<SnowflakeDbException>(() => OpenedConnectionAsync(connectionString));
watch.Stop();

// assert
Assert.That(thrown.Message, Does.Contain("Unable to connect"));
Assert.IsTrue(thrown.InnerException is AggregateException);
var nestedException = ((AggregateException)thrown.InnerException).InnerException;
Assert.That(nestedException.Message, Does.Contain("Could not obtain a connection from the pool within a given timeout"));
Assert.That(watch.ElapsedMilliseconds, Is.InRange(1000, 1500));
Assert.AreEqual(pool.GetCurrentPoolSize(), 2);

// cleanup
conn1.Close();
conn2.Close();
}

[Test]
public void TestWaitInAQueueForAnIdleSession()
{
// arrange
var connectionString = ConnectionString + "application=TestWaitForMaxSize3";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "the pool is expected to be empty");
pool.SetMaxPoolSize(2);
pool.SetWaitingForSessionToReuseTimeout(3000);
const long ADelay = 0;
const long BDelay = 400;
const long CDelay = 2 * BDelay;
const long DDelay = 3 * BDelay;
const long ABDelayAfterConnect = 2000;
const long ConnectPessimisticEstimate = 1300;
const long StartDelayPessimisticEstimate = 350;
const long AMinConnectionReleaseTime = ADelay + ABDelayAfterConnect; // 2000
const long AMaxConnectionReleaseTime = ADelay + StartDelayPessimisticEstimate + ConnectPessimisticEstimate + ABDelayAfterConnect; // 3650
const long BMinConnectionReleaseTime = BDelay + ABDelayAfterConnect; // 2400
const long BMaxConnectionReleaseTime = BDelay + StartDelayPessimisticEstimate + ConnectPessimisticEstimate + ABDelayAfterConnect; // 4050
const long CMinConnectDuration = AMinConnectionReleaseTime - CDelay - StartDelayPessimisticEstimate; // 2000 - 800 - 350 = 850
const long CMaxConnectDuration = AMaxConnectionReleaseTime - CDelay; // 3650 - 800 = 2850
const long DMinConnectDuration = BMinConnectionReleaseTime - DDelay - StartDelayPessimisticEstimate; // 2400 - 1200 - 350 = 850
const long DMaxConnectDuration = BMaxConnectionReleaseTime - DDelay; // 3650 - 800 = 2850

var threads = new ConnectingThreads(connectionString)
.NewThread("A", ADelay, ABDelayAfterConnect, true)
.NewThread("B", BDelay, ABDelayAfterConnect, true)
.NewThread("C", CDelay, 0, true)
.NewThread("D", DDelay, 0, true);
pool.SetSessionPoolEventHandler(new SessionPoolThreadEventHandler(threads));

// act
threads.StartAll().JoinAll();

// assert
var events = threads.Events().ToList();
Assert.AreEqual(6, events.Count);
var waitingEvents = events.Where(e => e.IsWaitingEvent()).ToList();
Assert.AreEqual(2, waitingEvents.Count);
CollectionAssert.AreEquivalent(new[] { "C", "D" }, waitingEvents.Select(e => e.ThreadName)); // equivalent = in any order
var connectedEvents = events.Where(e => e.IsConnectedEvent()).ToList();
Assert.AreEqual(4, connectedEvents.Count);
var firstConnectedEventsGroup = connectedEvents.GetRange(0, 2);
CollectionAssert.AreEquivalent(new[] { "A", "B" }, firstConnectedEventsGroup.Select(e => e.ThreadName));
var lastConnectingEventsGroup = connectedEvents.GetRange(2, 2);
CollectionAssert.AreEquivalent(new[] { "C", "D" }, lastConnectingEventsGroup.Select(e => e.ThreadName));
Assert.LessOrEqual(firstConnectedEventsGroup[0].Duration, ConnectPessimisticEstimate);
Assert.LessOrEqual(firstConnectedEventsGroup[1].Duration, ConnectPessimisticEstimate);
// first to wait from C and D should first to connect, because we won't create a new session, we just reuse sessions returned by A and B threads
Assert.AreEqual(waitingEvents[0].ThreadName, lastConnectingEventsGroup[0].ThreadName);
Assert.AreEqual(waitingEvents[1].ThreadName, lastConnectingEventsGroup[1].ThreadName);
Assert.That(lastConnectingEventsGroup[0].Duration, Is.InRange(CMinConnectDuration, CMaxConnectDuration));
Assert.That(lastConnectingEventsGroup[1].Duration, Is.InRange(DMinConnectDuration, DMaxConnectDuration));
}

[Test]
public void TestBusyAndIdleConnectionsCountedInPoolSize()
Expand Down Expand Up @@ -186,5 +305,21 @@ public void TestNewConnectionPoolClean()
Assert.AreEqual(ConnectionState.Closed, conn2.State);
Assert.AreEqual(ConnectionState.Closed, conn3.State);
}

private SnowflakeDbConnection OpenedConnection(string connectionString)
{
var connection = new SnowflakeDbConnection();
connection.ConnectionString = connectionString;
connection.Open();
return connection;
}

private async Task<SnowflakeDbConnection> OpenedConnectionAsync(string connectionString)
{
var connection = new SnowflakeDbConnection();
connection.ConnectionString = connectionString;
await connection.OpenAsync().ConfigureAwait(false);
return connection;
}
}
}
124 changes: 1 addition & 123 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
* Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
*/

using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Snowflake.Data.Core;
using Snowflake.Data.Client;
Expand Down Expand Up @@ -52,90 +49,7 @@ public static void AfterAllTests()
{
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
// test connection pooling with concurrent connection
public void TestConcurrentConnectionPooling()
{
// add test case name in connection string to make in unique for each test case
string connStr = ConnectionString + ";application=TestConcurrentConnectionPooling";
ConcurrentPoolingHelper(connStr, true);
}

[Test]
// test connection pooling with concurrent connection and no close
// call for connection. Connection is closed when Dispose() is called
// by framework.
public void TestConcurrentConnectionPoolingDispose()
{
// add test case name in connection string to make in unique for each test case
string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingNoClose";
ConcurrentPoolingHelper(connStr, false);
}

static void ConcurrentPoolingHelper(string connectionString, bool closeConnection)
{
// thread number a bit larger than pool size so some connections
// would fail on pooling while some connections could success
const int ThreadNum = 12;
// set short pooling timeout to cover the case that connection expired
const int PoolTimeout = 3;

// reset to default settings in case it changed by other test cases
Assert.AreEqual(true, SnowflakeDbConnectionPool.GetPool(connectionString).GetPooling()); // to instantiate pool
SnowflakeDbConnectionPool.SetMaxPoolSize(10);
SnowflakeDbConnectionPool.SetTimeout(PoolTimeout);

var threads = new Task[ThreadNum];
for (int i = 0; i < ThreadNum; i++)
{
threads[i] = Task.Factory.StartNew(() =>
{
QueryExecutionThread(connectionString, closeConnection);
});
}
Task.WaitAll(threads);
}

// thead to execute query with new connection in a loop
static void QueryExecutionThread(string connectionString, bool closeConnection)
{
for (int i = 0; i < 100; i++)
{
using (DbConnection conn = new SnowflakeDbConnection(connectionString))
{
conn.Open();
using (DbCommand cmd = conn.CreateCommand())
{
cmd.CommandText = "select 1, 2, 3";
try
{
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
for (int j = 0; j < reader.FieldCount; j++)
{
// Process each column as appropriate
reader.GetFieldValue<object>(j);
}
}
}
}
catch (Exception e)
{
Assert.Fail("Caught unexpected exception: " + e);
}
}

if (closeConnection)
{
conn.Close();
}
}
}
}


[Test]
public void TestBasicConnectionPool()
{
Expand All @@ -150,42 +64,6 @@ public void TestBasicConnectionPool()
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize());
}

[Test]
public void TestConnectionPoolIsFull()
sfc-gh-mhofman marked this conversation as resolved.
Show resolved Hide resolved
{
var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString);
SnowflakeDbConnectionPool.SetMaxPoolSize(2);
var conn1 = new SnowflakeDbConnection();
conn1.ConnectionString = ConnectionString;
conn1.Open();
Assert.AreEqual(ConnectionState.Open, conn1.State);

var conn2 = new SnowflakeDbConnection();
conn2.ConnectionString = ConnectionString;
conn2.Open();
Assert.AreEqual(ConnectionState.Open, conn2.State);

var conn3 = new SnowflakeDbConnection();
conn3.ConnectionString = ConnectionString;
conn3.Open();
Assert.AreEqual(ConnectionState.Open, conn3.State);
SnowflakeDbConnectionPool.ClearAllPools();
pool = SnowflakeDbConnectionPool.GetPool(ConnectionString);
SnowflakeDbConnectionPool.SetMaxPoolSize(2);

conn1.Close();
Assert.AreEqual(1, pool.GetCurrentPoolSize());
conn2.Close();
Assert.AreEqual(2, pool.GetCurrentPoolSize());
conn3.Close();
Assert.AreEqual(2, pool.GetCurrentPoolSize());

Assert.AreEqual(ConnectionState.Closed, conn1.State);
Assert.AreEqual(ConnectionState.Closed, conn2.State);
Assert.AreEqual(ConnectionState.Closed, conn3.State);
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
public void TestConnectionPoolExpirationWorks()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
Expand Down Expand Up @@ -32,6 +35,88 @@ public static void AfterAllTests()
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
public void TestConcurrentConnectionPooling()
{
// add test case name in connection string to make in unique for each test case
string connStr = ConnectionString + ";application=TestConcurrentConnectionPooling";
ConcurrentPoolingHelper(connStr, true);
}

[Test]
// test connection pooling with concurrent connection and no close
// call for connection. Connection is closed when Dispose() is called
// by framework.
public void TestConcurrentConnectionPoolingDispose()
{
// add test case name in connection string to make in unique for each test case
string connStr = ConnectionString + ";application=TestConcurrentConnectionPoolingNoClose";
ConcurrentPoolingHelper(connStr, false);
}

static void ConcurrentPoolingHelper(string connectionString, bool closeConnection)
{
// thread number a bit larger than pool size so some connections
// would fail on pooling while some connections could success
const int ThreadNum = 12;
// set short pooling timeout to cover the case that connection expired
const int PoolTimeout = 3;

// reset to default settings in case it changed by other test cases
Assert.AreEqual(true, SnowflakeDbConnectionPool.GetPool(connectionString).GetPooling()); // to instantiate pool
SnowflakeDbConnectionPool.SetMaxPoolSize(10);
SnowflakeDbConnectionPool.SetTimeout(PoolTimeout);

var threads = new Task[ThreadNum];
for (int i = 0; i < ThreadNum; i++)
{
threads[i] = Task.Factory.StartNew(() =>
{
QueryExecutionThread(connectionString, closeConnection);
});
}
Task.WaitAll(threads);
}

// thead to execute query with new connection in a loop
static void QueryExecutionThread(string connectionString, bool closeConnection)
{
for (int i = 0; i < 100; i++)
{
using (DbConnection conn = new SnowflakeDbConnection(connectionString))
{
conn.Open();
using (DbCommand cmd = conn.CreateCommand())
{
cmd.CommandText = "select 1, 2, 3";
try
{
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
for (int j = 0; j < reader.FieldCount; j++)
{
// Process each column as appropriate
reader.GetFieldValue<object>(j);
}
}
}
}
catch (Exception e)
{
Assert.Fail("Caught unexpected exception: " + e);
}
}

if (closeConnection)
{
conn.Close();
}
}
}
}

[Test]
public void TestPoolContainsClosedConnections() // old name: TestConnectionPool
{
Expand Down
Loading
Loading