Skip to content

Commit

Permalink
fix(Runner): Fixed the WorkflowExecutor to pass the context of the pr…
Browse files Browse the repository at this point in the history
…evious task instead of the workflow context when creating a new task executor

Signed-off-by: Charles d'Avernas <[email protected]>
  • Loading branch information
cdavernas committed Jul 8, 2024
1 parent abc51fe commit fb12f79
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/runner/Synapse.Runner/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ protected virtual async Task OnTaskCompletedAsync(ITaskExecutor executor, Cancel
return;
}
var nextTask = await this.Workflow.CreateTaskAsync(nextDefinition.Value, nextDefinition.Key, completedTask.Output ?? new { }, cancellationToken: cancellationToken).ConfigureAwait(false);
var nextExecutor = await this.CreateTaskExecutorAsync(nextTask, nextDefinition.Value, this.Workflow.ContextData, this.Workflow.Arguments, cancellationToken).ConfigureAwait(false);
var nextExecutor = await this.CreateTaskExecutorAsync(nextTask, nextDefinition.Value, executor.Task.ContextData, this.Workflow.Arguments, cancellationToken).ConfigureAwait(false);
await nextExecutor.ExecuteAsync(cancellationToken).ConfigureAwait(false);
}

Expand Down
3 changes: 2 additions & 1 deletion tests/Synapse.UnitTests/Cases/Runner/RunnerTestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// limitations under the License.

using Docker.DotNet;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Moq;
using Neuroglia.Data.Expressions.JQ;
Expand All @@ -22,6 +21,7 @@
using Neuroglia.Data.PatchModel.Services;
using Neuroglia.Eventing.CloudEvents.Infrastructure;
using Neuroglia.Security.Services;
using ServerlessWorkflow.Sdk.IO;
using StackExchange.Redis;
using Synapse.Api.Client.Services;
using Synapse.Core.Infrastructure.Containers;
Expand Down Expand Up @@ -57,6 +57,7 @@ protected virtual IServiceCollection ConfigureServices(IServiceCollection servic
services.AddSerialization();
services.AddJsonSerializer();
services.AddJQExpressionEvaluator();
services.AddServerlessWorkflowIO();
services.AddSingleton<ITaskExecutionContextFactory, TaskExecutionContextFactory>();
services.AddSingleton<ITaskExecutorFactory, TaskExecutorFactory>();
services.AddMemoryCacheRepository<Document, string>();
Expand Down
67 changes: 67 additions & 0 deletions tests/Synapse.UnitTests/Cases/Runner/WorkflowExecutorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using ServerlessWorkflow.Sdk.IO;
using Synapse.Runner.Services;
using System.Text;

namespace Synapse.UnitTests.Cases.Runner;

public class WorkflowExecutorTests
: RunnerTestsBase
{

[Fact]
public async Task Run_Workflow_Should_Work()
{
//arrange
var yaml = @"
document:
dsl: '0.1'
namespace: default
name: hello-chain
version: '0.2.4'
do:
- say-hello-1:
set:
greetings: ${ ""Hello "" + (. // ""world"") }
input:
from: ${ $workflow.input.name1 }
export:
as:
greetings: ${ $output.greetings }
- say-hello-2:
set:
greetings: ${ $context.greetings + "" and "" + (. // ""world"") }
input:
from: ${ $workflow.input.name2 }
";
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(yaml));
var definition = await this.ServiceProvider.GetRequiredService<IWorkflowDefinitionReader>()
.ReadAsync(stream);
var input = new Neuroglia.EquatableDictionary<string, object>()
{
new("name1", "John Doe"),
new("name2", "Jane Doe")
};
var context = await MockWorkflowExecutionContextFactory.CreateAsync(this.ServiceProvider, definition, input);
var executor = ActivatorUtilities.CreateInstance<WorkflowExecutor>(ServiceProvider, context);

//act
await executor.ExecuteAsync();

//assert
context.Output.Should().NotBeNull();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Neuroglia.Data.Infrastructure.ResourceOriented;
using Neuroglia.Data.Infrastructure.ResourceOriented.Services;
using Neuroglia.Data.Infrastructure.Services;
using Neuroglia;

namespace Synapse.UnitTests.Services;

Expand All @@ -23,7 +24,7 @@ internal static class MockWorkflowExecutionContextFactory

internal static IWorkflowExecutionContext Create(IServiceProvider services, WorkflowDefinition definition, WorkflowInstance instance) => ActivatorUtilities.CreateInstance<WorkflowExecutionContext>(services, definition, instance);

internal static async Task<IWorkflowExecutionContext> CreateAsync(IServiceProvider services, WorkflowDefinition? workflowDefinition = null)
internal static async Task<IWorkflowExecutionContext> CreateAsync(IServiceProvider services, WorkflowDefinition? workflowDefinition = null, EquatableDictionary<string, object>? input = null)
{
var resources = services.GetRequiredService<IResourceRepository>();
var documents = services.GetRequiredService<IRepository<Document, string>>();
Expand Down Expand Up @@ -56,7 +57,7 @@ internal static async Task<IWorkflowExecutionContext> CreateAsync(IServiceProvid
Namespace = workflow.GetNamespace()!,
Version = workflowDefinition.Document.Version
},
Input = []
Input = input ?? []
},
Status = new()
{
Expand Down

0 comments on commit fb12f79

Please sign in to comment.