diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs new file mode 100644 index 000000000..56814a99f --- /dev/null +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs @@ -0,0 +1,48 @@ +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using Snowflake.Data.Client; +using Snowflake.Data.Core.Session; +using Snowflake.Data.Tests.Util; + +namespace Snowflake.Data.Tests.IntegrationTests +{ + [TestFixture] + [NonParallelizable] + public class ConnectionMultiplePoolsAsyncIT: SFBaseTestAsync + { + private readonly PoolConfig _previousPoolConfig = new PoolConfig(); + + [SetUp] + public new void BeforeTest() + { + SnowflakeDbConnectionPool.SetConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); + SnowflakeDbConnectionPool.ClearAllPools(); + } + + [TearDown] + public new void AfterTest() + { + _previousPoolConfig.Reset(); + } + + [Test] + public async Task TestMinPoolSizeAsync() + { + // arrange + var connection = new SnowflakeDbConnection(); + connection.ConnectionString = ConnectionString + "application=TestMinPoolSizeAsync;minPoolSize=3"; + + // act + await connection.OpenAsync().ConfigureAwait(false); + Thread.Sleep(3000); + + // assert + var pool = SnowflakeDbConnectionPool.GetPool(connection.ConnectionString); + Assert.AreEqual(3, pool.GetCurrentPoolSize()); + + // cleanup + await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false); + } + } +} diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs index 51355b323..d0f412ac4 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs @@ -1,6 +1,7 @@ using System; using System.Data; using System.Linq; +using System.Threading; using System.Threading.Tasks; using NUnit.Framework; using Snowflake.Data.Client; @@ -51,20 +52,21 @@ public void TestBasicConnectionPool() [Test] public void TestReuseSessionInConnectionPool() // old name: TestConnectionPool { - var conn1 = new SnowflakeDbConnection(ConnectionString); + var connectionString = ConnectionString + "minPoolSize=1"; + var conn1 = new SnowflakeDbConnection(connectionString); conn1.Open(); Assert.AreEqual(ConnectionState.Open, conn1.State); conn1.Close(); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(connectionString).GetCurrentPoolSize()); var conn2 = new SnowflakeDbConnection(); - conn2.ConnectionString = ConnectionString; + conn2.ConnectionString = connectionString; conn2.Open(); Assert.AreEqual(ConnectionState.Open, conn2.State); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(connectionString).GetCurrentPoolSize()); conn2.Close(); - Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize()); + Assert.AreEqual(1, SnowflakeDbConnectionPool.GetPool(connectionString).GetCurrentPoolSize()); Assert.AreEqual(ConnectionState.Closed, conn1.State); Assert.AreEqual(ConnectionState.Closed, conn2.State); } @@ -72,16 +74,16 @@ public void TestReuseSessionInConnectionPool() // old name: TestConnectionPool [Test] public void TestReuseSessionInConnectionPoolReachingMaxConnections() // old name: TestConnectionPoolFull { - var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString); - pool.SetMaxPoolSize(2); + var connectionString = ConnectionString + "maxPoolSize=2;minPoolSize=1"; + var pool = SnowflakeDbConnectionPool.GetPool(connectionString); var conn1 = new SnowflakeDbConnection(); - conn1.ConnectionString = ConnectionString; + conn1.ConnectionString = connectionString; conn1.Open(); Assert.AreEqual(ConnectionState.Open, conn1.State); var conn2 = new SnowflakeDbConnection(); - conn2.ConnectionString = ConnectionString; + conn2.ConnectionString = connectionString; conn2.Open(); Assert.AreEqual(ConnectionState.Open, conn2.State); @@ -91,12 +93,12 @@ public void TestReuseSessionInConnectionPoolReachingMaxConnections() // old name Assert.AreEqual(2, pool.GetCurrentPoolSize()); var conn3 = new SnowflakeDbConnection(); - conn3.ConnectionString = ConnectionString; + conn3.ConnectionString = connectionString; conn3.Open(); Assert.AreEqual(ConnectionState.Open, conn3.State); var conn4 = new SnowflakeDbConnection(); - conn4.ConnectionString = ConnectionString; + conn4.ConnectionString = connectionString; conn4.Open(); Assert.AreEqual(ConnectionState.Open, conn4.State); @@ -115,7 +117,7 @@ public void TestReuseSessionInConnectionPoolReachingMaxConnections() // old name public void TestWaitForTheIdleConnectionWhenExceedingMaxConnectionsLimit() { // arrange - var connectionString = ConnectionString + "application=TestWaitForMaxSize1;waitingForIdleSessionTimeout=1s;maxPoolSize=2"; + var connectionString = ConnectionString + "application=TestWaitForMaxSize1;waitingForIdleSessionTimeout=1s;maxPoolSize=2;minPoolSize=1"; var pool = SnowflakeDbConnectionPool.GetPool(connectionString); Assert.AreEqual(0, pool.GetCurrentPoolSize(), "expecting pool to be empty"); var conn1 = OpenedConnection(connectionString); @@ -141,7 +143,7 @@ public void TestWaitForTheIdleConnectionWhenExceedingMaxConnectionsLimit() public void TestWaitForTheIdleConnectionWhenExceedingMaxConnectionsLimitAsync() { // arrange - var connectionString = ConnectionString + "application=TestWaitForMaxSize2;waitingForIdleSessionTimeout=1s;maxPoolSize=2"; + var connectionString = ConnectionString + "application=TestWaitForMaxSize2;waitingForIdleSessionTimeout=1s;maxPoolSize=2;minPoolSize=1"; var pool = SnowflakeDbConnectionPool.GetPool(connectionString); Assert.AreEqual(0, pool.GetCurrentPoolSize(), "expecting pool to be empty"); var conn1 = OpenedConnection(connectionString); @@ -224,10 +226,10 @@ public void TestWaitInAQueueForAnIdleSession() public void TestBusyAndIdleConnectionsCountedInPoolSize() { // arrange - var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString); - pool.SetMaxPoolSize(2); + var connectionString = ConnectionString + "maxPoolSize=2;minPoolSize=1"; + var pool = SnowflakeDbConnectionPool.GetPool(connectionString); var connection = new SnowflakeDbConnection(); - connection.ConnectionString = ConnectionString; + connection.ConnectionString = connectionString; // act connection.Open(); @@ -279,7 +281,7 @@ public void TestConnectionPoolDisable() [Test] public void TestNewConnectionPoolClean() { - var connectionString = ConnectionString + "maxPoolSize=2;"; + var connectionString = ConnectionString + "maxPoolSize=2;minPoolSize=1;"; var conn1 = new SnowflakeDbConnection(); conn1.ConnectionString = connectionString; conn1.Open(); @@ -313,7 +315,7 @@ public void TestNewConnectionPoolClean() [Test] public void TestConnectionPoolExpirationWorks() { - var connectionString = ConnectionString + "expirationTimeout=0;maxPoolSize=2"; + var connectionString = ConnectionString + "expirationTimeout=0;maxPoolSize=2;minPoolSize=1"; var conn1 = new SnowflakeDbConnection(); conn1.ConnectionString = connectionString; conn1.Open(); @@ -333,6 +335,25 @@ public void TestConnectionPoolExpirationWorks() Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(connectionString).GetCurrentPoolSize()); } + [Test] + public void TestMinPoolSize() + { + // arrange + var connection = new SnowflakeDbConnection(); + connection.ConnectionString = ConnectionString + "application=TestMinPoolSize;minPoolSize=3"; + + // act + connection.Open(); + Thread.Sleep(3000); + + // assert + var pool = SnowflakeDbConnectionPool.GetPool(connection.ConnectionString); + Assert.AreEqual(3, pool.GetCurrentPoolSize()); + + // cleanup + connection.Close(); + } + private SnowflakeDbConnection OpenedConnection(string connectionString) { var connection = new SnowflakeDbConnection(); diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs index e848e43e1..c3d69d97d 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs @@ -102,6 +102,7 @@ public void TestConnectionPoolWithDispose() conn1.ConnectionString = "bad connection string"; Assert.Throws(() => conn1.Open()); conn1.Close(); + Thread.Sleep(3000); // minPoolSize = 2 causes that another thread has been started. We sleep to make that thread finish. Assert.AreEqual(ConnectionState.Closed, conn1.State); Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(conn1.ConnectionString).GetCurrentPoolSize()); diff --git a/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs b/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs index c0ddb6c02..71cdfe396 100644 --- a/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs +++ b/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs @@ -19,8 +19,8 @@ namespace Snowflake.Data.Tests.UnitTests class ConnectionPoolManagerTest { private readonly ConnectionPoolManager _connectionPoolManager = new ConnectionPoolManager(); - private const string ConnectionString1 = "database=D1;warehouse=W1;account=A1;user=U1;password=P1;role=R1;"; - private const string ConnectionString2 = "database=D2;warehouse=W2;account=A2;user=U2;password=P2;role=R2;"; + private const string ConnectionString1 = "database=D1;warehouse=W1;account=A1;user=U1;password=P1;role=R1;minPoolSize=1;"; + private const string ConnectionString2 = "database=D2;warehouse=W2;account=A2;user=U2;password=P2;role=R2;minPoolSize=1;"; private readonly SecureString _password = new SecureString(); private static PoolConfig s_poolConfig; diff --git a/Snowflake.Data.Tests/UnitTests/Session/SessionOrCreationTokensTest.cs b/Snowflake.Data.Tests/UnitTests/Session/SessionOrCreationTokensTest.cs new file mode 100644 index 000000000..12cbf36b3 --- /dev/null +++ b/Snowflake.Data.Tests/UnitTests/Session/SessionOrCreationTokensTest.cs @@ -0,0 +1,62 @@ +using System; +using System.Linq; +using NUnit.Framework; +using Snowflake.Data.Core; +using Snowflake.Data.Core.Session; + +namespace Snowflake.Data.Tests.UnitTests.Session +{ + [TestFixture] + public class SessionOrCreationTokensTest + { + private SFSession _session = new SFSession("account=test;user=test;password=test", null); + + [Test] + public void TestNoBackgroundSessionsToCreateWhenInitialisedWithSession() + { + // arrange + var sessionOrTokens = new SessionOrCreationTokens(_session); + + // act + var backgroundCreationTokens = sessionOrTokens.BackgroundSessionCreationTokens(); + + Assert.AreEqual(0, backgroundCreationTokens.Count); + } + + [Test] + public void TestReturnFirstCreationToken() + { + // arrange + var sessionCreationTokenCounter = new SessionCreationTokenCounter(TimeSpan.FromSeconds(10)); + var tokens = Enumerable.Range(1, 3) + .Select(_ => sessionCreationTokenCounter.NewToken()) + .ToList(); + var sessionOrTokens = new SessionOrCreationTokens(tokens); + + // act + var token = sessionOrTokens.SessionCreationToken(); + + // assert + Assert.AreSame(tokens[0], token); + } + + [Test] + public void TestReturnCreationTokensFromTheSecondOneForBackgroundExecution() + { + // arrange + var sessionCreationTokenCounter = new SessionCreationTokenCounter(TimeSpan.FromSeconds(10)); + var tokens = Enumerable.Range(1, 3) + .Select(_ => sessionCreationTokenCounter.NewToken()) + .ToList(); + var sessionOrTokens = new SessionOrCreationTokens(tokens); + + // act + var backgroundTokens = sessionOrTokens.BackgroundSessionCreationTokens(); + + // assert + Assert.AreEqual(2, backgroundTokens.Count); + Assert.AreSame(tokens[1], backgroundTokens[0]); + Assert.AreSame(tokens[2], backgroundTokens[1]); + } + } +} \ No newline at end of file diff --git a/Snowflake.Data/Core/Session/SessionOrCreationToken.cs b/Snowflake.Data/Core/Session/SessionOrCreationToken.cs deleted file mode 100644 index 4d5e8001e..000000000 --- a/Snowflake.Data/Core/Session/SessionOrCreationToken.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; - -namespace Snowflake.Data.Core.Session -{ - internal class SessionOrCreationToken - { - public SFSession Session { get; } - public SessionCreationToken SessionCreationToken { get; } - - public SessionOrCreationToken(SFSession session) - { - Session = session ?? throw new Exception("Internal error: missing session"); - SessionCreationToken = null; - } - - public SessionOrCreationToken(SessionCreationToken sessionCreationToken) - { - Session = null; - SessionCreationToken = sessionCreationToken ?? throw new Exception("Internal error: missing session creation token"); - } - } -} diff --git a/Snowflake.Data/Core/Session/SessionOrCreationTokens.cs b/Snowflake.Data/Core/Session/SessionOrCreationTokens.cs new file mode 100644 index 000000000..ae97d7233 --- /dev/null +++ b/Snowflake.Data/Core/Session/SessionOrCreationTokens.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Snowflake.Data.Core.Session +{ + internal class SessionOrCreationTokens + { + private static readonly List s_emptySessionCreationTokenList = new List(); + + public SFSession Session { get; } + public List SessionCreationTokens { get; } + + public SessionOrCreationTokens(SFSession session) + { + Session = session ?? throw new Exception("Internal error: missing session"); + SessionCreationTokens = s_emptySessionCreationTokenList; + } + + public SessionOrCreationTokens(List sessionCreationTokens) + { + Session = null; + if (sessionCreationTokens == null || sessionCreationTokens.Count == 0) + { + throw new Exception("Internal error: missing session creation token"); + } + SessionCreationTokens = sessionCreationTokens; + } + + public List BackgroundSessionCreationTokens() + { + if (Session == null) + { + return SessionCreationTokens.Skip(1).ToList(); + } + return SessionCreationTokens; + } + + public SessionCreationToken SessionCreationToken() => SessionCreationTokens.First(); + } +} diff --git a/Snowflake.Data/Core/Session/SessionPool.cs b/Snowflake.Data/Core/Session/SessionPool.cs index ad7d1f401..15ade086e 100644 --- a/Snowflake.Data/Core/Session/SessionPool.cs +++ b/Snowflake.Data/Core/Session/SessionPool.cs @@ -117,13 +117,16 @@ internal SFSession GetSession(string connStr, SecureString password) s_logger.Debug("SessionPool::GetSession"); if (!GetPooling()) return NewNonPoolingSession(connStr, password); - var sessionOrCreateToken = GetIdleSession(connStr); - if (sessionOrCreateToken.Session != null) + var sessionOrCreateTokens = GetIdleSession(connStr); + if (sessionOrCreateTokens.Session != null) { _sessionPoolEventHandler.OnSessionProvided(this); } + sessionOrCreateTokens.BackgroundSessionCreationTokens().ForEach(token => + ScheduleNewIdleSession(connStr, password, token) + ); WarnAboutOverridenConfig(); - return sessionOrCreateToken.Session ?? NewSession(connStr, password, sessionOrCreateToken.SessionCreationToken); + return sessionOrCreateTokens.Session ?? NewSession(connStr, password, sessionOrCreateTokens.SessionCreationToken()); } internal async Task GetSessionAsync(string connStr, SecureString password, CancellationToken cancellationToken) @@ -131,13 +134,25 @@ internal async Task GetSessionAsync(string connStr, SecureString pass s_logger.Debug("SessionPool::GetSessionAsync"); if (!GetPooling()) return await NewNonPoolingSessionAsync(connStr, password, cancellationToken).ConfigureAwait(false); - var sessionOrCreateToken = GetIdleSession(connStr); - if (sessionOrCreateToken.Session != null) + var sessionOrCreateTokens = GetIdleSession(connStr); + if (sessionOrCreateTokens.Session != null) { _sessionPoolEventHandler.OnSessionProvided(this); } + sessionOrCreateTokens.BackgroundSessionCreationTokens().ForEach(token => + ScheduleNewIdleSession(connStr, password, token) + ); WarnAboutOverridenConfig(); - return sessionOrCreateToken.Session ?? await NewSessionAsync(connStr, password, sessionOrCreateToken.SessionCreationToken, cancellationToken).ConfigureAwait(false); + return sessionOrCreateTokens.Session ?? await NewSessionAsync(connStr, password, sessionOrCreateTokens.SessionCreationToken(), cancellationToken).ConfigureAwait(false); + } + + internal void ScheduleNewIdleSession(string connStr, SecureString password, SessionCreationToken token) + { + Task.Run(() => + { + var session = NewSession(connStr, password, token); + AddSession(session); + }); } private void WarnAboutOverridenConfig() @@ -160,7 +175,7 @@ internal void SetSessionPoolEventHandler(ISessionPoolEventHandler sessionPoolEve _sessionPoolEventHandler = sessionPoolEventHandler; } - private SessionOrCreationToken GetIdleSession(string connStr) + private SessionOrCreationTokens GetIdleSession(string connStr) { s_logger.Debug("SessionPool::GetIdleSession"); lock (_sessionPoolLock) @@ -175,34 +190,41 @@ private SessionOrCreationToken GetIdleSession(string connStr) if (session != null) { s_logger.Debug("SessionPool::GetIdleSession - no thread was waiting for a session, an idle session was retrieved from the pool"); - return new SessionOrCreationToken(session); + return new SessionOrCreationTokens(session); } s_logger.Debug("SessionPool::GetIdleSession - no thread was waiting for a session, but could not find any idle session available in the pool"); - if (IsAllowedToCreateNewSession()) + var sessionsCount = AllowedNumberOfNewSessionCreations(); + if (sessionsCount > 0) { - // there is no need to wait for a session since we can create a new one - return new SessionOrCreationToken(_sessionCreationTokenCounter.NewToken()); + // there is no need to wait for a session since we can create new ones + var sessionCreationTokens = Enumerable.Range(1, sessionsCount) + .Select(_ => _sessionCreationTokenCounter.NewToken()) + .ToList(); + return new SessionOrCreationTokens(sessionCreationTokens); } } } - return new SessionOrCreationToken(WaitForSession(connStr)); + return new SessionOrCreationTokens(WaitForSession(connStr)); } - private bool IsAllowedToCreateNewSession() + private int AllowedNumberOfNewSessionCreations() { if (!IsMultiplePoolsVersion()) { s_logger.Debug($"SessionPool - creating of new sessions is not limited"); - return true; + return 1; // waiting disabled means we are either in old pool or there is no pooling } var currentSize = GetCurrentPoolSize(); if (currentSize < _poolConfig.MaxPoolSize) { - s_logger.Debug($"SessionPool - allowed to create a session, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}"); - return true; + var maxSessionsToCreate = _poolConfig.MaxPoolSize - currentSize; + var sessionsNeeded = Math.Max(_poolConfig.MinPoolSize - currentSize, 1); + var sessionsToCreate = Math.Min(sessionsNeeded, maxSessionsToCreate); + s_logger.Debug($"SessionPool - allowed to create {sessionsToCreate} sessions, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}"); + return sessionsToCreate; } s_logger.Debug($"SessionPool - not allowed to create a session, current pool size is {currentSize} out of {_poolConfig.MaxPoolSize}"); - return false; + return 0; } private bool IsMultiplePoolsVersion() => _waitingForIdleSessionQueue.IsWaitingEnabled();