Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] SNOW-902611 Refactor of SessionPool #777

Closed
wants to merge 8 commits into from
4 changes: 2 additions & 2 deletions Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void TestApplicationName()
try
{
conn.Open();
s_logger.Debug("{appName}");
s_logger.Debug($"{appName}");
Assert.Fail();

}
Expand Down Expand Up @@ -1898,7 +1898,7 @@ public void TestCloseAsyncFailure()
{
using (var conn = new MockSnowflakeDbConnection(new MockCloseSessionException()))
{
SnowflakeDbConnectionPool.SetPooling(false);
SnowflakeDbConnectionPool.GetPool(ConnectionString, null).SetPooling(false);
conn.ConnectionString = ConnectionString;
Assert.AreEqual(conn.State, ConnectionState.Closed);
Task task = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class PoolConfig {

public PoolConfig()
{
var connectionPoolManagerBase = SnowflakeDbConnectionPool.Instance; // TODO: check why necessary
_maxPoolSize = SnowflakeDbConnectionPool.GetMaxPoolSize();
_timeout = SnowflakeDbConnectionPool.GetTimeout();
_pooling = SnowflakeDbConnectionPool.GetPooling();
Expand Down Expand Up @@ -214,7 +215,7 @@ public void TestConnectionPoolIsFull()
conn3.ConnectionString = ConnectionString + " retryCount=2";
conn3.Open();
Assert.AreEqual(ConnectionState.Open, conn3.State);
SnowflakeDbConnectionPool.ClearAllPools();
// SnowflakeDbConnectionPool.ClearAllPools(); // TODO: check it!

conn1.Close();
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize());
Expand Down Expand Up @@ -320,6 +321,8 @@ public void TestConnectionPoolFull()
conn3.Open();
Assert.AreEqual(ConnectionState.Open, conn3.State);

Assert.AreEqual(2, SnowflakeDbConnectionPool.GetCurrentPoolSize());

var conn4 = new SnowflakeDbConnection();
conn4.ConnectionString = ConnectionString + " retryCount=3";
conn4.Open();
Expand Down
2 changes: 1 addition & 1 deletion Snowflake.Data.Tests/Mock/MockSnowflakeDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public override void Open()

public override Task OpenAsync(CancellationToken cancellationToken)
{
registerConnectionCancellationCallback(cancellationToken);
RegisterConnectionCancellationCallback(cancellationToken);

SetMockSession();

Expand Down
128 changes: 58 additions & 70 deletions Snowflake.Data/Client/SnowflakeDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class SnowflakeDbConnection : DbConnection

internal int _connectionTimeout;

private bool disposed = false;
private bool _disposed = false;

private static Mutex _arraybindingMutex = new Mutex();

Expand Down Expand Up @@ -151,9 +151,9 @@ public override void Close()
logger.Debug("Close Connection.");
if (IsNonClosedWithSession())
{
var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined;
var transactionRollbackStatus = GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined;

if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession))
if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(ConnectionString, Password, SfSession))
{
logger.Debug($"Session pooled: {SfSession.sessionId}");
}
Expand All @@ -172,7 +172,7 @@ public override void Close()
// Adding an override for CloseAsync will prevent the need for casting to SnowflakeDbConnection to call CloseAsync(CancellationToken).
public override async Task CloseAsync()
{
await CloseAsync(CancellationToken.None);
await CloseAsync(CancellationToken.None).ConfigureAwait(false);
}
#endif

Expand All @@ -189,9 +189,9 @@ public Task CloseAsync(CancellationToken cancellationToken)
{
if (IsNonClosedWithSession())
{
var transactionRollbackStatus = SnowflakeDbConnectionPool.GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined;
var transactionRollbackStatus = GetPooling() ? TerminateTransactionForDirtyConnectionReturningToPool() : TransactionRollbackStatus.Undefined;

if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(SfSession))
if (CanReuseSession(transactionRollbackStatus) && SnowflakeDbConnectionPool.AddSession(ConnectionString, Password, SfSession))
{
logger.Debug($"Session pooled: {SfSession.sessionId}");
_connectionState = ConnectionState.Closed;
Expand Down Expand Up @@ -234,86 +234,54 @@ public Task CloseAsync(CancellationToken cancellationToken)

private bool CanReuseSession(TransactionRollbackStatus transactionRollbackStatus)
{
return SnowflakeDbConnectionPool.GetPooling() &&
return GetPooling() &&
transactionRollbackStatus == TransactionRollbackStatus.Success;
}

private bool GetPooling()
{
return SnowflakeDbConnectionPool.GetPool(ConnectionString, Password).GetPooling();
}

public override void Open()
{
logger.Debug("Open Connection.");
if (_connectionState != ConnectionState.Closed)
{
logger.Debug($"Open with a connection already opened: {_connectionState}");
logger.Warn($"Opening a connection already opened: {_connectionState}");
return;
}
SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString);
if (SfSession != null)

try
{
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
OnSessionOpen();
SfSession = SnowflakeDbConnectionPool.GetSession(ConnectionString, Password);
OnSessionEstablished();
}
else
catch (Exception e)
{
SetSession();
try
{
SfSession.Open();
}
catch (Exception e)
{
// Otherwise when Dispose() is called, the close request would timeout.
_connectionState = ConnectionState.Closed;
logger.Error("Unable to connect", e);
if (!(e.GetType() == typeof(SnowflakeDbException)))
{
throw
new SnowflakeDbException(
e,
SnowflakeDbException.CONNECTION_FAILURE_SSTATE,
SFError.INTERNAL_ERROR,
"Unable to connect. " + e.Message);
}
else
{
throw;
}
}
RethrowOnSessionOpenFailure(e);
}
OnSessionEstablished();
}

public override Task OpenAsync(CancellationToken cancellationToken)
{
logger.Debug("Open Connection Async.");
if (_connectionState != ConnectionState.Closed)
{
logger.Debug($"Open with a connection already opened: {_connectionState}");
logger.Warn($"Opening a connection already opened: {_connectionState}");
return Task.CompletedTask;
}
SfSession = SnowflakeDbConnectionPool.GetSession(this.ConnectionString);
if (SfSession != null)
{
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
OnSessionEstablished();
return Task.CompletedTask;
}

registerConnectionCancellationCallback(cancellationToken);
SetSession();

return SfSession.OpenAsync(cancellationToken).ContinueWith(
previousTask =>
OnSessionOpen();
return SnowflakeDbConnectionPool.GetSessionAsync(ConnectionString, Password, cancellationToken)
.ContinueWith(previousTask =>
{
if (previousTask.IsFaulted)
{
// Exception from SfSession.OpenAsync
Exception sfSessionEx = previousTask.Exception;
_connectionState = ConnectionState.Closed;
logger.Error("Unable to connect", sfSessionEx);
throw new SnowflakeDbException(
sfSessionEx,
SnowflakeDbException.CONNECTION_FAILURE_SSTATE,
SFError.INTERNAL_ERROR,
"Unable to connect");
RethrowOnSessionOpenFailure(previousTask.Exception);
}
else if (previousTask.IsCanceled)
{
Expand All @@ -322,7 +290,8 @@ public override Task OpenAsync(CancellationToken cancellationToken)
}
else
{
logger.Debug("All good");
SfSession = previousTask.Result;
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
// Only continue if the session was opened successfully
OnSessionEstablished();
}
Expand All @@ -344,23 +313,42 @@ public void SetArrayBindStageCreated()
{
_isArrayBindStageCreated = true;
}

/// <summary>
/// Create a new SFsession with the connection string settings.
/// </summary>
/// <exception cref="SnowflakeDbException">If the connection string can't be processed</exception>
private void SetSession()

private void OnSessionOpen()
{
SfSession = new SFSession(ConnectionString, Password);
_connectionTimeout = (int)SfSession.connectionTimeout.TotalSeconds;
logger.Debug("Opening session");
_connectionState = ConnectionState.Connecting;
}

private void OnSessionEstablished()
{
if (SfSession == null)
{
logger.Error("Error during opening session");
throw new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unable to establish a session");
}
logger.Debug("Session established");
_connectionState = ConnectionState.Open;
_connectionTimeout = (int)SfSession.connectionTimeout.TotalSeconds;
logger.Debug($"Connection open with pooled session: {SfSession.sessionId}");
}

private void RethrowOnSessionOpenFailure(Exception exception)
{
// Otherwise when Dispose() is called, the close request would timeout.
_connectionState = ConnectionState.Closed;
logger.Error("Unable to connect: ", exception);
if (exception != null && exception is SnowflakeDbException dbException)
throw dbException;

var errorMessage = "Unable to connect. " + (exception != null ? exception.Message : "");
throw new SnowflakeDbException(
exception,
SnowflakeDbException.CONNECTION_FAILURE_SSTATE,
SFError.INTERNAL_ERROR,
errorMessage);
}

protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
{
// Parameterless BeginTransaction() method of the super class calls this method with IsolationLevel.Unspecified,
Expand All @@ -382,20 +370,20 @@ protected override DbCommand CreateDbCommand()

protected override void Dispose(bool disposing)
{
if (disposed)
if (_disposed)
return;

try
{
this.Close();
Close();
}
catch (Exception ex)
{
// Prevent an exception from being thrown when disposing of this object
logger.Error("Unable to close connection", ex);
}

disposed = true;
_disposed = true;

base.Dispose(disposing);
}
Expand All @@ -406,7 +394,7 @@ protected override void Dispose(bool disposing)
/// layer or timeout reached. Whichever comes first would trigger query cancellation.
/// </summary>
/// <param name="externalCancellationToken">cancellation token from upper layer</param>
internal void registerConnectionCancellationCallback(CancellationToken externalCancellationToken)
internal void RegisterConnectionCancellationCallback(CancellationToken externalCancellationToken)
{
if (!externalCancellationToken.IsCancellationRequested)
{
Expand Down
Loading