Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] SNOW-902608 Connection pool V2 #781

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1898,7 +1898,7 @@ public void TestCloseAsyncFailure()
{
using (var conn = new MockSnowflakeDbConnection(new MockCloseSessionException()))
{
SnowflakeDbConnectionPool.SetPooling(false);
SnowflakeDbConnectionPool.GetPool(ConnectionString, null).SetPooling(false);
conn.ConnectionString = ConnectionString;
Assert.AreEqual(conn.State, ConnectionState.Closed);
Task task = null;
Expand Down
2 changes: 1 addition & 1 deletion Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void TestConnectionPoolIsFull()
conn3.ConnectionString = ConnectionString + " retryCount=2";
conn3.Open();
Assert.AreEqual(ConnectionState.Open, conn3.State);
SnowflakeDbConnectionPool.ClearAllPools();
// SnowflakeDbConnectionPool.ClearAllPools(); // TODO: check it!

conn1.Close();
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize());
Expand Down
2 changes: 2 additions & 0 deletions Snowflake.Data/Client/SnowflakeDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public override Task OpenAsync(CancellationToken cancellationToken)
if (previousTask.IsFaulted)
{
// Exception from SfSession.OpenAsync
_connectionState = ConnectionState.Closed;
RethrowOnSessionOpenFailure(previousTask.Exception);
}
else if (previousTask.IsCanceled)
Expand All @@ -289,6 +290,7 @@ public override Task OpenAsync(CancellationToken cancellationToken)
}
else
{
SfSession = previousTask.Result;
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
// Only continue if the session was opened successfully
OnSessionEstablished();
Expand Down
28 changes: 21 additions & 7 deletions Snowflake.Data/Client/SnowflakeDbConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using Snowflake.Data.Core;
using Snowflake.Data.Core.ConnectionPool;
using Snowflake.Data.Core.Session;
using Snowflake.Data.Log;

Expand All @@ -13,7 +14,7 @@ public class SnowflakeDbConnectionPool
private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger<SnowflakeDbConnectionPool>();
private static readonly Object s_instanceLock = new Object();
private static ConnectionPoolManagerBase s_connectionPoolManager;
private static readonly PoolManagerVersion s_poolVersion = PoolManagerVersion.Version1;
private static PoolManagerVersion s_poolVersion = PoolManagerVersion.Version2;

public static ConnectionPoolManagerBase Instance
{
Expand Down Expand Up @@ -85,10 +86,12 @@ public static bool GetPooling()

private static ConnectionPoolManagerBase ProvideConnectionPoolManager()
{
if (s_poolVersion == PoolManagerVersion.Version1)
return new ConnectionPoolManagerV1();

throw new NotSupportedException("Pool version not supported");
switch (s_poolVersion)
{
case PoolManagerVersion.Version1: return new ConnectionPoolManagerV1();
case PoolManagerVersion.Version2: return new ConnectionPoolManagerV2();
default: throw new NotSupportedException("Pool version not supported");
}
}

internal static SFSession GetSession(string connectionString, SecureString password)
Expand All @@ -97,7 +100,7 @@ internal static SFSession GetSession(string connectionString, SecureString passw
return Instance.GetSession(connectionString, password);
}

internal static Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
internal static Task<SFSession> GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
s_logger.Debug("SnowflakeDbConnectionPool::GetSessionAsync");
return Instance.GetSessionAsync(connectionString, password, cancellationToken);
Expand All @@ -109,6 +112,17 @@ internal static bool AddSession(string connectionString, SecureString password,
return Instance.AddSession(connectionString, password, session);
}


public static void InternalTogglePreviousPool()
{
s_logger.Debug("ClearAllPools");
if (Instance.GetCurrentPoolSize() > 0)
throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Switch pool version before connections are established!");
ClearAllPools();
lock (s_instanceLock)
{
s_poolVersion = PoolManagerVersion.Version1;
s_connectionPoolManager = ProvideConnectionPoolManager();
}
}
}
}
2 changes: 1 addition & 1 deletion Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ internal SFSession GetSession(string connectionString, SecureString password)
return sessionPool.GetSession(connectionString, password);
}

internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
internal Task<SFSession> GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
return GetPool(connectionString, password).GetSessionAsync(connectionString, password, cancellationToken);
}
Expand Down
26 changes: 26 additions & 0 deletions Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Linq;
using Snowflake.Data.Core.Session;

namespace Snowflake.Data.Core.ConnectionPool
{
class ConnectionPoolManagerV2 : ConnectionPoolManagerBase
{
private const bool AllowExceedMaxPoolSizeDefault = false;
// private const int MinPoolSizeDefault = 0; // TODO: SNOW-902610
private const int MaxPoolSizeDefault = 10;

protected override PoolManagerVersion GetVersion() => PoolManagerVersion.Version2;

protected override void ApplyPoolDefaults(SessionPool pool)
{
pool.SetAllowExceedMaxPoolSize(AllowExceedMaxPoolSizeDefault);
// pool.SetMinPoolSize(MinPoolSizeDefault); // TODO: SNOW-902610
pool.SetMaxPoolSize(MaxPoolSizeDefault);
}

public new int GetCurrentPoolSize()
{
return Pools.Values.Sum(sessionPool => sessionPool.GetCurrentPoolSize());
}
}
}
68 changes: 38 additions & 30 deletions Snowflake.Data/Core/Session/SessionPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Security;
using System.Threading;
Expand Down Expand Up @@ -82,7 +83,7 @@ internal SFSession GetSession(string connectionString, SecureString password)
return ProvidePooledSession(connectionString, password);
}

internal Task GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
internal Task<SFSession> GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
if (!_pooling)
return OpenNewSessionAsync(connectionString, password, cancellationToken);
Expand Down Expand Up @@ -148,23 +149,22 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas

lock (s_sessionPoolLock)
{
if (GetCurrentPoolSize() < _maxPoolSize)
if (GetIdleSessionsSize() > 0)
{
if (GetIdleSessionsSize() == 0)
{
var newSession = OpenNewSession(connectionString, password);
_busySessions++;
s_logger.Info($"Created new pooled session with sid {newSession.sessionId}");
return newSession;
}

var sessionFromPool = _sessionPool[0]; // oldest idle
_sessionPool.Remove(sessionFromPool);
_busySessions++;
s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}");
return sessionFromPool;
}

if (GetCurrentPoolSize() < MaxPoolSize)
{
var newSession = OpenNewSession(connectionString, password);
_busySessions++;
s_logger.Info($"Created new pooled session with sid {newSession.sessionId}");
return newSession;
}
s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections");
}

Expand All @@ -178,30 +178,30 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas
}


private Task ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
private Task<SFSession> ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
if (!_pooling)
return null;

lock (s_sessionPoolLock)
{
if (GetCurrentPoolSize() < _maxPoolSize)
if (GetIdleSessionsSize() > 0)
{
if (GetIdleSessionsSize() == 0)
{
var session = OpenNewSessionAsync(connectionString, password, cancellationToken);
_busySessions++;
s_logger.Info($"Creating new pooled session");
return session;
}

var sessionFromPool = _sessionPool[0]; // oldest idle
_sessionPool.Remove(sessionFromPool);
_busySessions++;
s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}");
return Task.FromResult(sessionFromPool);
}

if (GetCurrentPoolSize() < MaxPoolSize)
{
var session = OpenNewSessionAsync(connectionString, password, cancellationToken);
_busySessions++;
s_logger.Info($"Creating new pooled session");
return session;
}

s_logger.Debug($"Pool size {_maxPoolSize} reached, no free idle connections");
}

Expand Down Expand Up @@ -300,18 +300,26 @@ internal SFSession OpenNewSession(string connectionString, SecureString password
return session;
}

internal Task OpenNewSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
internal Task<SFSession> OpenNewSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
SFSession session = new SFSession(connectionString, password);
try
{
return session.OpenAsync(cancellationToken);
}
catch (Exception e)
{
s_logger.Error("Unable to connect", e);
return Task.FromException(e);
}
return session
.OpenAsync(cancellationToken)
.ContinueWith(previousTask =>
{
if (previousTask.IsFaulted)
{
Debug.Assert(previousTask.Exception != null, "previousTask.Exception != null");
throw previousTask.Exception;
}

if (cancellationToken.IsCancellationRequested)
{
cancellationToken.ThrowIfCancellationRequested();
}

return session;
}, cancellationToken);
}

internal bool AddSession(SFSession session)
Expand Down