From f2c841c1a2c964b4cf91849609cad30277bd5807 Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Wed, 20 Sep 2023 07:54:03 -0600 Subject: [PATCH 1/5] Adding capability to raise multiple events of same name in parallel Signed-off-by: Ryan Lettieri --- eng/proto | 2 +- .../Shims/TaskOrchestrationContextWrapper.cs | 5 +++ .../OrchestrationPatterns.cs | 43 +++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/eng/proto b/eng/proto index 7d682688..19f0f696 160000 --- a/eng/proto +++ b/eng/proto @@ -1 +1 @@ -Subproject commit 7d6826889eb9b104592ab1020c648517a155ba79 +Subproject commit 19f0f6966186416ad0d2953b2f8e2271d5c93246 diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 6fec3e40..4ed50046 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -14,6 +14,7 @@ namespace Microsoft.DurableTask.Worker.Shims; /// sealed partial class TaskOrchestrationContextWrapper : TaskOrchestrationContext { + // TODO: Change externalEventSources value from IEventSource to queue to preserve FIFO in O(1) readonly Dictionary externalEventSources = new(StringComparer.OrdinalIgnoreCase); readonly NamedQueue externalEventBuffer = new(); readonly OrchestrationContext innerContext; @@ -202,6 +203,10 @@ public override Task WaitForExternalEvent(string eventName, CancellationTo + $" {existing.EventType.FullName}."); } + while (existing.Next != null) + { + existing = existing.Next; + } existing.Next = eventSource; } else diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index b6af1559..071dbad5 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -311,6 +311,49 @@ public async Task ExternalEvents(int eventCount) Assert.Equal(expected, metadata.ReadOutputAs()); } + [Theory] + [InlineData(1)] + [InlineData(5)] + public async Task ExternalEventsInParallel(int eventCount) + { + TaskName orchestratorName = nameof(ExternalEvents); + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + List> events = new(); + for (int i = 0; i < eventCount; i++) + { + events.Add(ctx.WaitForExternalEvent("Event")); + } + + return await Task.WhenAll(events); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // To ensure consistency, wait for the instance to start before sending the events + OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( + instanceId, + this.TimeoutToken); + + // Send events one-at-a-time to that we can better ensure ordered processing. + for (int i = 0; i < eventCount; i++) + { + await server.Client.RaiseEventAsync(metadata.InstanceId, "Event", eventPayload: i); + } + + // Once the orchestration receives all the events it is expecting, it should complete. + metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + int[] expected = Enumerable.Range(0, eventCount).ToArray(); + Assert.Equal(expected, metadata.ReadOutputAs()); + } + [Fact] public async Task Termination() { From 62d833cfb10a38b514b0bbe99f87c5f2bfb0aa0e Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Wed, 20 Sep 2023 20:24:27 -0600 Subject: [PATCH 2/5] Changing event sources to be a dictionary of queues Signed-off-by: Ryan Lettieri --- ...OrchestrationContextWrapper.EventSource.cs | 5 ---- .../Shims/TaskOrchestrationContextWrapper.cs | 30 ++++++++----------- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs index 7e162db0..219a4b43 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs @@ -21,11 +21,6 @@ interface IEventSource /// Type EventType { get; } - /// - /// Gets or sets the next task completion source in the stack. - /// - IEventSource? Next { get; set; } - /// /// Tries to set the result on tcs. /// diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 4ed50046..e0b070a0 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -14,8 +14,7 @@ namespace Microsoft.DurableTask.Worker.Shims; /// sealed partial class TaskOrchestrationContextWrapper : TaskOrchestrationContext { - // TODO: Change externalEventSources value from IEventSource to queue to preserve FIFO in O(1) - readonly Dictionary externalEventSources = new(StringComparer.OrdinalIgnoreCase); + readonly Dictionary> externalEventSources = new(StringComparer.OrdinalIgnoreCase); readonly NamedQueue externalEventBuffer = new(); readonly OrchestrationContext innerContext; readonly OrchestrationInvocationContext invocationContext; @@ -195,23 +194,21 @@ public override Task WaitForExternalEvent(string eventName, CancellationTo // Create a task completion source that will be set when the external event arrives. EventTaskCompletionSource eventSource = new(); - if (this.externalEventSources.TryGetValue(eventName, out IEventSource? existing)) + if (this.externalEventSources.TryGetValue(eventName, out Queue? existing)) { - if (existing.EventType != typeof(T)) + if (existing.Peek().EventType != typeof(T)) { throw new ArgumentException("Events with the same name must have the same type argument. Expected" - + $" {existing.EventType.FullName}."); + + $" {existing.Peek().GetType().FullName}."); } - - while (existing.Next != null) - { - existing = existing.Next; - } - existing.Next = eventSource; + + existing.Enqueue(eventSource); } else { - this.externalEventSources.Add(eventName, eventSource); + Queue eventSourceQueue = new(); + eventSourceQueue.Enqueue(eventSource); + this.externalEventSources.Add(eventName, eventSourceQueue); } // TODO: this needs to be tracked and disposed appropriately. @@ -311,19 +308,16 @@ static void SwapByteArrayElements(byte[] byteArray, int left, int right) /// The serialized event payload. internal void CompleteExternalEvent(string eventName, string rawEventPayload) { - if (this.externalEventSources.TryGetValue(eventName, out IEventSource? waiter)) + if (this.externalEventSources.TryGetValue(eventName, out Queue? waiters)) { + IEventSource waiter = waiters.Dequeue(); object? value = this.DataConverter.Deserialize(rawEventPayload, waiter.EventType); // Events are completed in FIFO order. Remove the key if the last event was delivered. - if (waiter.Next == null) + if (waiters.Count == 0) { this.externalEventSources.Remove(eventName); } - else - { - this.externalEventSources[eventName] = waiter.Next; - } waiter.TrySetResult(value); } From 6fedd191ce13c53ef0602530c2740b4d3bff8c7b Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Thu, 21 Sep 2023 00:52:34 -0600 Subject: [PATCH 3/5] Removing Next from event source and adding in NPE safety Signed-off-by: Ryan Lettieri --- .../Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs index 219a4b43..9abef951 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs @@ -33,8 +33,8 @@ class EventTaskCompletionSource : TaskCompletionSource, IEventSource /// public Type EventType => typeof(T); - /// - public IEventSource? Next { get; set; } + // /// + // public IEventSource? Next { get; set; } /// void IEventSource.TrySetResult(object result) => this.TrySetResult((T)result); From 1040ef802eb52826d890d89f43c8443accf89427 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Mon, 25 Sep 2023 16:44:58 -0700 Subject: [PATCH 4/5] Update CHANGELOG.md and version to 1.0.4 --- CHANGELOG.md | 6 ++++++ eng/targets/Release.props | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff9cd73f..709ba091 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v1.0.4 + +### Microsoft.DurableTask.Worker + +- Fix handling of concurrent external events with the same name (https://github.com/microsoft/durabletask-dotnet/pull/194) + ## v1.0.3 ### Microsoft.DurableTask.Worker diff --git a/eng/targets/Release.props b/eng/targets/Release.props index 7a16111a..049e4588 100644 --- a/eng/targets/Release.props +++ b/eng/targets/Release.props @@ -17,7 +17,7 @@ - 1.0.3 + 1.0.4 From 837e280643d2cf682516aa24ca90585719cd5e8f Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Tue, 26 Sep 2023 22:50:42 -0700 Subject: [PATCH 5/5] Cleanup and PR feedback --- .../Shims/TaskOrchestrationContextWrapper.EventSource.cs | 2 -- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs index 9abef951..b7959299 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs @@ -33,8 +33,6 @@ class EventTaskCompletionSource : TaskCompletionSource, IEventSource /// public Type EventType => typeof(T); - // /// - // public IEventSource? Next { get; set; } /// void IEventSource.TrySetResult(object result) => this.TrySetResult((T)result); diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index b842aa79..c388a2b0 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -197,12 +197,12 @@ public override Task WaitForExternalEvent(string eventName, CancellationTo EventTaskCompletionSource eventSource = new(); if (this.externalEventSources.TryGetValue(eventName, out Queue? existing)) { - if (existing.Peek().EventType != typeof(T)) + if (existing.Count > 0 && existing.Peek().EventType != typeof(T)) { throw new ArgumentException("Events with the same name must have the same type argument. Expected" - + $" {existing.Peek().GetType().FullName}."); + + $" {existing.Peek().GetType().FullName} but was requested {typeof(T).FullName}."); } - + existing.Enqueue(eventSource); } else