Skip to content

Commit

Permalink
SNOW-902611 Extract SessionPool to a separate file + naming conventio…
Browse files Browse the repository at this point in the history
…ns (#775)

### Description
Extracted session pool to a separate file.
Initial changes to apply naming conventions.

### Checklist
- [x] Code compiles correctly
- [x] Code is formatted according to [Coding
Conventions](../CodingConventions.md)
- [ ] Created tests which fail without the change (if possible)
- [x] All tests passing (`dotnet test`)
- [x] Extended the README / documentation, if necessary
- [x] Provide JIRA issue id (if possible) or GitHub issue id in PR name
  • Loading branch information
sfc-gh-mhofman authored Sep 13, 2023
1 parent 78ddc27 commit 622680d
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 195 deletions.
8 changes: 4 additions & 4 deletions Snowflake.Data/Client/SnowflakeDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public override void Close()
{
var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined;

if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.addSession(SfSession))
if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession))
{
logger.Debug($"Session pooled: {SfSession.sessionId}");
}
Expand Down Expand Up @@ -191,7 +191,7 @@ public Task CloseAsync(CancellationToken cancellationToken)
{
var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined;

if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.addSession(SfSession))
if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession))
{
logger.Debug($"Session pooled: {SfSession.sessionId}");
_connectionState = ConnectionState.Closed;
Expand Down Expand Up @@ -246,7 +246,7 @@ public override void Open()
logger.Debug($"Open with a connection already opened: {_connectionState}");
return;
}
SfSession = SnowflakeDbConnectionPool.getSession(this.ConnectionString);
SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString);
if (SfSession != null)
{
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
Expand Down Expand Up @@ -289,7 +289,7 @@ public override Task OpenAsync(CancellationToken cancellationToken)
logger.Debug($"Open with a connection already opened: {_connectionState}");
return Task.CompletedTask;
}
SfSession = SnowflakeDbConnectionPool.getSession(this.ConnectionString);
SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString);
if (SfSession != null)
{
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
Expand Down
201 changes: 10 additions & 191 deletions Snowflake.Data/Client/SnowflakeDbConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,209 +1,28 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Collections.Concurrent;
using System.Security;
using Snowflake.Data.Core;
using Snowflake.Data.Core.Session;
using Snowflake.Data.Log;
using System.Linq;
using Snowflake.Data.Core;

namespace Snowflake.Data.Client
{
sealed class SessionPoolSingleton : IDisposable
{
private static SFLogger logger = SFLoggerFactory.GetLogger<SessionPoolSingleton>();
private static SessionPoolSingleton instance = null;
private static readonly object _sessionPoolLock = new object();

private List<SFSession> sessionPool;
private int maxPoolSize;
private long timeout;
private int MAX_POOL_SIZE = 10;
private const long TIMEOUT = 3600;
private bool pooling = true;

SessionPoolSingleton()
{
lock (_sessionPoolLock)
{
sessionPool = new List<SFSession>();
maxPoolSize = MAX_POOL_SIZE;
timeout = TIMEOUT;
}
}
~SessionPoolSingleton()
{
ClearAllPools();
}

public void Dispose()
{
ClearAllPools();
}

public static SessionPoolSingleton Instance
{
get
{
lock (_sessionPoolLock)
{
if(instance == null)
{
instance = new SessionPoolSingleton();
}
return instance;
}
}
}

private void cleanExpiredSessions()
{
logger.Debug("SessionPool::cleanExpiredSessions");
lock (_sessionPoolLock)
{
long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds();

foreach (var item in sessionPool.ToList())
{
if (item.IsExpired(timeout, timeNow))
{
sessionPool.Remove(item);
item.close();
}
}
}
}

internal SFSession getSession(string connStr)
{
logger.Debug("SessionPool::getSession");
if (!pooling)
return null;
lock (_sessionPoolLock)
{
for (int i = 0; i < sessionPool.Count; i++)
{
if (sessionPool[i].connStr.Equals(connStr))
{
SFSession session = sessionPool[i];
sessionPool.RemoveAt(i);
long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
if (session.IsExpired(timeout, timeNow))
{
session.close();
i--;
}
else
{
logger.Debug($"reuse pooled session with sid {session.sessionId}");
return session;
}
}
}
}
return null;
}
internal bool addSession(SFSession session)
{
logger.Debug("SessionPool::addSession");
if (!pooling)
return false;
long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
if (session.IsNotOpen() || session.IsExpired(timeout, timeNow))
return false;

lock (_sessionPoolLock)
{
if (sessionPool.Count >= maxPoolSize)
{
cleanExpiredSessions();
}
if (sessionPool.Count >= maxPoolSize)
{
// pool is full
return false;
}

logger.Debug($"pool connection with sid {session.sessionId}");
sessionPool.Add(session);
return true;
}
}

internal void ClearAllPools()
{
logger.Debug("SessionPool::ClearAllPools");
lock (_sessionPoolLock)
{
foreach (SFSession session in sessionPool)
{
session.close();
}
sessionPool.Clear();
}
}

public void SetMaxPoolSize(int size)
{
maxPoolSize = size;
}

public int GetMaxPoolSize()
{
return maxPoolSize;
}

public void SetTimeout(long time)
{
timeout = time;
}

public long GetTimeout()
{
return timeout;
}

public int GetCurrentPoolSize()
{
return sessionPool.Count;
}

public bool SetPooling(bool isEnable)
{
if (pooling == isEnable)
return false;
pooling = isEnable;
if (!pooling)
{
ClearAllPools();
}
return true;
}

public bool GetPooling()
{
return pooling;
}
}
public class SnowflakeDbConnectionPool
{
private static SFLogger logger = SFLoggerFactory.GetLogger<SnowflakeDbConnection>();
private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger<SnowflakeDbConnectionPool>();

internal static SFSession getSession(string connStr)
internal static SFSession GetSession(string connStr)
{
logger.Debug("SnowflakeDbConnectionPool::getSession");
return SessionPoolSingleton.Instance.getSession(connStr);
s_logger.Debug("SnowflakeDbConnectionPool::GetSession");
return SessionPoolSingleton.Instance.GetSession(connStr);
}

internal static bool addSession(SFSession session)
internal static bool AddSession(SFSession session)
{
logger.Debug("SnowflakeDbConnectionPool::addSession");
return SessionPoolSingleton.Instance.addSession(session);
s_logger.Debug("SnowflakeDbConnectionPool::AddSession");
return SessionPoolSingleton.Instance.AddSession(session);
}

public static void ClearAllPools()
{
logger.Debug("SnowflakeDbConnectionPool::ClearAllPools");
s_logger.Debug("SnowflakeDbConnectionPool::ClearAllPools");
SessionPoolSingleton.Instance.ClearAllPools();
}

Expand Down
Loading

0 comments on commit 622680d

Please sign in to comment.