Skip to content

Commit

Permalink
Fix bug where we accept any SemaphoreSlim instead of a
Browse files Browse the repository at this point in the history
DeadlockAwareLock.
  • Loading branch information
uglybugger committed May 3, 2022
1 parent f867e59 commit 7fb4d7e
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void ConcurrentAccessDoesNotGoBoom()
using (var transaction = Store.BeginTransaction())
{
Enumerable.Range(0, NumberOfDocuments)
.Select(i => new DocumentWithIdentityId {Name = $"{namePrefix}{i}"})
.Select(i => new DocumentWithIdentityId { Name = $"{namePrefix}{i}" })
.AsParallel()
.WithDegreeOfParallelism(DegreeOfParallelism)
.Select(document =>
Expand Down
29 changes: 29 additions & 0 deletions source/Nevermore.Tests/DeadlockAwareLockFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using Nevermore.Advanced;
using Nito.AsyncEx;
using NUnit.Framework;

namespace Nevermore.Tests
Expand Down Expand Up @@ -131,5 +132,33 @@ public async Task MultipleTasksContending_ShouldNotThrow()

// ReSharper restore AccessToDisposedClosure
}

[Test]
public void UsingSyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow()
{
using var deadlockAwareLock = new DeadlockAwareLock();

using (var _ = deadlockAwareLock.Lock())
{
}

using (var _ = deadlockAwareLock.Lock())
{
}
}

[Test]
public async Task UsingAsyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow()
{
using var deadlockAwareLock = new DeadlockAwareLock();

using (var _ = await deadlockAwareLock.LockAsync(cancellationToken))
{
}

using (var _ = await deadlockAwareLock.LockAsync(cancellationToken))
{
}
}
}
}
21 changes: 12 additions & 9 deletions source/Nevermore/Advanced/ReadTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ReadTransaction : IReadTransaction, ITransactionDiagnostic
SqlConnection? connection;

protected IUniqueParameterNameGenerator ParameterNameGenerator { get; } = new UniqueParameterNameGenerator();
protected DeadlockAwareLock Semaphore { get; } = new();
protected DeadlockAwareLock DeadlockAwareLock { get; } = new();

// To help track deadlocks
readonly List<string> commandTrace;
Expand Down Expand Up @@ -447,7 +447,7 @@ IEnumerable<TRecord> Execute()
yield return item;
}

return new ThreadSafeEnumerable<TRecord>(Execute, Semaphore);
return new ThreadSafeEnumerable<TRecord>(Execute, DeadlockAwareLock);
}

public IAsyncEnumerable<TRecord> StreamAsync<TRecord>(PreparedCommand command, CancellationToken cancellationToken = default)
Expand All @@ -459,7 +459,7 @@ async IAsyncEnumerable<TRecord> Execute()
yield return result;
}

return new ThreadSafeAsyncEnumerable<TRecord>(Execute, Semaphore);
return new ThreadSafeAsyncEnumerable<TRecord>(Execute, DeadlockAwareLock);
}

IEnumerable<TRecord> ProcessReader<TRecord>(DbDataReader reader, PreparedCommand command)
Expand Down Expand Up @@ -533,14 +533,14 @@ public Task<int> ExecuteNonQueryAsync(string query, CommandParameterValues? args

public int ExecuteNonQuery(PreparedCommand preparedCommand)
{
using var mutex = Semaphore.Lock();
using var mutex = DeadlockAwareLock.Lock();
using var command = CreateCommand(preparedCommand);
return command.ExecuteNonQuery();
}

public async Task<int> ExecuteNonQueryAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default)
{
using var mutex = await Semaphore.LockAsync(cancellationToken);
using var mutex = await DeadlockAwareLock.LockAsync(cancellationToken);
using var command = CreateCommand(preparedCommand);
return await command.ExecuteNonQueryAsync(cancellationToken);
}
Expand All @@ -557,7 +557,7 @@ public Task<TResult> ExecuteScalarAsync<TResult>(string query, CommandParameterV

public TResult ExecuteScalar<TResult>(PreparedCommand preparedCommand)
{
using var mutex = Semaphore.Lock();
using var mutex = DeadlockAwareLock.Lock();
using var command = CreateCommand(preparedCommand);
var result = command.ExecuteScalar();
if (result == DBNull.Value)
Expand All @@ -567,7 +567,7 @@ public TResult ExecuteScalar<TResult>(PreparedCommand preparedCommand)

public async Task<TResult> ExecuteScalarAsync<TResult>(PreparedCommand preparedCommand, CancellationToken cancellationToken = default)
{
using var mutex = await Semaphore.LockAsync(cancellationToken);
using var mutex = await DeadlockAwareLock.LockAsync(cancellationToken);
using var command = CreateCommand(preparedCommand);
var result = await command.ExecuteScalarAsync(cancellationToken);
if (result == DBNull.Value)
Expand Down Expand Up @@ -599,15 +599,15 @@ public async Task<DbDataReader> ExecuteReaderAsync(PreparedCommand preparedComma

protected TResult[] ReadResults<TResult>(PreparedCommand preparedCommand, Func<DbDataReader, TResult> mapper)
{
using var mutex = Semaphore.Lock();
using var mutex = DeadlockAwareLock.Lock();

using var command = CreateCommand(preparedCommand);
return command.ReadResults(mapper);
}

protected async Task<TResult[]> ReadResultsAsync<TResult>(PreparedCommand preparedCommand, Func<DbDataReader, Task<TResult>> mapper, CancellationToken cancellationToken)
{
using var mutex = await Semaphore.LockAsync(cancellationToken);
using var mutex = await DeadlockAwareLock.LockAsync(cancellationToken);

using var command = CreateCommand(preparedCommand);
return await command.ReadResultsAsync(mapper, cancellationToken);
Expand Down Expand Up @@ -727,8 +727,11 @@ void ITransactionDiagnostic.WriteCurrentTransactions(StringBuilder output)

public void Dispose()
{
// ReSharper disable ConstantConditionalAccessQualifier
Transaction?.Dispose();
DeadlockAwareLock?.Dispose();
connection?.Dispose();
// ReSharper restore ConstantConditionalAccessQualifier
registry.Remove(this);
}
}
Expand Down
10 changes: 5 additions & 5 deletions source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ namespace Nevermore.Advanced
public class ThreadSafeAsyncEnumerable<T> : IAsyncEnumerable<T>
{
readonly Func<IAsyncEnumerable<T>> innerFunc;
readonly SemaphoreSlim semaphore;
readonly DeadlockAwareLock deadlockAwareLock;

public ThreadSafeAsyncEnumerable(IAsyncEnumerable<T> inner, SemaphoreSlim semaphore) : this(() => inner, semaphore)
public ThreadSafeAsyncEnumerable(IAsyncEnumerable<T> inner, DeadlockAwareLock deadlockAwareLock) : this(() => inner, deadlockAwareLock)
{
}

public ThreadSafeAsyncEnumerable(Func<IAsyncEnumerable<T>> innerFunc, SemaphoreSlim semaphore)
public ThreadSafeAsyncEnumerable(Func<IAsyncEnumerable<T>> innerFunc, DeadlockAwareLock deadlockAwareLock)
{
this.innerFunc = innerFunc;
this.semaphore = semaphore;
this.deadlockAwareLock = deadlockAwareLock;
}

public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new())
{
using var mutex = await semaphore.LockAsync(cancellationToken);
using var mutex = await deadlockAwareLock.LockAsync(cancellationToken);
var inner = innerFunc();
await foreach (var item in inner.WithCancellation(cancellationToken)) yield return item;
}
Expand Down

0 comments on commit 7fb4d7e

Please sign in to comment.