Skip to content

Commit

Permalink
SNOW-902610 Min pool size
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-knozderko committed Mar 4, 2024
1 parent 42e27b6 commit 771c7ae
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 62 deletions.
47 changes: 40 additions & 7 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Snowflake.Data.Client;
Expand Down Expand Up @@ -35,6 +36,19 @@ public static void AfterAllTests()
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
public void TestBasicConnectionPool()
{
var connectingString = ConnectionString + "maxPoolSize=1;minPoolSize=1";
var conn1 = new SnowflakeDbConnection(connectingString);
conn1.Open();
Assert.AreEqual(ConnectionState.Open, conn1.State);
conn1.Close();

Assert.AreEqual(ConnectionState.Closed, conn1.State);
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(connectingString).GetCurrentPoolSize());
}

[Test]
public void TestReuseSessionInConnectionPool() // old name: TestConnectionPool
{
Expand Down Expand Up @@ -211,10 +225,10 @@ public void TestWaitInAQueueForAnIdleSession()
public void TestBusyAndIdleConnectionsCountedInPoolSize()
{
// arrange
var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString);
pool.SetMaxPoolSize(2);
var connectionString = ConnectionString + "maxPoolSize=2;minPoolSize=1";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
var connection = new SnowflakeDbConnection();
connection.ConnectionString = ConnectionString;
connection.ConnectionString = connectionString;

// act
connection.Open();
Expand Down Expand Up @@ -267,19 +281,19 @@ public void TestConnectionPoolDisable()
[Test]
public void TestNewConnectionPoolClean()
{
SnowflakeDbConnectionPool.SetMaxPoolSize(2);
var connectionString = ConnectionString + "maxPoolSize=2;minPoolSize=1;";
var conn1 = new SnowflakeDbConnection();
conn1.ConnectionString = ConnectionString;
conn1.ConnectionString = connectionString;
conn1.Open();
Assert.AreEqual(ConnectionState.Open, conn1.State);

var conn2 = new SnowflakeDbConnection();
conn2.ConnectionString = ConnectionString + " retryCount=1";
conn2.ConnectionString = connectionString + "retryCount=1";
conn2.Open();
Assert.AreEqual(ConnectionState.Open, conn2.State);

var conn3 = new SnowflakeDbConnection();
conn3.ConnectionString = ConnectionString + " retryCount=2";
conn3.ConnectionString = connectionString + "retryCount=2";
conn3.Open();
Assert.AreEqual(ConnectionState.Open, conn3.State);

Expand Down Expand Up @@ -324,6 +338,25 @@ public void TestConnectionPoolExpirationWorks()
Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize());
}

[Test]
public void TestMinPoolSize()
{
// arrange
var connection = new SnowflakeDbConnection();
connection.ConnectionString = ConnectionString + "application=TestMinPoolSize;minPoolSize=3";

// act
connection.Open();
Thread.Sleep(3000);

// assert
var pool = SnowflakeDbConnectionPool.GetPool(connection.ConnectionString);
Assert.AreEqual(3, pool.GetCurrentPoolSize());

// cleanup
connection.Close();
}

private SnowflakeDbConnection OpenedConnection(string connectionString)
{
var connection = new SnowflakeDbConnection();
Expand Down
14 changes: 0 additions & 14 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,6 @@ public static void AfterAllTests()
{
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
public void TestBasicConnectionPool()
{
SnowflakeDbConnectionPool.SetMaxPoolSize(1);

var conn1 = new SnowflakeDbConnection(ConnectionString);
conn1.Open();
Assert.AreEqual(ConnectionState.Open, conn1.State);
conn1.Close();

Assert.AreEqual(ConnectionState.Closed, conn1.State);
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize());
}

[Test]
public void TestConnectionPoolMultiThreading()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ public static void AfterAllTests()
SnowflakeDbConnectionPool.ClearAllPools();
}

[Test]
public void TestBasicConnectionPool()
{
SnowflakeDbConnectionPool.SetMaxPoolSize(1);

var conn1 = new SnowflakeDbConnection(ConnectionString);
conn1.Open();
Assert.AreEqual(ConnectionState.Open, conn1.State);
conn1.Close();

Assert.AreEqual(ConnectionState.Closed, conn1.State);
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize());
}

[Test]
public void TestConcurrentConnectionPooling()
{
Expand Down
4 changes: 2 additions & 2 deletions Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ namespace Snowflake.Data.Tests.UnitTests
class ConnectionPoolManagerTest
{
private readonly ConnectionPoolManager _connectionPoolManager = new ConnectionPoolManager();
private const string ConnectionString1 = "database=D1;warehouse=W1;account=A1;user=U1;password=P1;role=R1;";
private const string ConnectionString2 = "database=D2;warehouse=W2;account=A2;user=U2;password=P2;role=R2;";
private const string ConnectionString1 = "database=D1;warehouse=W1;account=A1;user=U1;password=P1;role=R1;minPoolSize=1;";
private const string ConnectionString2 = "database=D2;warehouse=W2;account=A2;user=U2;password=P2;role=R2;minPoolSize=1;";
private readonly SecureString _password = new SecureString();
private static PoolConfig s_poolConfig;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Linq;
using NUnit.Framework;
using Snowflake.Data.Core;
using Snowflake.Data.Core.Session;

namespace Snowflake.Data.Tests.UnitTests.Session
{
[TestFixture]
public class SessionOrCreationTokensTest
{
private SFSession _session = new SFSession("account=test;user=test;password=test", null);

[Test]
public void TestNoBackgroundSessionsToCreateWhenInitialisedWithSession()
{
// arrange
var sessionOrTokens = new SessionOrCreationTokens(_session);

// act
var backgroundCreationTokens = sessionOrTokens.BackgroundSessionCreationTokens();

Assert.AreEqual(0, backgroundCreationTokens.Count);
}

[Test]
public void TestReturnFirstCreationToken()
{
// arrange
var sessionCreationTokenCounter = new SessionCreationTokenCounter(TimeSpan.FromSeconds(10));
var tokens = Enumerable.Range(1, 3)
.Select(_ => sessionCreationTokenCounter.NewToken())
.ToList();
var sessionOrTokens = new SessionOrCreationTokens(tokens);

// act
var token = sessionOrTokens.SessionCreationToken();

// assert
Assert.AreSame(tokens[0], token);
}

[Test]
public void TestReturnCreationTokensFromTheSecondOneForBackgroundExecution()
{
// arrange
var sessionCreationTokenCounter = new SessionCreationTokenCounter(TimeSpan.FromSeconds(10));
var tokens = Enumerable.Range(1, 3)
.Select(_ => sessionCreationTokenCounter.NewToken())
.ToList();
var sessionOrTokens = new SessionOrCreationTokens(tokens);

// act
var backgroundTokens = sessionOrTokens.BackgroundSessionCreationTokens();

// assert
Assert.AreEqual(2, backgroundTokens.Count);
Assert.AreSame(tokens[1], backgroundTokens[0]);
Assert.AreSame(tokens[2], backgroundTokens[1]);
}
}
}
22 changes: 0 additions & 22 deletions Snowflake.Data/Core/Session/SessionOrCreationToken.cs

This file was deleted.

41 changes: 41 additions & 0 deletions Snowflake.Data/Core/Session/SessionOrCreationTokens.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
using System.Linq;

namespace Snowflake.Data.Core.Session
{
internal class SessionOrCreationTokens
{
private static readonly List<SessionCreationToken> s_emptySessionCreationTokenList = new List<SessionCreationToken>();

public SFSession Session { get; }
public List<SessionCreationToken> SessionCreationTokens { get; }

public SessionOrCreationTokens(SFSession session)
{
Session = session ?? throw new Exception("Internal error: missing session");
SessionCreationTokens = s_emptySessionCreationTokenList;
}

public SessionOrCreationTokens(List<SessionCreationToken> sessionCreationTokens)
{
Session = null;
if (sessionCreationTokens == null || sessionCreationTokens.Count == 0)
{
throw new Exception("Internal error: missing session creation token");
}
SessionCreationTokens = sessionCreationTokens;
}

public List<SessionCreationToken> BackgroundSessionCreationTokens()
{
if (Session == null)
{
return SessionCreationTokens.Skip(1).ToList();
}
return SessionCreationTokens;
}

public SessionCreationToken SessionCreationToken() => SessionCreationTokens.First();
}
}
56 changes: 39 additions & 17 deletions Snowflake.Data/Core/Session/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,40 @@ internal SFSession GetSession(string connStr, SecureString password)
s_logger.Debug("SessionPool::GetSession");
if (!GetPooling())
return NewNonPoolingSession(connStr, password);
var sessionOrCreateToken = GetIdleSession(connStr);
if (sessionOrCreateToken.Session != null)
var sessionOrCreateTokens = GetIdleSession(connStr);
if (sessionOrCreateTokens.Session != null)
{
_sessionPoolEventHandler.OnSessionProvided(this);
}
return sessionOrCreateToken.Session ?? NewSession(connStr, password, sessionOrCreateToken.SessionCreationToken);
sessionOrCreateTokens.BackgroundSessionCreationTokens().ForEach(token =>
ScheduleNewIdleSession(connStr, password, token)
);
return sessionOrCreateTokens.Session ?? NewSession(connStr, password, sessionOrCreateTokens.SessionCreationToken());
}

internal async Task<SFSession> GetSessionAsync(string connStr, SecureString password, CancellationToken cancellationToken)
{
s_logger.Debug("SessionPool::GetSessionAsync");
if (!GetPooling())
return await NewNonPoolingSessionAsync(connStr, password, cancellationToken).ConfigureAwait(false);
var sessionOrCreateToken = GetIdleSession(connStr);
if (sessionOrCreateToken.Session != null)
var sessionOrCreateTokens = GetIdleSession(connStr);
if (sessionOrCreateTokens.Session != null)
{
_sessionPoolEventHandler.OnSessionProvided(this);
}
return sessionOrCreateToken.Session ?? await NewSessionAsync(connStr, password, sessionOrCreateToken.SessionCreationToken, cancellationToken).ConfigureAwait(false);
sessionOrCreateTokens.BackgroundSessionCreationTokens().ForEach(token =>
ScheduleNewIdleSession(connStr, password, token)
);
return sessionOrCreateTokens.Session ?? await NewSessionAsync(connStr, password, sessionOrCreateTokens.SessionCreationToken(), cancellationToken).ConfigureAwait(false);
}

internal void ScheduleNewIdleSession(string connStr, SecureString password, SessionCreationToken token)
{
Task.Run(() =>
{
var session = NewSession(connStr, password, token);
AddSession(session);
});
}

internal SFSession GetSession() => GetSession(ConnectionString, Password);
Expand All @@ -146,7 +161,7 @@ internal void SetSessionPoolEventHandler(ISessionPoolEventHandler sessionPoolEve
_sessionPoolEventHandler = sessionPoolEventHandler;
}

private SessionOrCreationToken GetIdleSession(string connStr)
private SessionOrCreationTokens GetIdleSession(string connStr)
{
s_logger.Debug("SessionPool::GetIdleSession");
lock (_sessionPoolLock)
Expand All @@ -161,34 +176,41 @@ private SessionOrCreationToken GetIdleSession(string connStr)
if (session != null)
{
s_logger.Debug("SessionPool::GetIdleSession - no thread was waiting for a session, an idle session was retrieved from the pool");
return new SessionOrCreationToken(session);
return new SessionOrCreationTokens(session);
}
s_logger.Debug("SessionPool::GetIdleSession - no thread was waiting for a session, but could not find any idle session available in the pool");
if (IsAllowedToCreateNewSession())
var sessionsCount = AllowedNumberOfNewSessionCreations();
if (sessionsCount > 0)
{
// there is no need to wait for a session since we can create a new one
return new SessionOrCreationToken(_sessionCreationTokenCounter.NewToken());
// there is no need to wait for a session since we can create new ones
var sessionCreationTokens = Enumerable.Range(1, sessionsCount)
.Select(_ => _sessionCreationTokenCounter.NewToken())
.ToList();
return new SessionOrCreationTokens(sessionCreationTokens);
}
}
}
return new SessionOrCreationToken(WaitForSession(connStr));
return new SessionOrCreationTokens(WaitForSession(connStr));
}

private bool IsAllowedToCreateNewSession()
private int AllowedNumberOfNewSessionCreations()
{
if (!_waitingForIdleSessionQueue.IsWaitingEnabled())
{
s_logger.Debug($"SessionPool - creating of new sessions is not limited");
return true;
return 1; // waiting disabled means we are in .... TODO
}
var currentSize = GetCurrentPoolSize();
if (currentSize < _poolConfig.MaxPoolSize)
{
s_logger.Debug($"SessionPool - allowed to create a session, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}");
return true;
var maxSessionsToCreate = _poolConfig.MaxPoolSize - currentSize;
var sessionsNeeded = Math.Max(_poolConfig.MinPoolSize - currentSize, 1);
var sessionsToCreate = Math.Min(sessionsNeeded, maxSessionsToCreate);
s_logger.Debug($"SessionPool - allowed to create {sessionsToCreate} sessions, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}");
return sessionsToCreate;
}
s_logger.Debug($"SessionPool - not allowed to create a session, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}");
return false;
return 0;
}

private SFSession WaitForSession(string connStr)
Expand Down

0 comments on commit 771c7ae

Please sign in to comment.