Skip to content

Commit

Permalink
Unify how async continuations are handled for connection and session …
Browse files Browse the repository at this point in the history
…objects
  • Loading branch information
Havret committed May 4, 2024
1 parent c668b81 commit f1d692d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/ArtemisNetCoreClient/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal class Connection : IConnection, IChannel
private readonly Endpoint _endpoint;
private readonly Task _receiveLoopTask;
private readonly ConcurrentDictionary<long, IChannel> _channels = new();
private readonly ConcurrentDictionary<long, TaskCompletionSource<IIncomingPacket>> _completionSources = new();
private readonly ConcurrentDictionary<long, TaskCompletionSource<object>> _completionSources = new();
private readonly SemaphoreSlim _lock = new(1, 1);
private readonly IdGenerator _sessionChannelIdGenerator = new(10);
private readonly CancellationTokenSource _receiveLoopCancellationToken;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void OnPacket(in InboundPacket packet)
var createSessionResponseMessage = new CreateSessionResponseMessage(packet.Payload);
if (_completionSources.TryRemove(-1, out var tcs))
{
tcs.TrySetResult(createSessionResponseMessage);
tcs.TrySetResult(new SessionInfo { ServerVersion = createSessionResponseMessage.ServerVersion });
}
break;
default:
Expand Down Expand Up @@ -109,10 +109,10 @@ public async Task<ISession> CreateSessionAsync(CancellationToken cancellationTok
try
{
await _lock.WaitAsync(cancellationToken);
var tcs = new TaskCompletionSource<IIncomingPacket>(TaskCreationOptions.RunContinuationsAsynchronously);
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_ = _completionSources.TryAdd(-1, tcs);
Send(createSessionMessage, 1);
var incomingPacket = (CreateSessionResponseMessage) await tcs.Task;
var incomingPacket = (SessionInfo) await tcs.Task;

var session = new Session(this, _loggerFactory)
{
Expand Down
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/SessionInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ActiveMQ.Artemis.Core.Client;

internal class SessionInfo
{
public required int ServerVersion { get; init; }
}

0 comments on commit f1d692d

Please sign in to comment.