From 771c7ae9c67d19168e0d7f56d50e5c0117688c47 Mon Sep 17 00:00:00 2001 From: Krzysztof Nozderko Date: Fri, 1 Mar 2024 17:35:12 +0100 Subject: [PATCH] SNOW-902610 Min pool size --- .../ConnectionMultiplePoolsIT.cs | 47 +++++++++++--- .../ConnectionPoolCommonIT.cs | 14 ----- .../ConnectionSinglePoolCacheIT.cs | 14 +++++ .../UnitTests/ConnectionPoolManagerTest.cs | 4 +- .../Session/SessionOrCreationTokensTest.cs | 62 +++++++++++++++++++ .../Core/Session/SessionOrCreationToken.cs | 22 ------- .../Core/Session/SessionOrCreationTokens.cs | 41 ++++++++++++ Snowflake.Data/Core/Session/SessionPool.cs | 56 ++++++++++++----- 8 files changed, 198 insertions(+), 62 deletions(-) create mode 100644 Snowflake.Data.Tests/UnitTests/Session/SessionOrCreationTokensTest.cs delete mode 100644 Snowflake.Data/Core/Session/SessionOrCreationToken.cs create mode 100644 Snowflake.Data/Core/Session/SessionOrCreationTokens.cs diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs index fbc143c59..1093dfee5 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs @@ -1,6 +1,7 @@ using System; using System.Data; using System.Linq; +using System.Threading; using System.Threading.Tasks; using NUnit.Framework; using Snowflake.Data.Client; @@ -35,6 +36,19 @@ public static void AfterAllTests() SnowflakeDbConnectionPool.ClearAllPools(); } + [Test] + public void TestBasicConnectionPool() + { + var connectingString = ConnectionString + "maxPoolSize=1;minPoolSize=1"; + var conn1 = new SnowflakeDbConnection(connectingString); + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + conn1.Close(); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(connectingString).GetCurrentPoolSize()); + } + [Test] public void TestReuseSessionInConnectionPool() // old name: TestConnectionPool { @@ -211,10 +225,10 @@ public void TestWaitInAQueueForAnIdleSession() public void TestBusyAndIdleConnectionsCountedInPoolSize() { // arrange - var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString); - pool.SetMaxPoolSize(2); + var connectionString = ConnectionString + "maxPoolSize=2;minPoolSize=1"; + var pool = SnowflakeDbConnectionPool.GetPool(connectionString); var connection = new SnowflakeDbConnection(); - connection.ConnectionString = ConnectionString; + connection.ConnectionString = connectionString; // act connection.Open(); @@ -267,19 +281,19 @@ public void TestConnectionPoolDisable() [Test] public void TestNewConnectionPoolClean() { - SnowflakeDbConnectionPool.SetMaxPoolSize(2); + var connectionString = ConnectionString + "maxPoolSize=2;minPoolSize=1;"; var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ConnectionString; + conn1.ConnectionString = connectionString; conn1.Open(); Assert.AreEqual(ConnectionState.Open, conn1.State); var conn2 = new SnowflakeDbConnection(); - conn2.ConnectionString = ConnectionString + " retryCount=1"; + conn2.ConnectionString = connectionString + "retryCount=1"; conn2.Open(); Assert.AreEqual(ConnectionState.Open, conn2.State); var conn3 = new SnowflakeDbConnection(); - conn3.ConnectionString = ConnectionString + " retryCount=2"; + conn3.ConnectionString = connectionString + "retryCount=2"; conn3.Open(); Assert.AreEqual(ConnectionState.Open, conn3.State); @@ -324,6 +338,25 @@ public void TestConnectionPoolExpirationWorks() Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); } + [Test] + public void TestMinPoolSize() + { + // arrange + var connection = new SnowflakeDbConnection(); + connection.ConnectionString = ConnectionString + "application=TestMinPoolSize;minPoolSize=3"; + + // act + connection.Open(); + Thread.Sleep(3000); + + // assert + var pool = SnowflakeDbConnectionPool.GetPool(connection.ConnectionString); + Assert.AreEqual(3, pool.GetCurrentPoolSize()); + + // cleanup + connection.Close(); + } + private SnowflakeDbConnection OpenedConnection(string connectionString) { var connection = new SnowflakeDbConnection(); diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs index ee1b0c5ba..3e58448c4 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs @@ -49,20 +49,6 @@ public static void AfterAllTests() { SnowflakeDbConnectionPool.ClearAllPools(); } - - [Test] - public void TestBasicConnectionPool() - { - SnowflakeDbConnectionPool.SetMaxPoolSize(1); - - var conn1 = new SnowflakeDbConnection(ConnectionString); - conn1.Open(); - Assert.AreEqual(ConnectionState.Open, conn1.State); - conn1.Close(); - - Assert.AreEqual(ConnectionState.Closed, conn1.State); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); - } [Test] public void TestConnectionPoolMultiThreading() diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs index fe91f8610..08f18c97a 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs @@ -35,6 +35,20 @@ public static void AfterAllTests() SnowflakeDbConnectionPool.ClearAllPools(); } + [Test] + public void TestBasicConnectionPool() + { + SnowflakeDbConnectionPool.SetMaxPoolSize(1); + + var conn1 = new SnowflakeDbConnection(ConnectionString); + conn1.Open(); + Assert.AreEqual(ConnectionState.Open, conn1.State); + conn1.Close(); + + Assert.AreEqual(ConnectionState.Closed, conn1.State); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + } + [Test] public void TestConcurrentConnectionPooling() { diff --git a/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs b/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs index 1672ddf83..fc92b7060 100644 --- a/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs +++ b/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs @@ -19,8 +19,8 @@ namespace Snowflake.Data.Tests.UnitTests class ConnectionPoolManagerTest { private readonly ConnectionPoolManager _connectionPoolManager = new ConnectionPoolManager(); - private const string ConnectionString1 = "database=D1;warehouse=W1;account=A1;user=U1;password=P1;role=R1;"; - private const string ConnectionString2 = "database=D2;warehouse=W2;account=A2;user=U2;password=P2;role=R2;"; + private const string ConnectionString1 = "database=D1;warehouse=W1;account=A1;user=U1;password=P1;role=R1;minPoolSize=1;"; + private const string ConnectionString2 = "database=D2;warehouse=W2;account=A2;user=U2;password=P2;role=R2;minPoolSize=1;"; private readonly SecureString _password = new SecureString(); private static PoolConfig s_poolConfig; diff --git a/Snowflake.Data.Tests/UnitTests/Session/SessionOrCreationTokensTest.cs b/Snowflake.Data.Tests/UnitTests/Session/SessionOrCreationTokensTest.cs new file mode 100644 index 000000000..12cbf36b3 --- /dev/null +++ b/Snowflake.Data.Tests/UnitTests/Session/SessionOrCreationTokensTest.cs @@ -0,0 +1,62 @@ +using System; +using System.Linq; +using NUnit.Framework; +using Snowflake.Data.Core; +using Snowflake.Data.Core.Session; + +namespace Snowflake.Data.Tests.UnitTests.Session +{ + [TestFixture] + public class SessionOrCreationTokensTest + { + private SFSession _session = new SFSession("account=test;user=test;password=test", null); + + [Test] + public void TestNoBackgroundSessionsToCreateWhenInitialisedWithSession() + { + // arrange + var sessionOrTokens = new SessionOrCreationTokens(_session); + + // act + var backgroundCreationTokens = sessionOrTokens.BackgroundSessionCreationTokens(); + + Assert.AreEqual(0, backgroundCreationTokens.Count); + } + + [Test] + public void TestReturnFirstCreationToken() + { + // arrange + var sessionCreationTokenCounter = new SessionCreationTokenCounter(TimeSpan.FromSeconds(10)); + var tokens = Enumerable.Range(1, 3) + .Select(_ => sessionCreationTokenCounter.NewToken()) + .ToList(); + var sessionOrTokens = new SessionOrCreationTokens(tokens); + + // act + var token = sessionOrTokens.SessionCreationToken(); + + // assert + Assert.AreSame(tokens[0], token); + } + + [Test] + public void TestReturnCreationTokensFromTheSecondOneForBackgroundExecution() + { + // arrange + var sessionCreationTokenCounter = new SessionCreationTokenCounter(TimeSpan.FromSeconds(10)); + var tokens = Enumerable.Range(1, 3) + .Select(_ => sessionCreationTokenCounter.NewToken()) + .ToList(); + var sessionOrTokens = new SessionOrCreationTokens(tokens); + + // act + var backgroundTokens = sessionOrTokens.BackgroundSessionCreationTokens(); + + // assert + Assert.AreEqual(2, backgroundTokens.Count); + Assert.AreSame(tokens[1], backgroundTokens[0]); + Assert.AreSame(tokens[2], backgroundTokens[1]); + } + } +} \ No newline at end of file diff --git a/Snowflake.Data/Core/Session/SessionOrCreationToken.cs b/Snowflake.Data/Core/Session/SessionOrCreationToken.cs deleted file mode 100644 index 4d5e8001e..000000000 --- a/Snowflake.Data/Core/Session/SessionOrCreationToken.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; - -namespace Snowflake.Data.Core.Session -{ - internal class SessionOrCreationToken - { - public SFSession Session { get; } - public SessionCreationToken SessionCreationToken { get; } - - public SessionOrCreationToken(SFSession session) - { - Session = session ?? throw new Exception("Internal error: missing session"); - SessionCreationToken = null; - } - - public SessionOrCreationToken(SessionCreationToken sessionCreationToken) - { - Session = null; - SessionCreationToken = sessionCreationToken ?? throw new Exception("Internal error: missing session creation token"); - } - } -} diff --git a/Snowflake.Data/Core/Session/SessionOrCreationTokens.cs b/Snowflake.Data/Core/Session/SessionOrCreationTokens.cs new file mode 100644 index 000000000..ae97d7233 --- /dev/null +++ b/Snowflake.Data/Core/Session/SessionOrCreationTokens.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Snowflake.Data.Core.Session +{ + internal class SessionOrCreationTokens + { + private static readonly List s_emptySessionCreationTokenList = new List(); + + public SFSession Session { get; } + public List SessionCreationTokens { get; } + + public SessionOrCreationTokens(SFSession session) + { + Session = session ?? throw new Exception("Internal error: missing session"); + SessionCreationTokens = s_emptySessionCreationTokenList; + } + + public SessionOrCreationTokens(List sessionCreationTokens) + { + Session = null; + if (sessionCreationTokens == null || sessionCreationTokens.Count == 0) + { + throw new Exception("Internal error: missing session creation token"); + } + SessionCreationTokens = sessionCreationTokens; + } + + public List BackgroundSessionCreationTokens() + { + if (Session == null) + { + return SessionCreationTokens.Skip(1).ToList(); + } + return SessionCreationTokens; + } + + public SessionCreationToken SessionCreationToken() => SessionCreationTokens.First(); + } +} diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index 7dcdf5f16..329859fc5 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -115,12 +115,15 @@ internal SFSession GetSession(string connStr, SecureString password) s_logger.Debug("SessionPool::GetSession"); if (!GetPooling()) return NewNonPoolingSession(connStr, password); - var sessionOrCreateToken = GetIdleSession(connStr); - if (sessionOrCreateToken.Session != null) + var sessionOrCreateTokens = GetIdleSession(connStr); + if (sessionOrCreateTokens.Session != null) { _sessionPoolEventHandler.OnSessionProvided(this); } - return sessionOrCreateToken.Session ?? NewSession(connStr, password, sessionOrCreateToken.SessionCreationToken); + sessionOrCreateTokens.BackgroundSessionCreationTokens().ForEach(token => + ScheduleNewIdleSession(connStr, password, token) + ); + return sessionOrCreateTokens.Session ?? NewSession(connStr, password, sessionOrCreateTokens.SessionCreationToken()); } internal async Task GetSessionAsync(string connStr, SecureString password, CancellationToken cancellationToken) @@ -128,12 +131,24 @@ internal async Task GetSessionAsync(string connStr, SecureString pass s_logger.Debug("SessionPool::GetSessionAsync"); if (!GetPooling()) return await NewNonPoolingSessionAsync(connStr, password, cancellationToken).ConfigureAwait(false); - var sessionOrCreateToken = GetIdleSession(connStr); - if (sessionOrCreateToken.Session != null) + var sessionOrCreateTokens = GetIdleSession(connStr); + if (sessionOrCreateTokens.Session != null) { _sessionPoolEventHandler.OnSessionProvided(this); } - return sessionOrCreateToken.Session ?? await NewSessionAsync(connStr, password, sessionOrCreateToken.SessionCreationToken, cancellationToken).ConfigureAwait(false); + sessionOrCreateTokens.BackgroundSessionCreationTokens().ForEach(token => + ScheduleNewIdleSession(connStr, password, token) + ); + return sessionOrCreateTokens.Session ?? await NewSessionAsync(connStr, password, sessionOrCreateTokens.SessionCreationToken(), cancellationToken).ConfigureAwait(false); + } + + internal void ScheduleNewIdleSession(string connStr, SecureString password, SessionCreationToken token) + { + Task.Run(() => + { + var session = NewSession(connStr, password, token); + AddSession(session); + }); } internal SFSession GetSession() => GetSession(ConnectionString, Password); @@ -146,7 +161,7 @@ internal void SetSessionPoolEventHandler(ISessionPoolEventHandler sessionPoolEve _sessionPoolEventHandler = sessionPoolEventHandler; } - private SessionOrCreationToken GetIdleSession(string connStr) + private SessionOrCreationTokens GetIdleSession(string connStr) { s_logger.Debug("SessionPool::GetIdleSession"); lock (_sessionPoolLock) @@ -161,34 +176,41 @@ private SessionOrCreationToken GetIdleSession(string connStr) if (session != null) { s_logger.Debug("SessionPool::GetIdleSession - no thread was waiting for a session, an idle session was retrieved from the pool"); - return new SessionOrCreationToken(session); + return new SessionOrCreationTokens(session); } s_logger.Debug("SessionPool::GetIdleSession - no thread was waiting for a session, but could not find any idle session available in the pool"); - if (IsAllowedToCreateNewSession()) + var sessionsCount = AllowedNumberOfNewSessionCreations(); + if (sessionsCount > 0) { - // there is no need to wait for a session since we can create a new one - return new SessionOrCreationToken(_sessionCreationTokenCounter.NewToken()); + // there is no need to wait for a session since we can create new ones + var sessionCreationTokens = Enumerable.Range(1, sessionsCount) + .Select(_ => _sessionCreationTokenCounter.NewToken()) + .ToList(); + return new SessionOrCreationTokens(sessionCreationTokens); } } } - return new SessionOrCreationToken(WaitForSession(connStr)); + return new SessionOrCreationTokens(WaitForSession(connStr)); } - private bool IsAllowedToCreateNewSession() + private int AllowedNumberOfNewSessionCreations() { if (!_waitingForIdleSessionQueue.IsWaitingEnabled()) { s_logger.Debug($"SessionPool - creating of new sessions is not limited"); - return true; + return 1; // waiting disabled means we are in .... TODO } var currentSize = GetCurrentPoolSize(); if (currentSize < _poolConfig.MaxPoolSize) { - s_logger.Debug($"SessionPool - allowed to create a session, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}"); - return true; + var maxSessionsToCreate = _poolConfig.MaxPoolSize - currentSize; + var sessionsNeeded = Math.Max(_poolConfig.MinPoolSize - currentSize, 1); + var sessionsToCreate = Math.Min(sessionsNeeded, maxSessionsToCreate); + s_logger.Debug($"SessionPool - allowed to create {sessionsToCreate} sessions, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}"); + return sessionsToCreate; } s_logger.Debug($"SessionPool - not allowed to create a session, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}"); - return false; + return 0; } private SFSession WaitForSession(string connStr)