Skip to content

Commit

Permalink
SNOW-937190 max pool size reached with wait support and timeout excep…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
sfc-gh-mhofman committed Nov 13, 2023
1 parent 186a6ea commit a12dadc
Showing 1 changed file with 48 additions and 2 deletions.
50 changes: 48 additions & 2 deletions Snowflake.Data/Core/Session/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ sealed class SessionPool : IDisposable
internal SecureString Password { get; }
private bool _pooling = true;
private int _busySessions;
private bool _allowExceedMaxPoolSize = true;
private bool _allowExceedMaxPoolSize = true; // differentiates behavior when max size reached
private int _waitForIdleSessionSleepTimeout = 5;
private int _waitForIdleSessionTimeout = 120;

private SessionPool()
{
Expand All @@ -45,7 +47,7 @@ private SessionPool(string connectionString, SecureString password) : this()
{
ConnectionString = connectionString;
Password = password;
_allowExceedMaxPoolSize = false; // TODO: SNOW-937190
_allowExceedMaxPoolSize = false;
}

internal static SessionPool CreateSessionCache() => new SessionPool();
Expand Down Expand Up @@ -142,6 +144,11 @@ private SFSession GetIdleSession(string connStr)
private SFSession NewSession(String connectionString, SecureString password)
{
s_logger.Debug("SessionPool::NewSession");
if (!_allowExceedMaxPoolSize && GetCurrentPoolSize() >= MaxPoolSize)
{
s_logger.Warn("Max pool size reached");
return WaitForPooledSession(CancellationToken.None);
}
try
{
var session = s_sessionFactory.NewSession(connectionString, password);
Expand All @@ -165,6 +172,11 @@ private SFSession NewSession(String connectionString, SecureString password)
private Task<SFSession> NewSessionAsync(String connectionString, SecureString password, CancellationToken cancellationToken)
{
s_logger.Debug("SessionPool::NewSessionAsync");
if (!_allowExceedMaxPoolSize && GetCurrentPoolSize() >= MaxPoolSize)
{
s_logger.Warn("Max pool size reached");
return Task.Run(() => WaitForPooledSession(cancellationToken), cancellationToken);
}
var session = s_sessionFactory.NewSession(connectionString, password);
_busySessions++;
return session
Expand All @@ -183,12 +195,46 @@ private Task<SFSession> NewSessionAsync(String connectionString, SecureString pa
return session;
}, TaskContinuationOptions.NotOnCanceled);
}

private SFSession WaitForPooledSession(CancellationToken cancellationToken)
{
if (!_pooling)
return null;

long start = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
while (true)
{
if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

lock (s_sessionPoolLock)
{
var session = GetIdleSession(_connectionString);
if (session != null)
{
_idleSessions.Remove(session);
_busySessions++;
return session;
}
}

Thread.Sleep(_waitForIdleSessionSleepTimeout);
long now = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
if (start + _waitForIdleSessionTimeout < now)
throw new SnowflakeDbException(
new TimeoutException("No free connections in the pool."), // TODO:
SnowflakeDbException.CONNECTION_FAILURE_SSTATE,
SFError.REQUEST_TIMEOUT,
"Unable to connect.");
}
}

internal bool AddSession(SFSession session)
{
s_logger.Debug("SessionPool::AddSession");
if (!_pooling)
return false;

long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
if (session.IsNotOpen() || session.IsExpired(_timeout, timeNow))
{
Expand Down

0 comments on commit a12dadc

Please sign in to comment.