From 83132aa2b4ad46e978b05cf31ceec1f6de86c02f Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Fri, 25 Oct 2024 15:52:32 +0200 Subject: [PATCH] feat(Runner): Implemented the ability to run processes asynchronously without awaiting their completion Signed-off-by: Charles d'Avernas --- .../Synapse.Api.Client.Http.csproj | 2 +- src/cli/Synapse.Cli/Synapse.Cli.csproj | 2 +- .../Synapse.Core.Infrastructure.csproj | 2 +- src/core/Synapse.Core/Synapse.Core.csproj | 2 +- .../Executors/ContainerProcessExecutor.cs | 5 ++ .../Executors/ScriptProcessExecutor.cs | 8 +++- .../Executors/ShellProcessExecutor.cs | 8 +++- .../Executors/WorkflowProcessExecutor.cs | 48 ++++++++++--------- .../Synapse.Runner/Synapse.Runner.csproj | 2 +- .../Synapse.IntegrationTests.csproj | 2 +- .../Synapse.UnitTests.csproj | 4 +- 11 files changed, 53 insertions(+), 32 deletions(-) diff --git a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj index 8f58ea247..4fabc3bea 100644 --- a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj +++ b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj @@ -43,7 +43,7 @@ - + diff --git a/src/cli/Synapse.Cli/Synapse.Cli.csproj b/src/cli/Synapse.Cli/Synapse.Cli.csproj index 05cdb1697..a49ae597f 100644 --- a/src/cli/Synapse.Cli/Synapse.Cli.csproj +++ b/src/cli/Synapse.Cli/Synapse.Cli.csproj @@ -33,7 +33,7 @@ - + diff --git a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj index dc7afc2bc..5d4d4e5b7 100644 --- a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj +++ b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj @@ -50,7 +50,7 @@ - + diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj index 1fd685500..537f04d4c 100644 --- a/src/core/Synapse.Core/Synapse.Core.csproj +++ b/src/core/Synapse.Core/Synapse.Core.csproj @@ -69,7 +69,7 @@ - + diff --git a/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs index 908658ebd..12c276b67 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs @@ -62,6 +62,11 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken try { await this.Container!.StartAsync(cancellationToken).ConfigureAwait(false); + if (this.Task.Definition.Run.Await != false) + { + await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + return; + } await this.Container.WaitForExitAsync(cancellationToken).ConfigureAwait(false); var standardOutput = (this.Container.StandardOutput == null ? null : await this.Container.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false))?.Trim(); if (this.Options.Containers.Platform == ContainerPlatform.Docker) standardOutput = standardOutput?[8..]; diff --git a/src/runner/Synapse.Runner/Services/Executors/ScriptProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ScriptProcessExecutor.cs index cf048c0a1..7910e6077 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ScriptProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ScriptProcessExecutor.cs @@ -68,7 +68,12 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken var environment = this.ProcessDefinition.Environment == null ? null : await this.ProcessDefinition.Environment.ToAsyncEnumerable().ToDictionaryAwaitAsync(kvp => ValueTask.FromResult(kvp.Key), async kvp => (await this.EvaluateAndSerializeAsync(kvp.Value, cancellationToken).ConfigureAwait(false))!, cancellationToken).ConfigureAwait(false); - using var process = await executor.ExecuteAsync(script, arguments, environment, cancellationToken).ConfigureAwait(false); + var process = await executor.ExecuteAsync(script, arguments, environment, cancellationToken).ConfigureAwait(false); + if (this.Task.Definition.Run.Await != false) + { + await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + return; + } await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false); var rawOutput = (await process.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim(); var errorMessage = (await process.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim(); @@ -81,6 +86,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken Detail = errorMessage, Instance = this.Task.Instance.Reference }, cancellationToken).ConfigureAwait(false); + process.Dispose(); } /// diff --git a/src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs index 7ac3bfacc..a1de99171 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs @@ -49,7 +49,12 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken RedirectStandardError = true, CreateNoWindow = true }; - using var process = Process.Start(startInfo) ?? throw new NullReferenceException($"Failed to create the shell process defined at '{this.Task.Instance.Reference}'"); + var process = Process.Start(startInfo) ?? throw new NullReferenceException($"Failed to create the shell process defined at '{this.Task.Instance.Reference}'"); + if (this.Task.Definition.Run.Await != false) + { + await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + return; + } await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false); var rawOutput = (await process.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim(); var errorMessage = (await process.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim(); @@ -62,6 +67,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken Detail = errorMessage, Instance = this.Task.Instance.Reference }, cancellationToken).ConfigureAwait(false); + process.Dispose(); } } diff --git a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs index ac389ecf1..95486488e 100644 --- a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs @@ -111,38 +111,42 @@ await this.SetErrorAsync(new() }; this.Subflow = await this.Api.WorkflowInstances.CreateAsync(this.Subflow, cancellationToken).ConfigureAwait(false); } - await foreach(var watchEvent in this.Api.WorkflowInstances.MonitorAsync(this.Subflow.GetName(), this.Subflow.GetNamespace()!, cancellationToken)) + if (this.Task.Definition.Run.Await == false) await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + else { - switch (watchEvent.Resource.Status?.Phase) + await foreach (var watchEvent in this.Api.WorkflowInstances.MonitorAsync(this.Subflow.GetName(), this.Subflow.GetNamespace()!, cancellationToken)) { - case WorkflowInstanceStatusPhase.Cancelled: - if (!this.Cancelling) - { - await this.SetErrorAsync(new() + switch (watchEvent.Resource.Status?.Phase) + { + case WorkflowInstanceStatusPhase.Cancelled: + if (!this.Cancelling) { - Type = ErrorType.Runtime, - Status = ErrorStatus.Runtime, - Title = ErrorTitle.Runtime, - Detail = $"The execution of workflow instance '{this.Subflow.GetQualifiedName()}' has been cancelled" - }, cancellationToken).ConfigureAwait(false); - } - break; - case WorkflowInstanceStatusPhase.Faulted: - await this.SetErrorAsync(watchEvent.Resource.Status.Error!, cancellationToken).ConfigureAwait(false); - return; - case WorkflowInstanceStatusPhase.Completed: - var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content; - await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); - return; + await this.SetErrorAsync(new() + { + Type = ErrorType.Runtime, + Status = ErrorStatus.Runtime, + Title = ErrorTitle.Runtime, + Detail = $"The execution of workflow instance '{this.Subflow.GetQualifiedName()}' has been cancelled" + }, cancellationToken).ConfigureAwait(false); + } + break; + case WorkflowInstanceStatusPhase.Faulted: + await this.SetErrorAsync(watchEvent.Resource.Status.Error!, cancellationToken).ConfigureAwait(false); + return; + case WorkflowInstanceStatusPhase.Completed: + var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content; + await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + return; + } + if (this.Cancelling) break; } - if (this.Cancelling) break; } } /// public override async Task CancelAsync(CancellationToken cancellationToken = default) { - if(this.Subflow != null) + if (this.Subflow != null && this.Task.Definition.Run.Await != false) { try { diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index 9b9d7b30c..8d08eebc3 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -51,7 +51,7 @@ - + diff --git a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj index 9cef53eac..722714451 100644 --- a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj +++ b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj @@ -17,7 +17,7 @@ - + diff --git a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj index fbcb2aa0f..c2a6e7326 100644 --- a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj +++ b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj @@ -22,8 +22,8 @@ - - + +