From 62460d159411d1c7147e25ad7c515e8f6a913ffe Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Fri, 25 Oct 2024 09:53:44 +0200 Subject: [PATCH] feat(Api): Added controller actions to suspend, resume and cancel workflow instances Signed-off-by: Charles d'Avernas --- .../CancelWorkflowInstanceCommand.cs | 64 +++++++++++++++++++ .../ResumeWorkflowInstanceCommand.cs | 63 ++++++++++++++++++ .../SuspendWorkflowInstanceCommand.cs | 63 ++++++++++++++++++ .../IServiceCollectionExtensions.cs | 4 ++ .../ReadWorkflowInstanceLogsQuery.cs | 22 ++----- .../WatchWorkflowInstanceLogsQuery.cs | 22 ++----- .../Services/IWorkflowInstanceApiClient.cs | 27 ++++++++ .../Services/WorkflowInstanceHttpApiClient.cs | 33 ++++++++++ .../WorkflowInstancesController.cs | 53 ++++++++++++++- src/runner/Synapse.Runner/Program.cs | 1 + .../Executors/WorkflowProcessExecutor.cs | 2 +- .../Services/RunnerApplication.cs | 50 ++++++++++++++- .../Services/WorkflowExecutionContext.cs | 2 + .../Services/WorkflowExecutor.cs | 22 +++---- .../MockNamespacedResourceApiClient.cs | 24 +++---- .../Services/MockWorkflowInstanceApiClient.cs | 52 +++++++++++++++ 16 files changed, 444 insertions(+), 60 deletions(-) create mode 100644 src/api/Synapse.Api.Application/Commands/WorkflowInstances/CancelWorkflowInstanceCommand.cs create mode 100644 src/api/Synapse.Api.Application/Commands/WorkflowInstances/ResumeWorkflowInstanceCommand.cs create mode 100644 src/api/Synapse.Api.Application/Commands/WorkflowInstances/SuspendWorkflowInstanceCommand.cs diff --git a/src/api/Synapse.Api.Application/Commands/WorkflowInstances/CancelWorkflowInstanceCommand.cs b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/CancelWorkflowInstanceCommand.cs new file mode 100644 index 000000000..4af077aba --- /dev/null +++ b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/CancelWorkflowInstanceCommand.cs @@ -0,0 +1,64 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using Synapse.Resources; + +namespace Synapse.Api.Application.Commands.WorkflowInstances; + +/// +/// Represents the used to cancel the execution of a +/// +/// The name of the to cancel the execution of +/// The namespace the to cancel the execution of belongs to +public class CancelWorkflowInstanceCommand(string name, string @namespace) + : Command +{ + + /// + /// Gets the name of the to cancel the execution of + /// + public string Name { get; } = name; + + /// + /// Gets the namespace the to cancel the execution of belongs to + /// + public string Namespace { get; } = @namespace; + +} + +/// +/// Represents the service used to handle s +/// +/// The service used to manage s +public class CancelWorkflowInstanceCommandHandler(IResourceRepository resources) + : ICommandHandler +{ + + /// + public virtual async Task HandleAsync(CancelWorkflowInstanceCommand command, CancellationToken cancellationToken = default) + { + var workflowInstanceReference = new ResourceReference(command.Name, command.Namespace); + var original = await resources.GetAsync(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (!string.IsNullOrWhiteSpace(original.Status?.Phase) && original.Status?.Phase != WorkflowInstanceStatusPhase.Pending && original.Status?.Phase != WorkflowInstanceStatusPhase.Running && original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) + throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Cancelled; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + return this.Ok(); + } + +} \ No newline at end of file diff --git a/src/api/Synapse.Api.Application/Commands/WorkflowInstances/ResumeWorkflowInstanceCommand.cs b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/ResumeWorkflowInstanceCommand.cs new file mode 100644 index 000000000..f66877a74 --- /dev/null +++ b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/ResumeWorkflowInstanceCommand.cs @@ -0,0 +1,63 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using Synapse.Resources; + +namespace Synapse.Api.Application.Commands.WorkflowInstances; + +/// +/// Represents the used to resume the execution of a +/// +/// The name of the to resume the execution of +/// The namespace the to resume the execution of belongs to +public class ResumeWorkflowInstanceCommand(string name, string @namespace) + : Command +{ + + /// + /// Gets the name of the to resume the execution of + /// + public string Name { get; } = name; + + /// + /// Gets the namespace the to resume the execution of belongs to + /// + public string Namespace { get; } = @namespace; + +} + +/// +/// Represents the service used to handle s +/// +/// The service used to manage s +public class ResumeWorkflowInstanceCommandHandler(IResourceRepository resources) + : ICommandHandler +{ + + /// + public virtual async Task HandleAsync(ResumeWorkflowInstanceCommand command, CancellationToken cancellationToken = default) + { + var workflowInstanceReference = new ResourceReference(command.Name, command.Namespace); + var original = await resources.GetAsync(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (!string.IsNullOrWhiteSpace(original.Status?.Phase) && original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Running; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + return this.Ok(); + } + +} diff --git a/src/api/Synapse.Api.Application/Commands/WorkflowInstances/SuspendWorkflowInstanceCommand.cs b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/SuspendWorkflowInstanceCommand.cs new file mode 100644 index 000000000..7c0c12153 --- /dev/null +++ b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/SuspendWorkflowInstanceCommand.cs @@ -0,0 +1,63 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using Synapse.Resources; + +namespace Synapse.Api.Application.Commands.WorkflowInstances; + +/// +/// Represents the used to suspend the execution of a +/// +/// The name of the to suspend the execution of +/// The namespace the to suspend the execution of belongs to +public class SuspendWorkflowInstanceCommand(string name, string @namespace) + : Command +{ + + /// + /// Gets the name of the to suspend the execution of + /// + public string Name { get; } = name; + + /// + /// Gets the namespace the to suspend the execution of belongs to + /// + public string Namespace { get; } = @namespace; + +} + +/// +/// Represents the service used to handle s +/// +/// The service used to manage s +public class SuspendWorkflowInstanceCommandHandler(IResourceRepository resources) + : ICommandHandler +{ + + /// + public virtual async Task HandleAsync(SuspendWorkflowInstanceCommand command, CancellationToken cancellationToken = default) + { + var workflowInstanceReference = new ResourceReference(command.Name, command.Namespace); + var original = await resources.GetAsync(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (original.Status?.Phase != WorkflowInstanceStatusPhase.Running) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Waiting; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + return this.Ok(); + } + +} \ No newline at end of file diff --git a/src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs b/src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs index 9d22f0f22..f9f39f082 100644 --- a/src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs +++ b/src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs @@ -18,6 +18,7 @@ using Synapse.Api.Application.Commands.Documents; using Synapse.Api.Application.Commands.Events; using Synapse.Api.Application.Commands.Resources.Generic; +using Synapse.Api.Application.Commands.WorkflowInstances; using Synapse.Api.Application.Queries.Documents; using Synapse.Api.Application.Queries.Resources.Generic; using Synapse.Api.Application.Queries.Users; @@ -159,6 +160,9 @@ public static IServiceCollection AddApiCommands(this IServiceCollection services services.Add(new ServiceDescriptor(handlerServiceType, handlerImplementationType, serviceLifetime)); } + services.AddScoped, SuspendWorkflowInstanceCommandHandler>(); + services.AddScoped, ResumeWorkflowInstanceCommandHandler>(); + services.AddScoped, CancelWorkflowInstanceCommandHandler>(); services.AddScoped>, CreateDocumentCommandHandler>(); services.AddScoped>, UpdateDocumentCommandHandler>(); return services; diff --git a/src/api/Synapse.Api.Application/Queries/WorkflowInstances/ReadWorkflowInstanceLogsQuery.cs b/src/api/Synapse.Api.Application/Queries/WorkflowInstances/ReadWorkflowInstanceLogsQuery.cs index 34a8b8cc9..35883e72d 100644 --- a/src/api/Synapse.Api.Application/Queries/WorkflowInstances/ReadWorkflowInstanceLogsQuery.cs +++ b/src/api/Synapse.Api.Application/Queries/WorkflowInstances/ReadWorkflowInstanceLogsQuery.cs @@ -19,31 +19,21 @@ namespace Synapse.Api.Application.Queries.WorkflowInstances; /// /// Represents the query used to read the logs of the specified workflow instance /// -public class ReadWorkflowInstanceLogsQuery - : Query +/// The name of the to read the logs of +/// The namespace the to read the logs of belongs to +public class ReadWorkflowInstanceLogsQuery(string name, string @namespace) + : Query { - /// - /// Initializes a new - /// - /// The name of the to read the logs of - /// The namespace the to read the logs of belongs to - public ReadWorkflowInstanceLogsQuery(string name, string? @namespace) - { - if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name)); - this.Name = name; - this.Namespace = @namespace; - } - /// /// Gets the name of the to read the logs of /// - public string Name { get; } + public string Name { get; } = name; /// /// Gets the namespace the to read the logs of belongs to /// - public string? Namespace { get; } + public string Namespace { get; } = @namespace; } diff --git a/src/api/Synapse.Api.Application/Queries/WorkflowInstances/WatchWorkflowInstanceLogsQuery.cs b/src/api/Synapse.Api.Application/Queries/WorkflowInstances/WatchWorkflowInstanceLogsQuery.cs index dcc83ea6f..6d7a1661c 100644 --- a/src/api/Synapse.Api.Application/Queries/WorkflowInstances/WatchWorkflowInstanceLogsQuery.cs +++ b/src/api/Synapse.Api.Application/Queries/WorkflowInstances/WatchWorkflowInstanceLogsQuery.cs @@ -20,31 +20,21 @@ namespace Synapse.Api.Application.Queries.WorkflowInstances; /// /// Represents the query used to watch the logs of a specified /// -public class WatchWorkflowInstanceLogsQuery - : Query> +/// The name of the to watch the logs of +/// The namespace the to watch the logs of belongs to +public class WatchWorkflowInstanceLogsQuery(string name, string @namespace) + : Query> { - /// - /// Initializes a new - /// - /// The name of the to watch the logs of - /// The namespace the to watch the logs of belongs to - public WatchWorkflowInstanceLogsQuery(string name, string? @namespace) - { - if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name)); - this.Name = name; - this.Namespace = @namespace; - } - /// /// Gets the name of the to watch the logs of /// - public string Name { get; } + public string Name { get; } = name; /// /// Gets the namespace the to watch the logs of belongs to /// - public string? Namespace { get; } + public string? Namespace { get; } = @namespace; } diff --git a/src/api/Synapse.Api.Client.Core/Services/IWorkflowInstanceApiClient.cs b/src/api/Synapse.Api.Client.Core/Services/IWorkflowInstanceApiClient.cs index ac77c6e18..7961fbdb0 100644 --- a/src/api/Synapse.Api.Client.Core/Services/IWorkflowInstanceApiClient.cs +++ b/src/api/Synapse.Api.Client.Core/Services/IWorkflowInstanceApiClient.cs @@ -22,6 +22,33 @@ public interface IWorkflowInstanceApiClient : INamespacedResourceApiClient { + /// + /// Suspends the execution of the specified workflow instance + /// + /// The name of the workflow instance to suspend the execution of + /// The namespace the workflow instance to suspend the execution of belongs to + /// A + /// A new awaitable + Task SuspendAsync(string name, string @namespace, CancellationToken cancellationToken = default); + + /// + /// Resumes the execution of the specified workflow instance + /// + /// The name of the workflow instance to resume the execution of + /// The namespace the workflow instance to resume the execution of belongs to + /// A + /// A new awaitable + Task ResumeAsync(string name, string @namespace, CancellationToken cancellationToken = default); + + /// + /// Cancels the execution of the specified workflow instance + /// + /// The name of the workflow instance to cancel the execution of + /// The namespace the workflow instance to cancel the execution of belongs to + /// A + /// A new awaitable + Task CancelAsync(string name, string @namespace, CancellationToken cancellationToken = default); + /// /// Reads the logs of the specified /// diff --git a/src/api/Synapse.Api.Client.Http/Services/WorkflowInstanceHttpApiClient.cs b/src/api/Synapse.Api.Client.Http/Services/WorkflowInstanceHttpApiClient.cs index 7e721a5c1..1c613cd9e 100644 --- a/src/api/Synapse.Api.Client.Http/Services/WorkflowInstanceHttpApiClient.cs +++ b/src/api/Synapse.Api.Client.Http/Services/WorkflowInstanceHttpApiClient.cs @@ -23,6 +23,39 @@ public class WorkflowInstanceHttpApiClient(IServiceProvider serviceProvider, ILo : ResourceHttpApiClient(serviceProvider, logger, options, jsonSerializer, httpClient), IWorkflowInstanceApiClient { + /// + public virtual async Task SuspendAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var resource = new WorkflowInstance(); + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/suspend"; + using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Put, uri), cancellationToken).ConfigureAwait(false); + using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); + } + + /// + public virtual async Task ResumeAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var resource = new WorkflowInstance(); + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/resume"; + using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Put, uri), cancellationToken).ConfigureAwait(false); + using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); + } + + /// + public virtual async Task CancelAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var resource = new WorkflowInstance(); + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/cancel"; + using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Put, uri), cancellationToken).ConfigureAwait(false); + using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); + } + /// public virtual async Task ReadLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default) { diff --git a/src/api/Synapse.Api.Http/Controllers/WorkflowInstancesController.cs b/src/api/Synapse.Api.Http/Controllers/WorkflowInstancesController.cs index 4496fa85b..f58acfd87 100644 --- a/src/api/Synapse.Api.Http/Controllers/WorkflowInstancesController.cs +++ b/src/api/Synapse.Api.Http/Controllers/WorkflowInstancesController.cs @@ -12,6 +12,7 @@ // limitations under the License. using Neuroglia.Data.Infrastructure; +using Synapse.Api.Application.Commands.WorkflowInstances; using Synapse.Api.Application.Queries.WorkflowInstances; namespace Synapse.Api.Http.Controllers; @@ -26,6 +27,54 @@ public class WorkflowInstancesController(IMediator mediator, IJsonSerializer jso : NamespacedResourceController(mediator, jsonSerializer) { + /// + /// Suspends the execution of the specified workflow instance + /// + /// The name of the workflow instance to suspend + /// The namespace the workflow instance to suspend belongs to + /// A + /// A new + [HttpPut("{namespace}/{name}/suspend")] + [ProducesResponseType((int)HttpStatusCode.Accepted)] + [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] + public async Task SuspendWorkflowInstance(string name, string @namespace, CancellationToken cancellationToken = default) + { + if(!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState); + return this.Process(await this.Mediator.ExecuteAsync(new SuspendWorkflowInstanceCommand(name, @namespace)).ConfigureAwait(false)); + } + + /// + /// Resumes the execution of the specified workflow instance + /// + /// The name of the workflow instance to resume + /// The namespace the workflow instance to resume belongs to + /// A + /// A new + [HttpPut("{namespace}/{name}/resume")] + [ProducesResponseType((int)HttpStatusCode.Accepted)] + [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] + public async Task ResumeWorkflowInstance(string name, string @namespace, CancellationToken cancellationToken = default) + { + if (!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState); + return this.Process(await this.Mediator.ExecuteAsync(new ResumeWorkflowInstanceCommand(name, @namespace)).ConfigureAwait(false)); + } + + /// + /// Cancels the execution of the specified workflow instance + /// + /// The name of the workflow instance to cancel + /// The namespace the workflow instance to cancel belongs to + /// A + /// A new + [HttpPut("{namespace}/{name}/cancel")] + [ProducesResponseType((int)HttpStatusCode.Accepted)] + [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] + public async Task CancelWorkflowInstance(string name, string @namespace, CancellationToken cancellationToken = default) + { + if (!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState); + return this.Process(await this.Mediator.ExecuteAsync(new CancelWorkflowInstanceCommand(name, @namespace)).ConfigureAwait(false)); + } + /// /// Gets the logs produced by workflow instance with the the specified name and namespace /// @@ -36,7 +85,7 @@ public class WorkflowInstancesController(IMediator mediator, IJsonSerializer jso [HttpGet("{namespace}/{name}/logs")] [ProducesResponseType(typeof(Resource), (int)HttpStatusCode.Created)] [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] - public async Task GetResourceLogs(string name, string @namespace, CancellationToken cancellationToken = default) + public async Task GetWorkflowInstanceLogs(string name, string @namespace, CancellationToken cancellationToken = default) { if (!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState); return this.Process(await this.Mediator.ExecuteAsync(new ReadWorkflowInstanceLogsQuery(name, @namespace), cancellationToken).ConfigureAwait(false)); @@ -52,7 +101,7 @@ public async Task GetResourceLogs(string name, string @namespace, [HttpGet("{namespace}/{name}/logs/watch")] [ProducesResponseType(typeof(IAsyncEnumerable), (int)HttpStatusCode.OK)] [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] - public virtual async Task> WatchResourceLogs(string name, string @namespace, CancellationToken cancellationToken = default) + public virtual async Task> WatchWorkflowInstanceLogs(string name, string @namespace, CancellationToken cancellationToken = default) { var response = await this.Mediator.ExecuteAsync(new WatchWorkflowInstanceLogsQuery(name, @namespace), cancellationToken).ConfigureAwait(false); return response.Data!; diff --git a/src/runner/Synapse.Runner/Program.cs b/src/runner/Synapse.Runner/Program.cs index e97eae2cd..5d9269faf 100644 --- a/src/runner/Synapse.Runner/Program.cs +++ b/src/runner/Synapse.Runner/Program.cs @@ -39,6 +39,7 @@ }); }); services.AddSerialization(); + services.AddJsonSerializer(options => options.DefaultBufferSize = 128); services.AddJQExpressionEvaluator(); services.AddJavaScriptExpressionEvaluator(); services.AddNodeJSScriptExecutor(); diff --git a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs index 2997d654e..148b02a45 100644 --- a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs @@ -117,7 +117,7 @@ await this.SetErrorAsync(new() }, cancellationToken).ConfigureAwait(false); break; case WorkflowInstanceStatusPhase.Faulted: - await this.SetErrorAsync(workflowInstance.Status!.Error!, cancellationToken).ConfigureAwait(false); + await this.SetErrorAsync(watchEvent.Resource.Status.Error!, cancellationToken).ConfigureAwait(false); return; case WorkflowInstanceStatusPhase.Completed: var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content; diff --git a/src/runner/Synapse.Runner/Services/RunnerApplication.cs b/src/runner/Synapse.Runner/Services/RunnerApplication.cs index 34514b586..f503e1349 100644 --- a/src/runner/Synapse.Runner/Services/RunnerApplication.cs +++ b/src/runner/Synapse.Runner/Services/RunnerApplication.cs @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia.Data.Infrastructure.ResourceOriented; + namespace Synapse.Runner.Services; /// @@ -56,6 +58,11 @@ internal class RunnerApplication(IServiceProvider serviceProvider, IHostApplicat /// protected RunnerOptions Options { get; } = options.Value; + /// + /// Gets the used to monitor the workflow instance to run + /// + protected IObservable>? Events { get; private set; } + /// /// Gets the service used to execute the workflow instance to run /// @@ -88,15 +95,19 @@ protected virtual async Task RunAsync(CancellationToken cancellationToken) var expressionEvaluator = this.ServiceProvider.GetRequiredService().GetEvaluator(expressionLanguage) ?? throw new NullReferenceException($"Failed to find an expression evaluator for the language '{expressionLanguage}' defined by workflow '{instance.Spec.Definition.Namespace}.{instance.Spec.Definition.Name}:{instance.Spec.Definition.Version}'"); var context = ActivatorUtilities.CreateInstance(this.ServiceProvider, expressionEvaluator, definition, instance); + this.Events = (await this.ApiClient.WorkflowInstances.MonitorAsync(this.Options.Workflow.GetInstanceName(), this.Options.Workflow.GetInstanceNamespace(), cancellationToken).ConfigureAwait(false)).ToObservable(); + this.Events + .Where(e => e.Type == ResourceWatchEventType.Updated && e.Resource.Status?.Phase != context.Instance.Status?.Phase) + .Select(e => e.Resource.Status?.Phase) + .SubscribeAsync(async phase => await this.OnHandleStatusPhaseChangedAsync(phase, cancellationToken).ConfigureAwait(false)); this.Executor = ActivatorUtilities.CreateInstance(this.ServiceProvider, context); await this.Executor.ExecuteAsync(cancellationToken).ConfigureAwait(false); - this.ApplicationLifetime.StopApplication(); } catch(Exception ex) { this.Logger.LogError("An error occurred while running the specified workflow instance: {ex}", ex); - this.ApplicationLifetime.StopApplication(); } + this.ApplicationLifetime.StopApplication(); } /// @@ -109,6 +120,41 @@ public virtual async Task StopAsync(CancellationToken cancellationToken) } } + protected virtual async Task OnHandleStatusPhaseChangedAsync(string? phase, CancellationToken cancellationToken) + { + switch (phase) + { + case WorkflowInstanceStatusPhase.Waiting: + await this.OnSuspendAsync(cancellationToken).ConfigureAwait(false); + break; + case WorkflowInstanceStatusPhase.Cancelled: + await this.OnCancelAsync(cancellationToken).ConfigureAwait(false); + break; + default: + return; + } + } + + protected virtual async Task OnSuspendAsync(CancellationToken cancellationToken) + { + if (this.Executor == null) + { + this.ApplicationLifetime.StopApplication(); + return; + } + await this.Executor.SuspendAsync(cancellationToken).ConfigureAwait(false); + } + + protected virtual async Task OnCancelAsync(CancellationToken cancellationToken) + { + if (this.Executor == null) + { + this.ApplicationLifetime.StopApplication(); + return; + } + await this.Executor.CancelAsync(cancellationToken).ConfigureAwait(false); + } + /// /// Disposes of the /// diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs index ac3a16c12..9e8cea38b 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs @@ -697,6 +697,7 @@ public virtual async Task SkipAsync(TaskInstance task, object? res /// public virtual async Task SuspendAsync(CancellationToken cancellationToken = default) { + if (this.Instance.Status?.Phase == WorkflowInstanceStatusPhase.Waiting) return; using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); var originalInstance = this.Instance.Clone(); this.Instance.Status ??= new(); @@ -757,6 +758,7 @@ public virtual async Task SuspendAsync(TaskInstance task, Cancella /// public virtual async Task CancelAsync(CancellationToken cancellationToken = default) { + if (this.Instance.Status?.Phase == WorkflowInstanceStatusPhase.Cancelled) return; using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); var originalInstance = this.Instance.Clone(); this.Instance.Status ??= new(); diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs index 7e0ead1d6..97f871281 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs @@ -63,7 +63,7 @@ public class WorkflowExecutor(IServiceProvider serviceProvider, ILogger /// Gets the used to stream s /// - protected Subject Subject { get; } = new(); + protected Subject LifeCycleEvents { get; } = new(); /// /// Gets a containing all child s @@ -153,7 +153,7 @@ public virtual async Task SuspendAsync(CancellationToken cancellationToken = def { this.Stopwatch.Stop(); await this.Workflow.SuspendAsync(cancellationToken).ConfigureAwait(false); - this.Subject.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Suspended)); + this.LifeCycleEvents.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Suspended)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult(); } @@ -167,8 +167,8 @@ protected virtual async Task SetErrorAsync(Error error, CancellationToken cancel { this.Stopwatch.Stop(); await this.Workflow.SetErrorAsync(error, cancellationToken).ConfigureAwait(false); - this.Subject.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Faulted)); - this.Subject.OnError(new ErrorRaisedException(error)); + this.LifeCycleEvents.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Faulted)); + this.LifeCycleEvents.OnError(new ErrorRaisedException(error)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult(); } @@ -184,8 +184,8 @@ protected virtual async Task SetResultAsync(object? result, CancellationToken ca this.Stopwatch.Stop(); var output = result; await this.Workflow.SetResultAsync(output, cancellationToken).ConfigureAwait(false); - this.Subject.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Completed)); - this.Subject.OnCompleted(); + this.LifeCycleEvents.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Completed)); + this.LifeCycleEvents.OnCompleted(); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult(); } @@ -193,12 +193,12 @@ protected virtual async Task SetResultAsync(object? result, CancellationToken ca public virtual async Task CancelAsync(CancellationToken cancellationToken = default) { await this.Workflow.CancelAsync(cancellationToken).ConfigureAwait(false); - this.Subject.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Cancelled)); + this.LifeCycleEvents.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Cancelled)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetCanceled(cancellationToken); } /// - public virtual IDisposable Subscribe(IObserver observer) => this.Subject.Subscribe(observer); + public virtual IDisposable Subscribe(IObserver observer) => this.LifeCycleEvents.Subscribe(observer); /// /// Creates a new for the specified @@ -232,8 +232,6 @@ protected virtual async Task CreateTaskExecutorAsync(TaskInstance /// Handles the timeout of the to execute /// /// The timer's state - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "VSTHRD100:Avoid async void methods", Justification = "")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "VSTHRD200:Use \"Async\" suffix for async methods", Justification = "")] protected virtual async void OnTimeoutAsync(object? state) { await this.SetErrorAsync(new Error() @@ -295,7 +293,7 @@ protected virtual async ValueTask DisposeAsync(bool disposing) try { this.TaskCompletionSource.SetCanceled(); } catch { } this.CancellationTokenSource?.Dispose(); if (this.Timer != null) await this.Timer.DisposeAsync().ConfigureAwait(false); - this.Subject.Dispose(); + this.LifeCycleEvents.Dispose(); this._disposed = true; } @@ -318,7 +316,7 @@ protected virtual void Dispose(bool disposing) try { this.TaskCompletionSource.SetCanceled(); } catch { } this.CancellationTokenSource?.Dispose(); this.Timer?.Dispose(); - this.Subject.Dispose(); + this.LifeCycleEvents.Dispose(); this._disposed = true; } diff --git a/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs b/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs index a83462ad1..037e890ee 100644 --- a/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs +++ b/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs @@ -23,26 +23,28 @@ internal class MockNamespacedResourceApiClient(IResourceRepository re where TResource : class, IResource, new() { - public Task CreateAsync(TResource resource, CancellationToken cancellationToken = default) => resources.AddAsync(resource, false, cancellationToken); + protected IResourceRepository Resources { get; } = resources; - public Task DeleteAsync(string name, string @namespace, CancellationToken cancellationToken = default) => resources.RemoveAsync(name, @namespace, false, cancellationToken); + public Task CreateAsync(TResource resource, CancellationToken cancellationToken = default) => this.Resources.AddAsync(resource, false, cancellationToken); - public Task> ListAsync(string? @namespace, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => Task.FromResult(resources.GetAllAsync(@namespace, labelSelectors, cancellationToken: cancellationToken)!); + public Task DeleteAsync(string name, string @namespace, CancellationToken cancellationToken = default) => this.Resources.RemoveAsync(name, @namespace, false, cancellationToken); - public async Task>> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => (await resources.WatchAsync(@namespace, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable(); + public Task> ListAsync(string? @namespace, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => Task.FromResult(this.Resources.GetAllAsync(@namespace, labelSelectors, cancellationToken: cancellationToken)!); - public async Task>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default) => (await resources.MonitorAsync(name, @namespace, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable(); + public async Task>> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => (await this.Resources.WatchAsync(@namespace, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable(); - public Task GetAsync(string name, string @namespace, CancellationToken cancellationToken = default) => resources.GetAsync(name, @namespace, cancellationToken)!; + public async Task>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default) => (await this.Resources.MonitorAsync(name, @namespace, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable(); - public async Task GetDefinitionAsync(CancellationToken cancellationToken = default) => ((ResourceDefinition)(await resources.GetDefinitionAsync(cancellationToken))!)!; + public Task GetAsync(string name, string @namespace, CancellationToken cancellationToken = default) => this.Resources.GetAsync(name, @namespace, cancellationToken)!; - public Task PatchAsync(string name, string @namespace, Patch patch, string? resourceVersion = null, CancellationToken cancellationToken = default) => resources.PatchAsync(patch, name, @namespace, resourceVersion, false, cancellationToken); + public async Task GetDefinitionAsync(CancellationToken cancellationToken = default) => ((ResourceDefinition)(await this.Resources.GetDefinitionAsync(cancellationToken))!)!; - public Task PatchStatusAsync(string name, string @namespace, Patch patch, string? resourceVersion = null, CancellationToken cancellationToken = default) => resources.PatchStatusAsync(patch, name, @namespace, resourceVersion, false, cancellationToken); + public Task PatchAsync(string name, string @namespace, Patch patch, string? resourceVersion = null, CancellationToken cancellationToken = default) => this.Resources.PatchAsync(patch, name, @namespace, resourceVersion, false, cancellationToken); - public Task ReplaceAsync(TResource resource, CancellationToken cancellationToken = default) => resources.ReplaceAsync(resource, false, cancellationToken); + public Task PatchStatusAsync(string name, string @namespace, Patch patch, string? resourceVersion = null, CancellationToken cancellationToken = default) => this.Resources.PatchStatusAsync(patch, name, @namespace, resourceVersion, false, cancellationToken); - public Task ReplaceStatusAsync(TResource resource, CancellationToken cancellationToken = default) => resources.ReplaceStatusAsync(resource, false, cancellationToken); + public Task ReplaceAsync(TResource resource, CancellationToken cancellationToken = default) => this.Resources.ReplaceAsync(resource, false, cancellationToken); + + public Task ReplaceStatusAsync(TResource resource, CancellationToken cancellationToken = default) => this.Resources.ReplaceStatusAsync(resource, false, cancellationToken); } diff --git a/tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs b/tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs index 7e0301feb..49cf10a41 100644 --- a/tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs +++ b/tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs @@ -11,10 +11,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia.Data; +using Neuroglia; using Neuroglia.Data.Infrastructure; +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using Neuroglia.Data.Infrastructure.ResourceOriented; using Neuroglia.Data.Infrastructure.ResourceOriented.Services; using Neuroglia.Data.Infrastructure.Services; using Synapse.Api.Client.Services; +using System.Net; namespace Synapse.UnitTests.Services; @@ -22,8 +27,55 @@ internal class MockWorkflowInstanceApiClient(IResourceRepository resources, ITex : MockNamespacedResourceApiClient(resources), IWorkflowInstanceApiClient { + public async Task SuspendAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var workflowInstanceReference = new ResourceReference(name, @namespace); + var original = await this.Resources.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (original.Status?.Phase != WorkflowInstanceStatusPhase.Running) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Waiting; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), name, @namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + } + + public async Task ResumeAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var workflowInstanceReference = new ResourceReference(name, @namespace); + var original = await this.Resources.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Running; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), name, @namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + } + + public async Task CancelAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var workflowInstanceReference = new ResourceReference(name, @namespace); + var original = await this.Resources.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (original.Status?.Phase != WorkflowInstanceStatusPhase.Pending && original.Status?.Phase != WorkflowInstanceStatusPhase.Running && original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) + throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Cancelled; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), name, @namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + } + public Task ReadLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default) => logs.ReadToEndAsync($"{name}.{@namespace}", cancellationToken); + public async Task> WatchLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default) => (await logs.WatchAsync($"{name}.{@namespace}", cancellationToken)).ToAsyncEnumerable(); } \ No newline at end of file