diff --git a/CHANGELOG.md b/CHANGELOG.md index ff9cd73f..709ba091 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v1.0.4 + +### Microsoft.DurableTask.Worker + +- Fix handling of concurrent external events with the same name (https://github.com/microsoft/durabletask-dotnet/pull/194) + ## v1.0.3 ### Microsoft.DurableTask.Worker diff --git a/eng/proto b/eng/proto index 7d682688..19f0f696 160000 --- a/eng/proto +++ b/eng/proto @@ -1 +1 @@ -Subproject commit 7d6826889eb9b104592ab1020c648517a155ba79 +Subproject commit 19f0f6966186416ad0d2953b2f8e2271d5c93246 diff --git a/eng/targets/Release.props b/eng/targets/Release.props index 7a16111a..049e4588 100644 --- a/eng/targets/Release.props +++ b/eng/targets/Release.props @@ -17,7 +17,7 @@ - 1.0.3 + 1.0.4 diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs index 7e162db0..b7959299 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs @@ -21,11 +21,6 @@ interface IEventSource /// Type EventType { get; } - /// - /// Gets or sets the next task completion source in the stack. - /// - IEventSource? Next { get; set; } - /// /// Tries to set the result on tcs. /// @@ -38,8 +33,6 @@ class EventTaskCompletionSource : TaskCompletionSource, IEventSource /// public Type EventType => typeof(T); - /// - public IEventSource? Next { get; set; } /// void IEventSource.TrySetResult(object result) => this.TrySetResult((T)result); diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 222ed9fe..c388a2b0 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -14,7 +14,7 @@ namespace Microsoft.DurableTask.Worker.Shims; /// sealed partial class TaskOrchestrationContextWrapper : TaskOrchestrationContext { - readonly Dictionary externalEventSources = new(StringComparer.OrdinalIgnoreCase); + readonly Dictionary> externalEventSources = new(StringComparer.OrdinalIgnoreCase); readonly NamedQueue externalEventBuffer = new(); readonly OrchestrationContext innerContext; readonly OrchestrationInvocationContext invocationContext; @@ -195,19 +195,21 @@ public override Task WaitForExternalEvent(string eventName, CancellationTo // Create a task completion source that will be set when the external event arrives. EventTaskCompletionSource eventSource = new(); - if (this.externalEventSources.TryGetValue(eventName, out IEventSource? existing)) + if (this.externalEventSources.TryGetValue(eventName, out Queue? existing)) { - if (existing.EventType != typeof(T)) + if (existing.Count > 0 && existing.Peek().EventType != typeof(T)) { throw new ArgumentException("Events with the same name must have the same type argument. Expected" - + $" {existing.EventType.FullName}."); + + $" {existing.Peek().GetType().FullName} but was requested {typeof(T).FullName}."); } - existing.Next = eventSource; + existing.Enqueue(eventSource); } else { - this.externalEventSources.Add(eventName, eventSource); + Queue eventSourceQueue = new(); + eventSourceQueue.Enqueue(eventSource); + this.externalEventSources.Add(eventName, eventSourceQueue); } // TODO: this needs to be tracked and disposed appropriately. @@ -307,19 +309,16 @@ static void SwapByteArrayElements(byte[] byteArray, int left, int right) /// The serialized event payload. internal void CompleteExternalEvent(string eventName, string rawEventPayload) { - if (this.externalEventSources.TryGetValue(eventName, out IEventSource? waiter)) + if (this.externalEventSources.TryGetValue(eventName, out Queue? waiters)) { + IEventSource waiter = waiters.Dequeue(); object? value = this.DataConverter.Deserialize(rawEventPayload, waiter.EventType); // Events are completed in FIFO order. Remove the key if the last event was delivered. - if (waiter.Next == null) + if (waiters.Count == 0) { this.externalEventSources.Remove(eventName); } - else - { - this.externalEventSources[eventName] = waiter.Next; - } waiter.TrySetResult(value); } diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index b6af1559..071dbad5 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -311,6 +311,49 @@ public async Task ExternalEvents(int eventCount) Assert.Equal(expected, metadata.ReadOutputAs()); } + [Theory] + [InlineData(1)] + [InlineData(5)] + public async Task ExternalEventsInParallel(int eventCount) + { + TaskName orchestratorName = nameof(ExternalEvents); + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + List> events = new(); + for (int i = 0; i < eventCount; i++) + { + events.Add(ctx.WaitForExternalEvent("Event")); + } + + return await Task.WhenAll(events); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // To ensure consistency, wait for the instance to start before sending the events + OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( + instanceId, + this.TimeoutToken); + + // Send events one-at-a-time to that we can better ensure ordered processing. + for (int i = 0; i < eventCount; i++) + { + await server.Client.RaiseEventAsync(metadata.InstanceId, "Event", eventPayload: i); + } + + // Once the orchestration receives all the events it is expecting, it should complete. + metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + int[] expected = Enumerable.Range(0, eventCount).ToArray(); + Assert.Equal(expected, metadata.ReadOutputAs()); + } + [Fact] public async Task Termination() {