Skip to content

Commit

Permalink
SNOW-902608 fixes in Async operations for getting a session from the …
Browse files Browse the repository at this point in the history
…pool
  • Loading branch information
sfc-gh-mhofman committed Oct 4, 2023
1 parent e3b540c commit b1bc741
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1898,7 +1898,7 @@ public void TestCloseAsyncFailure()
{
using (var conn = new MockSnowflakeDbConnection(new MockCloseSessionException()))
{
SnowflakeDbConnectionPool.SetPooling(false);
SnowflakeDbConnectionPool.GetPool(ConnectionString, null).SetPooling(false);
conn.ConnectionString = ConnectionString;
Assert.AreEqual(conn.State, ConnectionState.Closed);
Task task = null;
Expand Down
2 changes: 1 addition & 1 deletion Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void TestConnectionPoolIsFull()
conn3.ConnectionString = ConnectionString + " retryCount=2";
conn3.Open();
Assert.AreEqual(ConnectionState.Open, conn3.State);
SnowflakeDbConnectionPool.ClearAllPools();
// SnowflakeDbConnectionPool.ClearAllPools(); // TODO: check it!

conn1.Close();
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize());
Expand Down
2 changes: 2 additions & 0 deletions Snowflake.Data/Client/SnowflakeDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public override Task OpenAsync(CancellationToken cancellationToken)
if (previousTask.IsFaulted)
{
// Exception from SfSession.OpenAsync
_connectionState = ConnectionState.Closed;
RethrowOnSessionOpenFailure(previousTask.Exception);
}
else if (previousTask.IsCanceled)
Expand All @@ -289,6 +290,7 @@ public override Task OpenAsync(CancellationToken cancellationToken)
}
else
{
SfSession = previousTask.Result;
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
// Only continue if the session was opened successfully
OnSessionEstablished();
Expand Down
17 changes: 15 additions & 2 deletions Snowflake.Data/Client/SnowflakeDbConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class SnowflakeDbConnectionPool
private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger<SnowflakeDbConnectionPool>();
private static readonly Object s_instanceLock = new Object();
private static ConnectionPoolManagerBase s_connectionPoolManager;
private static readonly PoolManagerVersion s_poolVersion = PoolManagerVersion.Version1;
private static PoolManagerVersion s_poolVersion = PoolManagerVersion.Version2;

public static ConnectionPoolManagerBase Instance
{
Expand Down Expand Up @@ -100,7 +100,7 @@ internal static SFSession GetSession(string connectionString, SecureString passw
return Instance.GetSession(connectionString, password);
}

internal static Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
internal static Task<SFSession> GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
s_logger.Debug("SnowflakeDbConnectionPool::GetSessionAsync");
return Instance.GetSessionAsync(connectionString, password, cancellationToken);
Expand All @@ -116,5 +116,18 @@ internal static PoolManagerVersion GetVersion()
{
return s_poolVersion;
}

public static void InternalTogglePreviousPool()
{
s_logger.Debug("ClearAllPools");
if (Instance.GetCurrentPoolSize() > 0)
throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Switch pool version before connections are established!");
ClearAllPools();
lock (s_instanceLock)
{
s_poolVersion = PoolManagerVersion.Version1;
s_connectionPoolManager = ProvideConnectionPoolManager();
}
}
}
}
2 changes: 1 addition & 1 deletion Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ internal SFSession GetSession(string connectionString, SecureString password)
return sessionPool.GetSession(connectionString, password);
}

internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
internal Task<SFSession> GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
return GetPool(connectionString, password).GetSessionAsync(connectionString, password, cancellationToken);
}
Expand Down
6 changes: 6 additions & 0 deletions Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Linq;
using Snowflake.Data.Core.Session;

namespace Snowflake.Data.Core.ConnectionPool
Expand All @@ -16,5 +17,10 @@ protected override void ApplyPoolDefaults(SessionPool pool)
// pool.SetMinPoolSize(MinPoolSizeDefault); // TODO: SNOW-902610
pool.SetMaxPoolSize(MaxPoolSizeDefault);
}

public new int GetCurrentPoolSize()
{
return Pools.Values.Sum(sessionPool => sessionPool.GetCurrentPoolSize());
}
}
}
63 changes: 33 additions & 30 deletions Snowflake.Data/Core/Session/SessionPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Security;
using System.Threading;
Expand Down Expand Up @@ -83,7 +84,7 @@ internal SFSession GetSession(string connectionString, SecureString password)
return ProvidePooledSession(connectionString, password);
}

internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
internal Task<SFSession> GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
if (!_pooling)
return OpenNewSessionAsync(connectionString, password, cancellationToken);
Expand Down Expand Up @@ -149,23 +150,22 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas

lock (s_sessionPoolLock)
{
if (GetCurrentPoolSize() < _maxPoolSize)
if (GetIdleSessionsSize() > 0)
{
if (GetIdleSessionsSize() == 0)
{
var newSession = OpenNewSession(connectionString, password);
_busySessions++;
s_logger.Info($"Created new pooled session with sid {newSession.sessionId}");
return newSession;
}

var sessionFromPool = _sessionPool[0]; // oldest idle
_sessionPool.Remove(sessionFromPool);
_busySessions++;
s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}");
return sessionFromPool;
}

if (GetCurrentPoolSize() < MaxPoolSize)
{
var newSession = OpenNewSession(connectionString, password);
_busySessions++;
s_logger.Info($"Created new pooled session with sid {newSession.sessionId}");
return newSession;
}
s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections");
}

Expand All @@ -179,30 +179,30 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas
}


private Task ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
private Task<SFSession> ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
if (!_pooling)
return null;

lock (s_sessionPoolLock)
{
if (GetCurrentPoolSize() < _maxPoolSize)
if (GetIdleSessionsSize() > 0)
{
if (GetIdleSessionsSize() == 0)
{
var session = OpenNewSessionAsync(connectionString, password, cancellationToken);
_busySessions++;
s_logger.Info($"Creating new pooled session");
return session;
}

var sessionFromPool = _sessionPool[0]; // oldest idle
_sessionPool.Remove(sessionFromPool);
_busySessions++;
s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}");
return Task.FromResult(sessionFromPool);
}

if (GetCurrentPoolSize() < MaxPoolSize)
{
var session = OpenNewSessionAsync(connectionString, password, cancellationToken);
_busySessions++;
s_logger.Info($"Creating new pooled session");
return session;
}

s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections");
}

Expand Down Expand Up @@ -301,18 +301,21 @@ internal SFSession OpenNewSession(string connectionString, SecureString password
return session;
}

internal Task OpenNewSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
internal Task<SFSession> OpenNewSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
SFSession session = new SFSession(connectionString, password);
try
{
return session.OpenAsync(cancellationToken);
}
catch (Exception e)
{
s_logger.Error("Unable to connect", e);
return Task.FromException(e);
}
return session
.OpenAsync(cancellationToken)
.ContinueWith(previousTask =>
{
if (previousTask.IsFaulted)
{
Debug.Assert(previousTask.Exception != null, "previousTask.Exception != null");
throw previousTask.Exception;
}

return session;
}, cancellationToken);
}

internal bool AddSession(SFSession session)
Expand Down

0 comments on commit b1bc741

Please sign in to comment.