Skip to content

Commit

Permalink
Exception from cancellation when a token is already cancelled.
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed Aug 20, 2021
1 parent c224ae1 commit 09f1a60
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 58 deletions.
4 changes: 2 additions & 2 deletions Microsoft.Azure.Amqp/Amqp/AmqpObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,6 @@ public void Signal(bool syncComplete, Exception exception)

protected void Start()
{
this.SetTimer();

bool shouldComplete = false;
Exception completeException = null;

Expand All @@ -618,6 +616,8 @@ protected void Start()
amqpObject.pendingClose = null;
this.Signal(true, completeException);
}

this.StartTracking();
}

protected abstract bool OnStart();
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,6 @@ public static Outcome End(IAsyncResult result)

public void Start()
{
this.SetTimer();
DeliveryState deliveryState;
if (txnId.Array != null)
{
Expand All @@ -894,6 +893,8 @@ public void Start()
// Delivery tag not found
link.pendingDispositions.CompleteWork(deliveryTag, true, AmqpConstants.RejectedNotFoundOutcome);
}

this.StartTracking();
}

public void Done(bool completedSynchronously, DeliveryState state)
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Amqp/Amqp/RequestResponseAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,9 @@ public static AmqpMessage End(IAsyncResult result)

public void Start()
{
this.SetTimer();
this.parent.sender.SendMessageNoWait(this.request, AmqpConstants.EmptyBinary, this.transactionId);
this.request = null;
this.StartTracking();
}

public void Done(bool completedSynchronously, AmqpMessage response)
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ public static Outcome End(IAsyncResult result)

public void Start()
{
this.SetTimer();
this.link.SendMessageInternal(this.message, this.deliveryTag, this.txnId);
this.StartTracking();
}

public void Done(bool completedSynchronously, Outcome outcome)
Expand Down
36 changes: 16 additions & 20 deletions Microsoft.Azure.Amqp/Amqp/TimeoutAsyncResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,36 @@ namespace Microsoft.Azure.Amqp
abstract class TimeoutAsyncResult<T> : AsyncResult where T : class
{
readonly TimeSpan timeout;
readonly CancellationTokenRegistration cancellationTokenRegistration;
readonly CancellationToken cancellationToken;
CancellationTokenRegistration cancellationTokenRegistration;
Timer timer;
#if DEBUG
bool setTimerCalled; // make sure derived class always call SetTimer
#endif

protected TimeoutAsyncResult(TimeSpan timeout, CancellationToken cancellationToken, AsyncCallback callback, object state)
: base(callback, state)
{
// The derived class must call SetTimer to start the timer.
// Timer is not started here because it could fire before the
// derived class ctor completes.
// The derived class must call StartTracking to start the timer and cancelation token registration.
// Timer is not started here because it could fire before the derived class ctor completes.
this.timeout = timeout;
if (cancellationToken.CanBeCanceled)
{
this.cancellationTokenRegistration = cancellationToken.Register(o => ((TimeoutAsyncResult<T>)o).Cancel(), this);
}
this.cancellationToken = cancellationToken;
}

protected abstract T Target { get; }

public abstract void Cancel();

protected void SetTimer()
protected void StartTracking()
{
#if DEBUG
this.setTimerCalled = true;
#endif
if (this.timeout != TimeSpan.MaxValue)
if (!this.IsCompleted)
{
this.timer = new Timer(s => OnTimerCallback(s), this, this.timeout, Timeout.InfiniteTimeSpan);
if (this.timeout != Timeout.InfiniteTimeSpan && this.timeout != TimeSpan.MaxValue)
{
this.timer = new Timer(s => OnTimerCallback(s), this, this.timeout, Timeout.InfiniteTimeSpan);
}

if (this.cancellationToken.CanBeCanceled)
{
this.cancellationTokenRegistration = this.cancellationToken.Register(o => ((TimeoutAsyncResult<T>)o).Cancel(), this);
}
}
}

Expand Down Expand Up @@ -78,9 +77,6 @@ static void OnTimerCallback(object state)

bool CompleteInternal(bool syncComplete, Exception exception)
{
#if DEBUG
Fx.AssertAndThrow(exception != null || this.setTimerCalled, "Must call SetTimer.");
#endif
if (this.timer != null)
{
this.timer.Dispose();
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Amqp/Amqp/Transport/AmqpTransportInitiator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,12 @@ public ConnectAsyncResult(AmqpTransportInitiator initiator, TimeSpan timeout, Ca
this.args = new TransportAsyncCallbackArgs();
this.args.CompletedCallback = onConnect;
this.args.UserToken = this;
this.SetTimer();

if (!initiator.ConnectAsync(timeout, this.args))
{
OnConnect(this.args);
}

this.StartTracking();
}

protected override string Target
Expand Down
6 changes: 6 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/Transport/AmqpTransportProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ public AmqpTransportProvider()
this.ProtocolId = ProtocolId.Amqp;
}

internal AmqpTransportProvider(AmqpVersion version)
: this()
{
this.Versions.Add(version);
}

protected override TransportBase OnCreateTransport(TransportBase innerTransport, bool isInitiator)
{
return innerTransport;
Expand Down
Loading

0 comments on commit 09f1a60

Please sign in to comment.