Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool/SNOW-937183 Prevent evicted connections from returning to the pool #912

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,12 @@ CancellationTokenSource cancellationTokenSource = new CancellationTokenSource()
((SnowflakeDbConnection)conn).CloseAsync(cancellationTokenSource.Token);
```

Evict the Connection
--------------------

For the open connection, call the `PreventPooling()` to mark the connection to be removed on close instead being still pooled.
The busy sessions counter will be decreased when the connection is closed.

Logging
-------
The Snowflake Connector for .NET uses [log4net](http://logging.apache.org/log4net/) as the logging framework.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
using Snowflake.Data.Tests.Mock;
using Snowflake.Data.Tests.Util;

namespace Snowflake.Data.Tests.IntegrationTests
Expand Down Expand Up @@ -44,5 +48,46 @@ public async Task TestMinPoolSizeAsync()
// cleanup
await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false);
}

[Test]
public async Task TestPreventConnectionFromReturningToPool()
{
// arrange
var connectionString = ConnectionString + "minPoolSize=0";
var connection = new SnowflakeDbConnection(connectionString);
await connection.OpenAsync().ConfigureAwait(false);
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
Assert.AreEqual(1, pool.GetCurrentPoolSize());

// act
connection.PreventPooling();
await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false);

// assert
Assert.AreEqual(0, pool.GetCurrentPoolSize());
}

[Test]
public async Task TestReleaseConnectionWhenRollbackFailsAsync()
{
// arrange
var connectionString = ConnectionString + "minPoolSize=0";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
var commandThrowingExceptionOnlyForRollback = MockHelper.CommandThrowingExceptionOnlyForRollback();
var mockDbProviderFactory = new Mock<DbProviderFactory>();
mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object);
Assert.AreEqual(0, pool.GetCurrentPoolSize());
var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object);
connection.ConnectionString = connectionString;
await connection.OpenAsync().ConfigureAwait(false);
connection.BeginTransaction(); // not using async version because it is not available on .net framework
Assert.AreEqual(true, connection.HasActiveExplicitTransaction());

// act
await connection.CloseAsync(CancellationToken.None).ConfigureAwait(false);

// assert
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "Should not return connection to the pool");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
using System;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
using Snowflake.Data.Tests.Mock;
using Snowflake.Data.Tests.Util;

namespace Snowflake.Data.Tests.IntegrationTests
Expand Down Expand Up @@ -374,12 +377,52 @@ public void TestMinPoolSize()
connection.Close();
}

[Test]
public void TestPreventConnectionFromReturningToPool()
{
// arrange
var connectionString = ConnectionString + "minPoolSize=0";
var connection = OpenConnection(connectionString);
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
Assert.AreEqual(1, pool.GetCurrentPoolSize());

// act
connection.PreventPooling();
connection.Close();

// assert
Assert.AreEqual(0, pool.GetCurrentPoolSize());
}

[Test]
public void TestReleaseConnectionWhenRollbackFails()
{
// arrange
var connectionString = ConnectionString + "minPoolSize=0";
var pool = SnowflakeDbConnectionPool.GetPool(connectionString);
var commandThrowingExceptionOnlyForRollback = MockHelper.CommandThrowingExceptionOnlyForRollback();
var mockDbProviderFactory = new Mock<DbProviderFactory>();
mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object);
Assert.AreEqual(0, pool.GetCurrentPoolSize());
var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object);
connection.ConnectionString = connectionString;
connection.Open();
connection.BeginTransaction();
Assert.AreEqual(true, connection.HasActiveExplicitTransaction());

// act
connection.Close();

// assert
Assert.AreEqual(0, pool.GetCurrentPoolSize(), "Should not return connection to the pool");
}

private void WaitUntilAllSessionsCreatedOrTimeout(SessionPool pool)
{
var expectingToWaitAtMostForSessionCreations = TimeSpan.FromSeconds(15);
Awaiter.WaitUntilConditionOrTimeout(() => pool.OngoingSessionCreationsCount() == 0, expectingToWaitAtMostForSessionCreations);
}

private SnowflakeDbConnection OpenConnection(string connectionString)
{
var connection = new SnowflakeDbConnection();
Expand Down
14 changes: 14 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
*/

using System;
using System.Data;
using System.Threading;
using NUnit.Framework;
Expand Down Expand Up @@ -107,5 +108,18 @@ public void TestConnectionPoolWithDispose()
Assert.AreEqual(ConnectionState.Closed, conn1.State);
Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(conn1.ConnectionString).GetCurrentPoolSize());
}

[Test]
public void TestFailWhenPreventingFromReturningToPoolNotOpenedConnection()
{
// arrange
var connection = new SnowflakeDbConnection(ConnectionString);

// act
var thrown = Assert.Throws<Exception>(() => connection.PreventPooling());

// assert
Assert.That(thrown.Message, Does.Contain("Session not yet created for this connection. Unable to prevent the session from pooling"));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Core.Session;
using Snowflake.Data.Tests.Mock;
using Snowflake.Data.Tests.Util;
using Moq;

namespace Snowflake.Data.Tests.IntegrationTests
{
Expand Down Expand Up @@ -300,5 +301,45 @@ public void TestConnectionPoolExpirationWorks()
// so expected result should be 0
Assert.AreEqual(0, SnowflakeDbConnectionPool.GetPool(ConnectionString).GetCurrentPoolSize());
}

[Test]
public void TestPreventConnectionFromReturningToPool()
{
// arrange
var connection = new SnowflakeDbConnection(ConnectionString);
connection.Open();
var pool = SnowflakeDbConnectionPool.GetPool(ConnectionString);
Assert.AreEqual(0, pool.GetCurrentPoolSize());

// act
connection.PreventPooling();
connection.Close();

// assert
Assert.AreEqual(0, pool.GetCurrentPoolSize());
}

[Test]
public void TestReleaseConnectionWhenRollbackFails()
{
// arrange
SnowflakeDbConnectionPool.SetMaxPoolSize(10);
var commandThrowingExceptionOnlyForRollback = MockHelper.CommandThrowingExceptionOnlyForRollback();
var mockDbProviderFactory = new Mock<DbProviderFactory>();
mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object);
Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize());
var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object);
connection.ConnectionString = ConnectionString;
connection.Open();
connection.BeginTransaction();
Assert.AreEqual(true, connection.HasActiveExplicitTransaction());
// no Rollback or Commit; during internal Rollback while closing a connection a mocked exception will be thrown

// act
connection.Close();

// assert
Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Should not return connection to the pool");
}
}
}
37 changes: 0 additions & 37 deletions Snowflake.Data.Tests/IntegrationTests/SFConnectionPoolAsyncIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using Snowflake.Data.Client;
using Snowflake.Data.Core;
using Snowflake.Data.Log;
using Snowflake.Data.Tests.Mock;
using Moq;
using NUnit.Framework;

namespace Snowflake.Data.Tests.IntegrationTests
Expand Down Expand Up @@ -230,30 +227,6 @@ public void TestRollbackTransactionOnPooledWhenConnectionClose()
Assert.AreEqual(1, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Connection should be returned to the pool");
}

[Test]
public void TestFailureOfTransactionRollbackOnConnectionClosePreventsAddingToPool()
{
SnowflakeDbConnectionPool.SetMaxPoolSize(10);
var commandThrowingExceptionOnlyForRollback = new Mock<SnowflakeDbCommand>();
commandThrowingExceptionOnlyForRollback.CallBase = true;
commandThrowingExceptionOnlyForRollback.SetupSet(it => it.CommandText = "ROLLBACK")
.Throws(new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unexpected failure on transaction rollback when connection is returned to the pool with pending transaction"));
var mockDbProviderFactory = new Mock<DbProviderFactory>();
mockDbProviderFactory.Setup(p => p.CreateCommand()).Returns(commandThrowingExceptionOnlyForRollback.Object);

Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize());
using (var connection = new TestSnowflakeDbConnection(mockDbProviderFactory.Object))
{
connection.ConnectionString = ConnectionString;
connection.Open();
connection.BeginTransaction();
Assert.AreEqual(true, connection.HasActiveExplicitTransaction());
// no Rollback or Commit; during internal Rollback while closing a connection a mocked exception will be thrown
}

Assert.AreEqual(0, SnowflakeDbConnectionPool.GetCurrentPoolSize(), "Should not return connection to the pool");
}

[Test]
// test connection pooling with concurrent connection and using async calls no close
// call for connection. Connection is closed when Dispose() is called
Expand Down Expand Up @@ -346,15 +319,5 @@ static async Task InvalidConnectionTaskAsync(string connectionString)
await Task.Delay(100);
}
}

private class TestSnowflakeDbConnection : SnowflakeDbConnection
{
public TestSnowflakeDbConnection(DbProviderFactory dbProviderFactory)
{
DbProviderFactory = dbProviderFactory;
}

protected override DbProviderFactory DbProviderFactory { get; }
}
}
}
18 changes: 18 additions & 0 deletions Snowflake.Data.Tests/Mock/MockHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Moq;
using Snowflake.Data.Client;
using Snowflake.Data.Core;

namespace Snowflake.Data.Tests.Mock
{
public static class MockHelper
{
public static Mock<SnowflakeDbCommand> CommandThrowingExceptionOnlyForRollback()
{
var command = new Mock<SnowflakeDbCommand>();
command.CallBase = true;
command.SetupSet(it => it.CommandText = "ROLLBACK")
.Throws(new SnowflakeDbException(SFError.INTERNAL_ERROR, "Unexpected failure on transaction rollback when connection is returned to the pool with pending transaction"));
return command;
}
}
}
15 changes: 15 additions & 0 deletions Snowflake.Data.Tests/Mock/TestSnowflakeDbConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Data.Common;
using Snowflake.Data.Client;

namespace Snowflake.Data.Tests.Mock
{
public class TestSnowflakeDbConnection : SnowflakeDbConnection
{
public TestSnowflakeDbConnection(DbProviderFactory dbProviderFactory)
{
DbProviderFactory = dbProviderFactory;
}

protected override DbProviderFactory DbProviderFactory { get; }
}
}
Loading
Loading