Skip to content

Commit

Permalink
Provided backward compatible algorithm for session picking for a sing…
Browse files Browse the repository at this point in the history
…le pool behaving as a cache
  • Loading branch information
sfc-gh-mhofman committed Oct 4, 2023
1 parent 1de887f commit 7ee5805
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void TestApplicationName()
try
{
conn.Open();
s_logger.Debug("{appName}");
s_logger.Debug($"{appName}");
Assert.Fail();

}
Expand Down
2 changes: 2 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ public void TestConnectionPoolFull()
conn3.Open();
Assert.AreEqual(ConnectionState.Open, conn3.State);

Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize());

var conn4 = new SnowflakeDbConnection();
conn4.ConnectionString = ConnectionString + " retryCount=3";
conn4.Open();
Expand Down
2 changes: 1 addition & 1 deletion Snowflake.Data/Core/Session/ConnectionPoolManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected ConnectionPoolManagerBase()

protected virtual string GetPoolKey(string connectionString) => connectionString;

public virtual SessionPool GetPool(string connectionString, SecureString password)
public SessionPool GetPool(string connectionString, SecureString password)
{
string poolKey = GetPoolKey(connectionString);
if (Pools.ContainsKey(poolKey))
Expand Down
3 changes: 3 additions & 0 deletions Snowflake.Data/Core/Session/ConnectionPoolManagerV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ namespace Snowflake.Data.Core.Session
sealed class ConnectionPoolManagerV1 : ConnectionPoolManagerBase
{
private const bool AllowExceedMaxPoolSizeDefault = true;
// private const int MinPoolSizeDefault = 0; // TODO: SNOW-902610
private const int MaxPoolSizeDefault = 10;
private const string SinglePoolKeyForAllDataSources = "CONNECTION_CACHE";
private const SessionPickAlgorithm SessionPicking = SessionPickAlgorithm.MatchConnectionString;

protected override void ApplyPoolDefaults(SessionPool pool)
{
pool.SetAllowExceedMaxPoolSize(AllowExceedMaxPoolSizeDefault);
pool.SetMaxPoolSize(MaxPoolSizeDefault);
pool.SetSessionPickAlgorithm(SessionPicking);
}
protected override PoolManagerVersion GetVersion() => PoolManagerVersion.Version1;
// Same pool for any connection string (backward compatible solution)
Expand Down
2 changes: 2 additions & 0 deletions Snowflake.Data/Core/Session/ConnectionPoolManagerV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class ConnectionPoolManagerV2 : ConnectionPoolManagerBase
private const bool AllowExceedMaxPoolSizeDefault = false;
// private const int MinPoolSizeDefault = 0; // TODO: SNOW-902610
private const int MaxPoolSizeDefault = 10;
private const SessionPickAlgorithm SessionPicking = SessionPickAlgorithm.PickOldest;

protected override PoolManagerVersion GetVersion() => PoolManagerVersion.Version2;

Expand All @@ -16,6 +17,7 @@ protected override void ApplyPoolDefaults(SessionPool pool)
pool.SetAllowExceedMaxPoolSize(AllowExceedMaxPoolSizeDefault);
// pool.SetMinPoolSize(MinPoolSizeDefault); // TODO: SNOW-902610
pool.SetMaxPoolSize(MaxPoolSizeDefault);
pool.SetSessionPickAlgorithm(SessionPicking);
}

public new int GetCurrentPoolSize()
Expand Down
136 changes: 61 additions & 75 deletions Snowflake.Data/Core/Session/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

namespace Snowflake.Data.Core.Session
{
public enum SessionPickAlgorithm
{
MatchConnectionString,
PickOldest
}

public class SessionPool : IDisposable
{
private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger<SessionPool>();
Expand All @@ -31,6 +37,7 @@ public class SessionPool : IDisposable
private string _connectionString;
private SecureString _password;
private bool _allowExceedMaxPoolSize; // backward compatibility flag
private SessionPickAlgorithm _sessionPick; // backward compatibility flag

internal SessionPool(string connectionString, SecureString password)
{
Expand Down Expand Up @@ -80,7 +87,7 @@ internal SFSession GetSession(string connectionString, SecureString password)
if (!_pooling)
return OpenNewSession(connectionString, password);
CleanExpiredSessions();
// AsyncEnsureMinPoolSize();
// EnsureMinPoolSizeAsync();
return ProvidePooledSession(connectionString, password);
}

Expand All @@ -89,60 +96,10 @@ internal Task<SFSession> GetSessionAsync(string connectionString, SecureString p
if (!_pooling)
return OpenNewSessionAsync(connectionString, password, cancellationToken);
CleanExpiredSessions();
// AsyncEnsureMinPoolSizeAsync();
// EnsureMinPoolSizeAsync();
return ProvidePooledSessionAsync(connectionString, password, cancellationToken);
}

/*
/// <summary>
/// Creates and opens a new SFSession with the connection string settings of the pool.
/// </summary>
/// <exception cref="SnowflakeDbException">If the session cannot be established</exception>
internal SFSession GetSession(string connectionString, SecureString password)
{
s_logger.Debug("SessionPool::GetSession");
SFSession session = null;
if (!_pooling)
{
session = OpenNewSession(connectionString, password);
s_logger.Info($"create non-poolable session with sid {session.sessionId}");
return session;
}
CleanExpiredSessions();
// EnsureMinPoolSizeAsync(); // TODO:
if (session == null && !_allowExceedMaxPoolSize)
session = WaitForPooledSession(CancellationToken.None);
return session;
}
/// <summary>
/// Creates a request to open a new SFSession with the connection string settings of the pool.
/// </summary>
/// <exception cref="SnowflakeDbException">If the connection string can't be processed</exception>
internal Task<SFSession> GetSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
s_logger.Debug("SessionPool::GetSessionAsync");
var session = GetSessionInternal(connectionString, password);
if (session == null && !_allowExceedMaxPoolSize)
session = WaitForPooledSessionAsync(cancellationToken);
return session;
}
internal SFSession GetSessionInternal(string connectionString, SecureString password)
{
if (!_pooling)
{
var session = OpenNewSession(connectionString, password);
s_logger.Info($"create non-poolable session with sid {session.sessionId}");
return session;
}
CleanExpiredSessions();
// EnsureMinPoolSizeAsync(); // TODO:
return ProvidePooledSession(connectionString, password);
}
*/
private SFSession ProvidePooledSession(string connectionString, SecureString password)
{
if (!_pooling)
Expand All @@ -152,11 +109,14 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas
{
if (GetIdleSessionsSize() > 0)
{
var sessionFromPool = _sessionPool[0]; // oldest idle
_sessionPool.Remove(sessionFromPool);
_busySessions++;
s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}");
return sessionFromPool;
var sessionFromPool = PickSession(connectionString); // oldest idle // TODO:
if (sessionFromPool != null)
{
_sessionPool.Remove(sessionFromPool);
_busySessions++;
s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}");
return sessionFromPool;
}
}

if (GetCurrentPoolSize() < MaxPoolSize)
Expand All @@ -177,7 +137,6 @@ private SFSession ProvidePooledSession(string connectionString, SecureString pas

return WaitForPooledSession(CancellationToken.None);
}


private Task<SFSession> ProvidePooledSessionAsync(string connectionString, SecureString password, CancellationToken cancellationToken)
{
Expand All @@ -188,11 +147,14 @@ private Task<SFSession> ProvidePooledSessionAsync(string connectionString, Secur
{
if (GetIdleSessionsSize() > 0)
{
var sessionFromPool = _sessionPool[0]; // oldest idle
_sessionPool.Remove(sessionFromPool);
_busySessions++;
s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}");
return Task.FromResult(sessionFromPool);
var sessionFromPool = PickSession(connectionString);
if (sessionFromPool != null)
{
_sessionPool.Remove(sessionFromPool);
_busySessions++;
s_logger.Debug($"Reused pooled session with sid {sessionFromPool.sessionId}");
return Task.FromResult(sessionFromPool);
}
}

if (GetCurrentPoolSize() < MaxPoolSize)
Expand Down Expand Up @@ -228,19 +190,18 @@ private SFSession WaitForPooledSession(CancellationToken cancellationToken)
if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

if (GetIdleSessionsSize() > 0)
lock (s_sessionPoolLock)
{
lock (s_sessionPoolLock)
session = PickSession(_connectionString);
if (session != null)
{
session = _sessionPool[0];
_sessionPool.Remove(session);
_busySessions++;
return session;
}
}

Thread.Sleep(_openSessionSleepTimeout);

Thread.Sleep(_openSessionSleepTimeout);
long now = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
if (start + _openSessionTimeout < now)
throw new SnowflakeDbException(
Expand All @@ -253,6 +214,28 @@ private SFSession WaitForPooledSession(CancellationToken cancellationToken)
return session;
}

// TODO: inject
SFSession PickSession(string connectionString)
{
SFSession session = null;
lock (s_sessionPoolLock)
{
switch (_sessionPick)
{
case SessionPickAlgorithm.MatchConnectionString:
session = _sessionPool.FirstOrDefault(it => it.connStr.Equals(connectionString));
break;
case SessionPickAlgorithm.PickOldest:
session = _sessionPool.Any() ? _sessionPool[0] : null;
break;
}
if (session != null)
_sessionPool.Remove(session);
}

return session;
}

// TODO:
// private void EnsureMinPoolSize()
// {
Expand Down Expand Up @@ -335,7 +318,7 @@ internal bool AddSession(SFSession session)
// TODO: fix because it is counted in the pool
// TODO: lock
if (_busySessions > 0)
_busySessions--;
_busySessions--;
return false;
}

Expand All @@ -351,14 +334,12 @@ internal bool AddSession(SFSession session)
}

if (_busySessions > 0)
{
_busySessions--;
s_logger.Debug($"Connection returned to the pool with sid {session.sessionId}");
_sessionPool.Add(session);
return true;
}
s_logger.Debug($"Connection returned to the pool with sid {session.sessionId}");
_sessionPool.Add(session);
return true;

s_logger.Warn($"Unexpected session with sid {session.sessionId} was not returned to the pool"); // or clear pool was called and session was created before
// s_logger.Warn($"Unexpected session with sid {session.sessionId} was not returned to the pool"); // or clear pool was called and session was created before
return false;

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on MAC (net6.0, AWS)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, AZURE)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on MAC (net6.0, AZURE)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net6.0, AWS)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, GCP)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net6.0, GCP)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, AWS)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, AWS)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, GCP)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, AWS)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, AZURE)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, GCP)

Unreachable code detected

Check warning on line 343 in Snowflake.Data/Core/Session/SessionPool.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, AZURE)

Unreachable code detected
}
}
Expand Down Expand Up @@ -439,6 +420,11 @@ public void SetAllowExceedMaxPoolSize(bool allowExceedMaxPoolSize)
{
_allowExceedMaxPoolSize = allowExceedMaxPoolSize;
}

public void SetSessionPickAlgorithm(SessionPickAlgorithm sessionPicking)
{
_sessionPick = sessionPicking;
}
}

}

0 comments on commit 7ee5805

Please sign in to comment.