Skip to content

Commit

Permalink
SNOW-902611 move control over creation of session to session pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mhofman committed Oct 5, 2023
1 parent 6feb3cb commit a65c00f
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 51 deletions.
63 changes: 19 additions & 44 deletions Snowflake.Data/Client/SnowflakeDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,39 +246,21 @@ public override void Open()
logger.Debug($"Open with a connection already opened: {_connectionState}");
return;
}
SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString);
if (SfSession != null)
try
{
OnSessionConnecting();
SfSession = SnowflakeDbConnectionPool.GetSession(ConnectionString, Password);
if (SfSession == null)
throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Could not open session");
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
OnSessionEstablished();
}
else
catch (Exception e)
{
SetSession();
try
{
SfSession.Open();
}
catch (Exception e)
{
// Otherwise when Dispose() is called, the close request would timeout.
_connectionState = ConnectionState.Closed;
logger.Error("Unable to connect", e);
if (!(e.GetType() == typeof(SnowflakeDbException)))
{
throw
new SnowflakeDbException(
e,
SnowflakeDbException.CONNECTION_FAILURE_SSTATE,
SFError.INTERNAL_ERROR,
"Unable to connect. " + e.Message);
}
else
{
throw;
}
}
_connectionState = ConnectionState.Closed;
logger.Error(e.Message);
throw;
}
OnSessionEstablished();
}

public override Task OpenAsync(CancellationToken cancellationToken)
Expand All @@ -289,19 +271,11 @@ public override Task OpenAsync(CancellationToken cancellationToken)
logger.Debug($"Open with a connection already opened: {_connectionState}");
return Task.CompletedTask;
}
SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString);
if (SfSession != null)
{
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
OnSessionEstablished();
return Task.CompletedTask;
}

registerConnectionCancellationCallback(cancellationToken);
SetSession();

return SfSession.OpenAsync(cancellationToken).ContinueWith(
previousTask =>
OnSessionConnecting();
return SnowflakeDbConnectionPool
.GetSessionAsync(ConnectionString, Password, cancellationToken)
.ContinueWith(previousTask =>
{
if (previousTask.IsFaulted)
{
Expand All @@ -322,8 +296,9 @@ public override Task OpenAsync(CancellationToken cancellationToken)
}
else
{
logger.Debug("All good");
// Only continue if the session was opened successfully
SfSession = previousTask.Result;
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
OnSessionEstablished();
}
},
Expand All @@ -349,15 +324,15 @@ public void SetArrayBindStageCreated()
/// Create a new SFsession with the connection string settings.
/// </summary>
/// <exception cref="SnowflakeDbException">If the connection string can't be processed</exception>
private void SetSession()
private void OnSessionConnecting()
{
SfSession = new SFSession(ConnectionString, Password);
_connectionTimeout = (int)SfSession.connectionTimeout.TotalSeconds;
// SfSession = new SFSession(ConnectionString, Password);
_connectionState = ConnectionState.Connecting;
}

private void OnSessionEstablished()
{
_connectionTimeout = (int)SfSession.connectionTimeout.TotalSeconds;
_connectionState = ConnectionState.Open;
}

Expand Down
16 changes: 12 additions & 4 deletions Snowflake.Data/Client/SnowflakeDbConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using Snowflake.Data.Core;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
using Snowflake.Data.Core;
using Snowflake.Data.Core.Session;
using Snowflake.Data.Log;

Expand All @@ -8,12 +11,17 @@ public class SnowflakeDbConnectionPool
{
private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger<SnowflakeDbConnectionPool>();

internal static SFSession GetSession(string connStr)
internal static SFSession GetSession(string connStr, SecureString password)
{
s_logger.Debug("SnowflakeDbConnectionPool::GetSession");
return SessionPoolSingleton.Instance.GetSession(connStr);
return SessionPoolSingleton.Instance.GetSession(connStr, password);
}


internal static Task<SFSession> GetSessionAsync(string connStr, SecureString password, CancellationToken cancellationToken)
{
return SessionPoolSingleton.Instance.GetSessionAsync(connStr, password, cancellationToken);
}

internal static bool AddSession(SFSession session)
{
s_logger.Debug("SnowflakeDbConnectionPool::AddSession");
Expand Down
98 changes: 95 additions & 3 deletions Snowflake.Data/Core/Session/SessionPool.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
using Snowflake.Data.Client;
using Snowflake.Data.Log;

namespace Snowflake.Data.Core.Session
Expand Down Expand Up @@ -70,11 +74,11 @@ private void CleanExpiredSessions()
}
}

internal SFSession GetSession(string connStr)
internal SFSession GetSession(string connStr, SecureString password)
{
s_logger.Debug("SessionPool::GetSession");
if (!_pooling)
return null;
return NewSession(connStr, password);
lock (s_sessionPoolLock)
{
for (int i = 0; i < _sessionPool.Count; i++)
Expand All @@ -97,8 +101,96 @@ internal SFSession GetSession(string connStr)
}
}
}
return null;
return NewSession(connStr, password);
}

internal Task<SFSession> GetSessionAsync(string connStr, SecureString password, CancellationToken cancellationToken)
{
s_logger.Debug("SessionPool::GetSession");
if (!_pooling)
return Task.FromResult(NewSession(connStr, password));
lock (s_sessionPoolLock)
{
for (int i = 0; i < _sessionPool.Count; i++)
{
if (_sessionPool[i].connStr.Equals(connStr))
{
SFSession session = _sessionPool[i];
_sessionPool.RemoveAt(i);
long timeNow = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
if (session.IsExpired(_timeout, timeNow))
{
session.close();
i--;
}
else
{
s_logger.Debug($"reuse pooled session with sid {session.sessionId}");
return Task.FromResult(session);
}
}
}
}
return NewSessionAsync(connStr, password, cancellationToken);
}

internal SFSession NewSession(String connectionString, SecureString password)
{
try
{
var session = new SFSession(connectionString, password);
session.Open();
return session;
}
catch (Exception e)
{
// Otherwise when Dispose() is called, the close request would timeout.
if (!(e is SnowflakeDbException))
throw;
throw new SnowflakeDbException(
e,
SnowflakeDbException.CONNECTION_FAILURE_SSTATE,
SFError.INTERNAL_ERROR,
"Unable to connect. " + e.Message);
}
}

internal Task<SFSession> NewSessionAsync(String connectionString, SecureString password, CancellationToken cancellationToken)
{
try
{
var session = new SFSession(connectionString, password);
return session
.OpenAsync(cancellationToken)
.ContinueWith(previousTask =>
{
if (previousTask.IsCanceled)
cancellationToken.ThrowIfCancellationRequested();

if (previousTask.IsFaulted && previousTask.Exception != null)
throw previousTask.Exception;

if (previousTask.IsFaulted)
throw new SnowflakeDbException(
SnowflakeDbException.CONNECTION_FAILURE_SSTATE,
SFError.INTERNAL_ERROR,
"Async open on connection failure");

return session;
}, cancellationToken);
}
catch (Exception e)
{
if (!(e is SnowflakeDbException))
throw;
throw new SnowflakeDbException(
e,
SnowflakeDbException.CONNECTION_FAILURE_SSTATE,
SFError.INTERNAL_ERROR,
"Unable to connect. " + e.Message);
}
}

internal bool AddSession(SFSession session)
{
s_logger.Debug("SessionPool::AddSession");
Expand Down

0 comments on commit a65c00f

Please sign in to comment.