Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing for multiple events of same name to be raised in parallel #194

Merged
merged 8 commits into from
Sep 27, 2023
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.0.4

### Microsoft.DurableTask.Worker

- Fix handling of concurrent external events with the same name (https://github.com/microsoft/durabletask-dotnet/pull/194)

## v1.0.3

### Microsoft.DurableTask.Worker
Expand Down
2 changes: 1 addition & 1 deletion eng/proto
2 changes: 1 addition & 1 deletion eng/targets/Release.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</PropertyGroup>

<PropertyGroup>
<VersionPrefix>1.0.3</VersionPrefix>
<VersionPrefix>1.0.4</VersionPrefix>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@
/// </summary>
Type EventType { get; }

/// <summary>
/// Gets or sets the next task completion source in the stack.
/// </summary>
IEventSource? Next { get; set; }

/// <summary>
/// Tries to set the result on tcs.
/// </summary>
Expand All @@ -37,9 +32,7 @@
{
/// <inheritdoc/>
public Type EventType => typeof(T);

Check warning on line 35 in src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs

View workflow job for this annotation

GitHub Actions / build

Code should not contain multiple blank lines in a row

Check warning on line 35 in src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs

View workflow job for this annotation

GitHub Actions / build

Code should not contain multiple blank lines in a row
/// <inheritdoc/>
public IEventSource? Next { get; set; }

/// <inheritdoc/>
void IEventSource.TrySetResult(object result) => this.TrySetResult((T)result);
Expand Down
23 changes: 11 additions & 12 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.DurableTask.Worker.Shims;
/// </summary>
sealed partial class TaskOrchestrationContextWrapper : TaskOrchestrationContext
{
readonly Dictionary<string, IEventSource> externalEventSources = new(StringComparer.OrdinalIgnoreCase);
readonly Dictionary<string, Queue<IEventSource>> externalEventSources = new(StringComparer.OrdinalIgnoreCase);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what the typical event usage is, but I am going to hazard a guess single events are common. In which case allocating a queue for it is not optimal. We will have to revisit this and see if there is a way to optimize for a few element count cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed we should revisit this. It turns out we do the same thing with the NamedQueue<string> class for buffered events, so we should consider optimizing both.

readonly NamedQueue<string> externalEventBuffer = new();
readonly OrchestrationContext innerContext;
readonly OrchestrationInvocationContext invocationContext;
Expand Down Expand Up @@ -195,19 +195,21 @@ public override Task<T> WaitForExternalEvent<T>(string eventName, CancellationTo

// Create a task completion source that will be set when the external event arrives.
EventTaskCompletionSource<T> eventSource = new();
if (this.externalEventSources.TryGetValue(eventName, out IEventSource? existing))
if (this.externalEventSources.TryGetValue(eventName, out Queue<IEventSource>? existing))
{
if (existing.EventType != typeof(T))
if (existing.Count > 0 && existing.Peek().EventType != typeof(T))
{
throw new ArgumentException("Events with the same name must have the same type argument. Expected"
+ $" {existing.EventType.FullName}.");
+ $" {existing.Peek().GetType().FullName} but was requested {typeof(T).FullName}.");
}

existing.Next = eventSource;
existing.Enqueue(eventSource);
}
else
{
this.externalEventSources.Add(eventName, eventSource);
Queue<IEventSource> eventSourceQueue = new();
eventSourceQueue.Enqueue(eventSource);
this.externalEventSources.Add(eventName, eventSourceQueue);
}

// TODO: this needs to be tracked and disposed appropriately.
Expand Down Expand Up @@ -307,19 +309,16 @@ static void SwapByteArrayElements(byte[] byteArray, int left, int right)
/// <param name="rawEventPayload">The serialized event payload.</param>
internal void CompleteExternalEvent(string eventName, string rawEventPayload)
{
if (this.externalEventSources.TryGetValue(eventName, out IEventSource? waiter))
if (this.externalEventSources.TryGetValue(eventName, out Queue<IEventSource>? waiters))
{
IEventSource waiter = waiters.Dequeue();
object? value = this.DataConverter.Deserialize(rawEventPayload, waiter.EventType);

// Events are completed in FIFO order. Remove the key if the last event was delivered.
if (waiter.Next == null)
if (waiters.Count == 0)
{
this.externalEventSources.Remove(eventName);
}
else
{
this.externalEventSources[eventName] = waiter.Next;
}

waiter.TrySetResult(value);
}
Expand Down
43 changes: 43 additions & 0 deletions test/Grpc.IntegrationTests/OrchestrationPatterns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,49 @@ public async Task ExternalEvents(int eventCount)
Assert.Equal<int>(expected, metadata.ReadOutputAs<int[]>());
}

[Theory]
[InlineData(1)]
[InlineData(5)]
public async Task ExternalEventsInParallel(int eventCount)
{
TaskName orchestratorName = nameof(ExternalEvents);
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
List<Task<int>> events = new();
for (int i = 0; i < eventCount; i++)
{
events.Add(ctx.WaitForExternalEvent<int>("Event"));
}

return await Task.WhenAll(events);
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

// To ensure consistency, wait for the instance to start before sending the events
OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync(
instanceId,
this.TimeoutToken);

// Send events one-at-a-time to that we can better ensure ordered processing.
for (int i = 0; i < eventCount; i++)
{
await server.Client.RaiseEventAsync(metadata.InstanceId, "Event", eventPayload: i);
}

// Once the orchestration receives all the events it is expecting, it should complete.
metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

int[] expected = Enumerable.Range(0, eventCount).ToArray();
Assert.Equal<int>(expected, metadata.ReadOutputAs<int[]>());
}

[Fact]
public async Task Termination()
{
Expand Down