diff --git a/src/api/Synapse.Api.Application/Commands/WorkflowInstances/CancelWorkflowInstanceCommand.cs b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/CancelWorkflowInstanceCommand.cs new file mode 100644 index 000000000..4af077aba --- /dev/null +++ b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/CancelWorkflowInstanceCommand.cs @@ -0,0 +1,64 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using Synapse.Resources; + +namespace Synapse.Api.Application.Commands.WorkflowInstances; + +/// +/// Represents the used to cancel the execution of a +/// +/// The name of the to cancel the execution of +/// The namespace the to cancel the execution of belongs to +public class CancelWorkflowInstanceCommand(string name, string @namespace) + : Command +{ + + /// + /// Gets the name of the to cancel the execution of + /// + public string Name { get; } = name; + + /// + /// Gets the namespace the to cancel the execution of belongs to + /// + public string Namespace { get; } = @namespace; + +} + +/// +/// Represents the service used to handle s +/// +/// The service used to manage s +public class CancelWorkflowInstanceCommandHandler(IResourceRepository resources) + : ICommandHandler +{ + + /// + public virtual async Task HandleAsync(CancelWorkflowInstanceCommand command, CancellationToken cancellationToken = default) + { + var workflowInstanceReference = new ResourceReference(command.Name, command.Namespace); + var original = await resources.GetAsync(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (!string.IsNullOrWhiteSpace(original.Status?.Phase) && original.Status?.Phase != WorkflowInstanceStatusPhase.Pending && original.Status?.Phase != WorkflowInstanceStatusPhase.Running && original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) + throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Cancelled; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + return this.Ok(); + } + +} \ No newline at end of file diff --git a/src/api/Synapse.Api.Application/Commands/WorkflowInstances/ResumeWorkflowInstanceCommand.cs b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/ResumeWorkflowInstanceCommand.cs new file mode 100644 index 000000000..f66877a74 --- /dev/null +++ b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/ResumeWorkflowInstanceCommand.cs @@ -0,0 +1,63 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using Synapse.Resources; + +namespace Synapse.Api.Application.Commands.WorkflowInstances; + +/// +/// Represents the used to resume the execution of a +/// +/// The name of the to resume the execution of +/// The namespace the to resume the execution of belongs to +public class ResumeWorkflowInstanceCommand(string name, string @namespace) + : Command +{ + + /// + /// Gets the name of the to resume the execution of + /// + public string Name { get; } = name; + + /// + /// Gets the namespace the to resume the execution of belongs to + /// + public string Namespace { get; } = @namespace; + +} + +/// +/// Represents the service used to handle s +/// +/// The service used to manage s +public class ResumeWorkflowInstanceCommandHandler(IResourceRepository resources) + : ICommandHandler +{ + + /// + public virtual async Task HandleAsync(ResumeWorkflowInstanceCommand command, CancellationToken cancellationToken = default) + { + var workflowInstanceReference = new ResourceReference(command.Name, command.Namespace); + var original = await resources.GetAsync(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (!string.IsNullOrWhiteSpace(original.Status?.Phase) && original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Running; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + return this.Ok(); + } + +} diff --git a/src/api/Synapse.Api.Application/Commands/WorkflowInstances/SuspendWorkflowInstanceCommand.cs b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/SuspendWorkflowInstanceCommand.cs new file mode 100644 index 000000000..7c0c12153 --- /dev/null +++ b/src/api/Synapse.Api.Application/Commands/WorkflowInstances/SuspendWorkflowInstanceCommand.cs @@ -0,0 +1,63 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using Synapse.Resources; + +namespace Synapse.Api.Application.Commands.WorkflowInstances; + +/// +/// Represents the used to suspend the execution of a +/// +/// The name of the to suspend the execution of +/// The namespace the to suspend the execution of belongs to +public class SuspendWorkflowInstanceCommand(string name, string @namespace) + : Command +{ + + /// + /// Gets the name of the to suspend the execution of + /// + public string Name { get; } = name; + + /// + /// Gets the namespace the to suspend the execution of belongs to + /// + public string Namespace { get; } = @namespace; + +} + +/// +/// Represents the service used to handle s +/// +/// The service used to manage s +public class SuspendWorkflowInstanceCommandHandler(IResourceRepository resources) + : ICommandHandler +{ + + /// + public virtual async Task HandleAsync(SuspendWorkflowInstanceCommand command, CancellationToken cancellationToken = default) + { + var workflowInstanceReference = new ResourceReference(command.Name, command.Namespace); + var original = await resources.GetAsync(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (original.Status?.Phase != WorkflowInstanceStatusPhase.Running) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Waiting; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + return this.Ok(); + } + +} \ No newline at end of file diff --git a/src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs b/src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs index 9d22f0f22..f9f39f082 100644 --- a/src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs +++ b/src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs @@ -18,6 +18,7 @@ using Synapse.Api.Application.Commands.Documents; using Synapse.Api.Application.Commands.Events; using Synapse.Api.Application.Commands.Resources.Generic; +using Synapse.Api.Application.Commands.WorkflowInstances; using Synapse.Api.Application.Queries.Documents; using Synapse.Api.Application.Queries.Resources.Generic; using Synapse.Api.Application.Queries.Users; @@ -159,6 +160,9 @@ public static IServiceCollection AddApiCommands(this IServiceCollection services services.Add(new ServiceDescriptor(handlerServiceType, handlerImplementationType, serviceLifetime)); } + services.AddScoped, SuspendWorkflowInstanceCommandHandler>(); + services.AddScoped, ResumeWorkflowInstanceCommandHandler>(); + services.AddScoped, CancelWorkflowInstanceCommandHandler>(); services.AddScoped>, CreateDocumentCommandHandler>(); services.AddScoped>, UpdateDocumentCommandHandler>(); return services; diff --git a/src/api/Synapse.Api.Application/Queries/WorkflowInstances/ReadWorkflowInstanceLogsQuery.cs b/src/api/Synapse.Api.Application/Queries/WorkflowInstances/ReadWorkflowInstanceLogsQuery.cs index 34a8b8cc9..35883e72d 100644 --- a/src/api/Synapse.Api.Application/Queries/WorkflowInstances/ReadWorkflowInstanceLogsQuery.cs +++ b/src/api/Synapse.Api.Application/Queries/WorkflowInstances/ReadWorkflowInstanceLogsQuery.cs @@ -19,31 +19,21 @@ namespace Synapse.Api.Application.Queries.WorkflowInstances; /// /// Represents the query used to read the logs of the specified workflow instance /// -public class ReadWorkflowInstanceLogsQuery - : Query +/// The name of the to read the logs of +/// The namespace the to read the logs of belongs to +public class ReadWorkflowInstanceLogsQuery(string name, string @namespace) + : Query { - /// - /// Initializes a new - /// - /// The name of the to read the logs of - /// The namespace the to read the logs of belongs to - public ReadWorkflowInstanceLogsQuery(string name, string? @namespace) - { - if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name)); - this.Name = name; - this.Namespace = @namespace; - } - /// /// Gets the name of the to read the logs of /// - public string Name { get; } + public string Name { get; } = name; /// /// Gets the namespace the to read the logs of belongs to /// - public string? Namespace { get; } + public string Namespace { get; } = @namespace; } diff --git a/src/api/Synapse.Api.Application/Queries/WorkflowInstances/WatchWorkflowInstanceLogsQuery.cs b/src/api/Synapse.Api.Application/Queries/WorkflowInstances/WatchWorkflowInstanceLogsQuery.cs index dcc83ea6f..6d7a1661c 100644 --- a/src/api/Synapse.Api.Application/Queries/WorkflowInstances/WatchWorkflowInstanceLogsQuery.cs +++ b/src/api/Synapse.Api.Application/Queries/WorkflowInstances/WatchWorkflowInstanceLogsQuery.cs @@ -20,31 +20,21 @@ namespace Synapse.Api.Application.Queries.WorkflowInstances; /// /// Represents the query used to watch the logs of a specified /// -public class WatchWorkflowInstanceLogsQuery - : Query> +/// The name of the to watch the logs of +/// The namespace the to watch the logs of belongs to +public class WatchWorkflowInstanceLogsQuery(string name, string @namespace) + : Query> { - /// - /// Initializes a new - /// - /// The name of the to watch the logs of - /// The namespace the to watch the logs of belongs to - public WatchWorkflowInstanceLogsQuery(string name, string? @namespace) - { - if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name)); - this.Name = name; - this.Namespace = @namespace; - } - /// /// Gets the name of the to watch the logs of /// - public string Name { get; } + public string Name { get; } = name; /// /// Gets the namespace the to watch the logs of belongs to /// - public string? Namespace { get; } + public string? Namespace { get; } = @namespace; } diff --git a/src/api/Synapse.Api.Client.Core/Services/IWorkflowInstanceApiClient.cs b/src/api/Synapse.Api.Client.Core/Services/IWorkflowInstanceApiClient.cs index ac77c6e18..7961fbdb0 100644 --- a/src/api/Synapse.Api.Client.Core/Services/IWorkflowInstanceApiClient.cs +++ b/src/api/Synapse.Api.Client.Core/Services/IWorkflowInstanceApiClient.cs @@ -22,6 +22,33 @@ public interface IWorkflowInstanceApiClient : INamespacedResourceApiClient { + /// + /// Suspends the execution of the specified workflow instance + /// + /// The name of the workflow instance to suspend the execution of + /// The namespace the workflow instance to suspend the execution of belongs to + /// A + /// A new awaitable + Task SuspendAsync(string name, string @namespace, CancellationToken cancellationToken = default); + + /// + /// Resumes the execution of the specified workflow instance + /// + /// The name of the workflow instance to resume the execution of + /// The namespace the workflow instance to resume the execution of belongs to + /// A + /// A new awaitable + Task ResumeAsync(string name, string @namespace, CancellationToken cancellationToken = default); + + /// + /// Cancels the execution of the specified workflow instance + /// + /// The name of the workflow instance to cancel the execution of + /// The namespace the workflow instance to cancel the execution of belongs to + /// A + /// A new awaitable + Task CancelAsync(string name, string @namespace, CancellationToken cancellationToken = default); + /// /// Reads the logs of the specified /// diff --git a/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs b/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs index b8deddf8e..01c91dec1 100644 --- a/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs +++ b/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs @@ -117,10 +117,10 @@ public virtual async IAsyncEnumerable> WatchAsync using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false); var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync()); + using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false)); while (!streamReader.EndOfStream) { - var sseMessage = await streamReader.ReadLineAsync(); + var sseMessage = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false); if (string.IsNullOrWhiteSpace(sseMessage)) continue; var json = sseMessage["data: ".Length..].Trim(); var e = JsonSerializer.Deserialize>(json)!; @@ -140,10 +140,10 @@ public virtual async IAsyncEnumerable> WatchAsync request.EnableWebAssemblyStreamingResponse(); var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync()); + using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false)); while (!streamReader.EndOfStream) { - var sseMessage = await streamReader.ReadLineAsync(); + var sseMessage = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false); if (string.IsNullOrWhiteSpace(sseMessage)) continue; var json = sseMessage["data: ".Length..].Trim(); var e = JsonSerializer.Deserialize>(json)!; @@ -161,10 +161,10 @@ public virtual async IAsyncEnumerable> MonitorAsy using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false); var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync()); + using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false)); while (!streamReader.EndOfStream) { - var sseMessage = await streamReader.ReadLineAsync(); + var sseMessage = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false); if (string.IsNullOrWhiteSpace(sseMessage)) continue; var json = sseMessage["data: ".Length..].Trim(); var e = JsonSerializer.Deserialize>(json)!; @@ -181,10 +181,10 @@ public virtual async IAsyncEnumerable> MonitorAsy using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false); var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync()); + using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false)); while (!streamReader.EndOfStream) { - var sseMessage = await streamReader.ReadLineAsync(); + var sseMessage = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false); if (string.IsNullOrWhiteSpace(sseMessage)) continue; var json = sseMessage["data: ".Length..].Trim(); var e = JsonSerializer.Deserialize>(json)!; diff --git a/src/api/Synapse.Api.Client.Http/Services/WorkflowInstanceHttpApiClient.cs b/src/api/Synapse.Api.Client.Http/Services/WorkflowInstanceHttpApiClient.cs index 7e721a5c1..1c613cd9e 100644 --- a/src/api/Synapse.Api.Client.Http/Services/WorkflowInstanceHttpApiClient.cs +++ b/src/api/Synapse.Api.Client.Http/Services/WorkflowInstanceHttpApiClient.cs @@ -23,6 +23,39 @@ public class WorkflowInstanceHttpApiClient(IServiceProvider serviceProvider, ILo : ResourceHttpApiClient(serviceProvider, logger, options, jsonSerializer, httpClient), IWorkflowInstanceApiClient { + /// + public virtual async Task SuspendAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var resource = new WorkflowInstance(); + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/suspend"; + using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Put, uri), cancellationToken).ConfigureAwait(false); + using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); + } + + /// + public virtual async Task ResumeAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var resource = new WorkflowInstance(); + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/resume"; + using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Put, uri), cancellationToken).ConfigureAwait(false); + using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); + } + + /// + public virtual async Task CancelAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var resource = new WorkflowInstance(); + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/cancel"; + using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Put, uri), cancellationToken).ConfigureAwait(false); + using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); + } + /// public virtual async Task ReadLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default) { diff --git a/src/api/Synapse.Api.Http/ClusterResourceController.cs b/src/api/Synapse.Api.Http/ClusterResourceController.cs index 3428bfc7d..d7c985964 100644 --- a/src/api/Synapse.Api.Http/ClusterResourceController.cs +++ b/src/api/Synapse.Api.Http/ClusterResourceController.cs @@ -109,9 +109,9 @@ public virtual async Task WatchResourcesUsingSSE(string? labelSel this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - await foreach (var e in response.Data!) + await foreach (var e in response.Data!.WithCancellation(cancellationToken)) { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; + var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); } @@ -149,9 +149,9 @@ public virtual async Task MonitorResourceUsingSSE(string name, Ca this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - await foreach (var e in response.Data!) + await foreach (var e in response.Data!.WithCancellation(cancellationToken)) { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; + var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); } diff --git a/src/api/Synapse.Api.Http/Controllers/WorkflowInstancesController.cs b/src/api/Synapse.Api.Http/Controllers/WorkflowInstancesController.cs index 4496fa85b..f58acfd87 100644 --- a/src/api/Synapse.Api.Http/Controllers/WorkflowInstancesController.cs +++ b/src/api/Synapse.Api.Http/Controllers/WorkflowInstancesController.cs @@ -12,6 +12,7 @@ // limitations under the License. using Neuroglia.Data.Infrastructure; +using Synapse.Api.Application.Commands.WorkflowInstances; using Synapse.Api.Application.Queries.WorkflowInstances; namespace Synapse.Api.Http.Controllers; @@ -26,6 +27,54 @@ public class WorkflowInstancesController(IMediator mediator, IJsonSerializer jso : NamespacedResourceController(mediator, jsonSerializer) { + /// + /// Suspends the execution of the specified workflow instance + /// + /// The name of the workflow instance to suspend + /// The namespace the workflow instance to suspend belongs to + /// A + /// A new + [HttpPut("{namespace}/{name}/suspend")] + [ProducesResponseType((int)HttpStatusCode.Accepted)] + [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] + public async Task SuspendWorkflowInstance(string name, string @namespace, CancellationToken cancellationToken = default) + { + if(!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState); + return this.Process(await this.Mediator.ExecuteAsync(new SuspendWorkflowInstanceCommand(name, @namespace)).ConfigureAwait(false)); + } + + /// + /// Resumes the execution of the specified workflow instance + /// + /// The name of the workflow instance to resume + /// The namespace the workflow instance to resume belongs to + /// A + /// A new + [HttpPut("{namespace}/{name}/resume")] + [ProducesResponseType((int)HttpStatusCode.Accepted)] + [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] + public async Task ResumeWorkflowInstance(string name, string @namespace, CancellationToken cancellationToken = default) + { + if (!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState); + return this.Process(await this.Mediator.ExecuteAsync(new ResumeWorkflowInstanceCommand(name, @namespace)).ConfigureAwait(false)); + } + + /// + /// Cancels the execution of the specified workflow instance + /// + /// The name of the workflow instance to cancel + /// The namespace the workflow instance to cancel belongs to + /// A + /// A new + [HttpPut("{namespace}/{name}/cancel")] + [ProducesResponseType((int)HttpStatusCode.Accepted)] + [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] + public async Task CancelWorkflowInstance(string name, string @namespace, CancellationToken cancellationToken = default) + { + if (!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState); + return this.Process(await this.Mediator.ExecuteAsync(new CancelWorkflowInstanceCommand(name, @namespace)).ConfigureAwait(false)); + } + /// /// Gets the logs produced by workflow instance with the the specified name and namespace /// @@ -36,7 +85,7 @@ public class WorkflowInstancesController(IMediator mediator, IJsonSerializer jso [HttpGet("{namespace}/{name}/logs")] [ProducesResponseType(typeof(Resource), (int)HttpStatusCode.Created)] [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] - public async Task GetResourceLogs(string name, string @namespace, CancellationToken cancellationToken = default) + public async Task GetWorkflowInstanceLogs(string name, string @namespace, CancellationToken cancellationToken = default) { if (!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState); return this.Process(await this.Mediator.ExecuteAsync(new ReadWorkflowInstanceLogsQuery(name, @namespace), cancellationToken).ConfigureAwait(false)); @@ -52,7 +101,7 @@ public async Task GetResourceLogs(string name, string @namespace, [HttpGet("{namespace}/{name}/logs/watch")] [ProducesResponseType(typeof(IAsyncEnumerable), (int)HttpStatusCode.OK)] [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] - public virtual async Task> WatchResourceLogs(string name, string @namespace, CancellationToken cancellationToken = default) + public virtual async Task> WatchWorkflowInstanceLogs(string name, string @namespace, CancellationToken cancellationToken = default) { var response = await this.Mediator.ExecuteAsync(new WatchWorkflowInstanceLogsQuery(name, @namespace), cancellationToken).ConfigureAwait(false); return response.Data!; diff --git a/src/api/Synapse.Api.Http/NamespacedResourceController.cs b/src/api/Synapse.Api.Http/NamespacedResourceController.cs index 65ee55980..f9bad3771 100644 --- a/src/api/Synapse.Api.Http/NamespacedResourceController.cs +++ b/src/api/Synapse.Api.Http/NamespacedResourceController.cs @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia.Data.Infrastructure.ResourceOriented; + namespace Synapse.Api.Http; /// @@ -120,20 +122,11 @@ public virtual async Task ListResources(string @namespace, string [HttpGet("watch")] [ProducesResponseType(typeof(IAsyncEnumerable), (int)HttpStatusCode.OK)] [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] - public virtual async Task WatchResources(string? labelSelector = null, CancellationToken cancellationToken = default) + public virtual async Task>> WatchResources(string? labelSelector = null, CancellationToken cancellationToken = default) { - if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) return this.InvalidLabelSelector(labelSelector!); + if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) throw new Exception($"Invalid label selector '{labelSelector}'"); var response = await this.Mediator.ExecuteAsync(new WatchResourcesQuery(null, labelSelectors), cancellationToken).ConfigureAwait(false); - this.Response.Headers.ContentType = "text/event-stream"; - this.Response.Headers.CacheControl = "no-cache"; - this.Response.Headers.Connection = "keep-alive"; - await foreach (var e in response.Data!) - { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; - await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); - await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - } - return this.Ok(); + return response.Data!; } /// @@ -171,9 +164,9 @@ public virtual async Task WatchResourcesUsingSSE(string @namespac this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - await foreach (var e in response.Data!) + await foreach (var e in response.Data!.WithCancellation(cancellationToken)) { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; + var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); } @@ -213,9 +206,9 @@ public virtual async Task MonitorResourceUsingSSE(string name, st this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - await foreach(var e in response.Data!) + await foreach(var e in response.Data!.WithCancellation(cancellationToken)) { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; + var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); } diff --git a/src/dashboard/Synapse.Dashboard/Pages/Workflows/Create/Store.cs b/src/dashboard/Synapse.Dashboard/Pages/Workflows/Create/Store.cs index 6d7cf7507..2886b162e 100644 --- a/src/dashboard/Synapse.Dashboard/Pages/Workflows/Create/Store.cs +++ b/src/dashboard/Synapse.Dashboard/Pages/Workflows/Create/Store.cs @@ -538,24 +538,16 @@ public override async Task InitializeAsync() { await this.GetWorkflowDefinitionAsync(workflow.ns, workflow.name); }, cancellationToken: this.CancellationTokenSource.Token); - this.WorkflowDefinitionText.Where(document => !string.IsNullOrEmpty(document)).Throttle(new(100)).SubscribeAsync(async (document) => { - if (string.IsNullOrWhiteSpace(document)) - { - return; - } + this.WorkflowDefinitionText.Where(document => !string.IsNullOrEmpty(document)).Throttle(new(100)).SubscribeAsync(async (document) => + { + if (string.IsNullOrWhiteSpace(document)) return; var currentDslVersion = this.Get(state => state.DslVersion); var versionExtractor = new Regex("'?\"?(dsl|DSL)'?\"?\\s*:\\s*'?\"?([\\w\\.\\-\\+]*)'?\"?"); var match = versionExtractor.Match(document); - if (match == null) - { - return; - } + if (match == null) return; var documentDslVersion = match.Groups[2].Value; - if (documentDslVersion == currentDslVersion) - { - return; - } - await this.SetValidationSchema("v" + documentDslVersion); + if (documentDslVersion == currentDslVersion) return; + await this.SetValidationSchema(documentDslVersion); }, cancellationToken: this.CancellationTokenSource.Token); await base.InitializeAsync(); } diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs index 4b1ac2f2f..f89105ab9 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs @@ -33,6 +33,7 @@ public class WorkflowInstanceHandler(ILogger logger, IO bool _persistingLogs; bool _disposed; + bool _suspended; /// /// Gets the service used to perform logging @@ -70,9 +71,9 @@ public class WorkflowInstanceHandler(ILogger logger, IO protected IResourceMonitor WorkflowInstance { get; } = workflowInstance; /// - /// Gets the handler's subscription to the handled + /// Gets the handler's subscription used to monitor changes to the handled 's correlation contexts /// - protected IDisposable? WorkflowInstanceSubscription { get; set; } + protected IDisposable? CorrelationContextSubscription { get; set; } /// /// Gets the that represents the subscription to the logs produced by the 's @@ -106,12 +107,17 @@ public class WorkflowInstanceHandler(ILogger logger, IO /// A new awaitable public virtual async Task HandleAsync(CancellationToken cancellationToken = default) { - this.WorkflowInstanceSubscription = this.WorkflowInstance + this.CorrelationContextSubscription = this.WorkflowInstance .Where(e => e.Type == ResourceWatchEventType.Updated) .Select(e => e.Resource.Status?.Correlation?.Contexts) .Scan((Previous: (EquatableDictionary?)null, Current: (EquatableDictionary?)null), (accumulator, current) => (accumulator.Current ?? [], current)) .Where(v => v.Current?.Count > v.Previous?.Count) //ensures we are not handling changes in a circular loop: if length of current is smaller than previous, it means a context has been processed .SubscribeAsync(async e => await OnCorrelationContextsChangedAsync(e, cancellationToken).ConfigureAwait(false)); + this.WorkflowInstance.Where(e => e.Type == ResourceWatchEventType.Updated) + .Select(e => e.Resource.Status?.Phase) + .DistinctUntilChanged() + .Where(ShouldResumeExecution) + .SubscribeAsync(async _ => await this.StartProcessAsync(cancellationToken).ConfigureAwait(false)); if (string.IsNullOrWhiteSpace(this.WorkflowInstance.Resource.Status?.Phase) || this.WorkflowInstance.Resource.Status.Phase == WorkflowInstanceStatusPhase.Pending || this.WorkflowInstance.Resource.Status.Phase == WorkflowInstanceStatusPhase.Running) @@ -157,6 +163,21 @@ protected virtual async Task GetServiceAccountAsync(Cancellation ?? throw new NullReferenceException($"Failed to find the default {nameof(ServiceAccount)} resource. Make sure the resource database is properly initialized."); } + /// + /// Determines whether or not the handler should resume the execution of the handled workflow instance + /// + /// The handled workflow instance's current status phase + /// A boolean indicating whether or not the handler should resume the execution of the handled workflow instance + protected virtual bool ShouldResumeExecution(string? statusPhase) + { + if (statusPhase == WorkflowInstanceStatusPhase.Waiting) + { + this._suspended = true; + return false; + } + return this._suspended && statusPhase == WorkflowInstanceStatusPhase.Running; + } + /// /// Handles changes to the the handled workflow instance's correlation contexts /// @@ -194,7 +215,7 @@ protected virtual async ValueTask DisposeAsync(bool disposing) { if (!disposing || this._disposed) return; await this.WorkflowInstance.DisposeAsync().ConfigureAwait(false); - this.WorkflowInstanceSubscription?.Dispose(); + this.CorrelationContextSubscription?.Dispose(); this.LogSubscription?.Dispose(); if (this.Process != null) await this.Process.DisposeAsync().ConfigureAwait(false); if (this.LogBatchTimer != null) await this.LogBatchTimer.DisposeAsync().ConfigureAwait(false); @@ -218,7 +239,7 @@ protected virtual void Dispose(bool disposing) { if (!disposing || this._disposed) return; this.WorkflowInstance.Dispose(); - this.WorkflowInstanceSubscription?.Dispose(); + this.CorrelationContextSubscription?.Dispose(); this.LogSubscription?.Dispose(); this.Process?.Dispose(); this.LogBatchTimer?.Dispose(); diff --git a/src/runner/Synapse.Runner/Program.cs b/src/runner/Synapse.Runner/Program.cs index e97eae2cd..5d9269faf 100644 --- a/src/runner/Synapse.Runner/Program.cs +++ b/src/runner/Synapse.Runner/Program.cs @@ -39,6 +39,7 @@ }); }); services.AddSerialization(); + services.AddJsonSerializer(options => options.DefaultBufferSize = 128); services.AddJQExpressionEvaluator(); services.AddJavaScriptExpressionEvaluator(); services.AddNodeJSScriptExecutor(); diff --git a/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs index 0e9e7ea5e..cafe21eee 100644 --- a/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs @@ -58,12 +58,14 @@ protected override async Task CreateTaskExecutorAsync(TaskInstanc /// protected override async Task DoExecuteAsync(CancellationToken cancellationToken) { - var last = await this.Task.GetSubTasksAsync(cancellationToken).LastOrDefaultAsync(cancellationToken).ConfigureAwait(false); + var subtasks = this.Task.GetSubTasksAsync(cancellationToken); + var last = await subtasks.LastOrDefaultAsync(cancellationToken).ConfigureAwait(false); var nextDefinition = last == null ? this.Tasks.First() - : last.IsOperative + : last.Status == null || last.IsOperative || last.Status == TaskInstanceStatus.Suspended ? this.Task.Definition.Do.FirstOrDefault(e => e.Key == last.Name) ?? throw new NullReferenceException($"Failed to find a task with the specified name '{last.Name}' at '{this.Task.Instance.Reference}'") : this.Task.Definition.Do.GetTaskAfter(last); + if (last != null && (last.Status == null || last.IsOperative || last.Status == TaskInstanceStatus.Suspended)) last = await subtasks.LastOrDefaultAsync(t => last.Status != null && !t.IsOperative && t.Status != TaskInstanceStatus.Suspended).ConfigureAwait(false); if (nextDefinition == null) { await this.SetResultAsync(last!.OutputReference, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); diff --git a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs index c08873e39..ac389ecf1 100644 --- a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs @@ -45,17 +45,26 @@ public class WorkflowProcessExecutor(IServiceProvider serviceProvider, ILogger protected WorkflowProcessDefinition ProcessDefinition => this.Task.Definition.Run.Workflow!; + /// + /// Gets the subflow + /// + protected WorkflowInstance? Subflow { get; private set; } + + /// + /// Gets a boolean indicating whether or not the execution of the subflow is being cancelled + /// + protected bool Cancelling { get; private set; } + /// protected override async Task DoExecuteAsync(CancellationToken cancellationToken) { var hash = Convert.ToHexString(MD5.HashData(Encoding.UTF8.GetBytes($"{Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name)}{this.Task.Instance.Reference}"))).ToLowerInvariant(); var workflowInstanceName = $"{this.ProcessDefinition.Name}-{hash}"; var workflowInstanceNamespace = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace)!; - WorkflowInstance workflowInstance; try { - workflowInstance = await this.Api.WorkflowInstances.GetAsync(workflowInstanceName, workflowInstanceNamespace, cancellationToken).ConfigureAwait(false); - switch (workflowInstance.Status?.Phase) + this.Subflow = await this.Api.WorkflowInstances.GetAsync(workflowInstanceName, workflowInstanceNamespace, cancellationToken).ConfigureAwait(false); + switch (this.Subflow.Status?.Phase) { case WorkflowInstanceStatusPhase.Cancelled: await this.SetErrorAsync(new() @@ -63,14 +72,14 @@ await this.SetErrorAsync(new() Type = ErrorType.Runtime, Status = ErrorStatus.Runtime, Title = ErrorTitle.Runtime, - Detail = $"The execution of workflow instance '{workflowInstance.GetQualifiedName()}' has been cancelled" + Detail = $"The execution of workflow instance '{this.Subflow.GetQualifiedName()}' has been cancelled" }, cancellationToken).ConfigureAwait(false); return; case WorkflowInstanceStatusPhase.Faulted: - await this.SetErrorAsync(workflowInstance.Status.Error!, cancellationToken).ConfigureAwait(false); + await this.SetErrorAsync(this.Subflow.Status.Error!, cancellationToken).ConfigureAwait(false); return; case WorkflowInstanceStatusPhase.Completed: - var output = string.IsNullOrWhiteSpace(workflowInstance.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(workflowInstance.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content; + var output = string.IsNullOrWhiteSpace(this.Subflow.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(this.Subflow.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content; await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); return; } @@ -82,7 +91,7 @@ await this.SetErrorAsync(new() ? workflow.Spec.Versions.Last() : workflow.Spec.Versions.Get(this.ProcessDefinition.Version) ?? throw new NullReferenceException($"Failed to find version '{this.ProcessDefinition.Version}' of workflow '{workflow.GetQualifiedName()}'"); var input = await this.Task.Workflow.Expressions.EvaluateAsync>(this.ProcessDefinition.Input ?? new(), this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken: cancellationToken).ConfigureAwait(false); - workflowInstance = new WorkflowInstance() + this.Subflow = new WorkflowInstance() { Metadata = new() { @@ -100,30 +109,52 @@ await this.SetErrorAsync(new() Input = input } }; - workflowInstance = await this.Api.WorkflowInstances.CreateAsync(workflowInstance, cancellationToken).ConfigureAwait(false); + this.Subflow = await this.Api.WorkflowInstances.CreateAsync(this.Subflow, cancellationToken).ConfigureAwait(false); } - await foreach(var watchEvent in this.Api.WorkflowInstances.MonitorAsync(workflowInstance.GetName(), workflowInstance.GetNamespace()!, cancellationToken)) + await foreach(var watchEvent in this.Api.WorkflowInstances.MonitorAsync(this.Subflow.GetName(), this.Subflow.GetNamespace()!, cancellationToken)) { switch (watchEvent.Resource.Status?.Phase) { case WorkflowInstanceStatusPhase.Cancelled: - await this.SetErrorAsync(new() + if (!this.Cancelling) { - Type = ErrorType.Runtime, - Status = ErrorStatus.Runtime, - Title = ErrorTitle.Runtime, - Detail = $"The execution of workflow instance '{workflowInstance.GetQualifiedName()}' has been cancelled" - }, cancellationToken).ConfigureAwait(false); + await this.SetErrorAsync(new() + { + Type = ErrorType.Runtime, + Status = ErrorStatus.Runtime, + Title = ErrorTitle.Runtime, + Detail = $"The execution of workflow instance '{this.Subflow.GetQualifiedName()}' has been cancelled" + }, cancellationToken).ConfigureAwait(false); + } break; case WorkflowInstanceStatusPhase.Faulted: - await this.SetErrorAsync(workflowInstance.Status!.Error!, cancellationToken).ConfigureAwait(false); + await this.SetErrorAsync(watchEvent.Resource.Status.Error!, cancellationToken).ConfigureAwait(false); return; case WorkflowInstanceStatusPhase.Completed: var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content; await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); return; } + if (this.Cancelling) break; + } + } + + /// + public override async Task CancelAsync(CancellationToken cancellationToken = default) + { + if(this.Subflow != null) + { + try + { + this.Cancelling = true; + await this.Api.WorkflowInstances.CancelAsync(this.Subflow.GetName(), this.Subflow.GetNamespace()!, cancellationToken).ConfigureAwait(false); + } + catch(Exception ex) + { + this.Logger.LogError("An error occurred while cancelling the subflow instance '{subflow}': {ex}", this.Subflow.GetQualifiedName(), ex); + } } + await base.CancelAsync(cancellationToken).ConfigureAwait(false); } } \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Services/RunnerApplication.cs b/src/runner/Synapse.Runner/Services/RunnerApplication.cs index 34514b586..86dfd9f21 100644 --- a/src/runner/Synapse.Runner/Services/RunnerApplication.cs +++ b/src/runner/Synapse.Runner/Services/RunnerApplication.cs @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia.Data.Infrastructure.ResourceOriented; + namespace Synapse.Runner.Services; /// @@ -56,6 +58,11 @@ internal class RunnerApplication(IServiceProvider serviceProvider, IHostApplicat /// protected RunnerOptions Options { get; } = options.Value; + /// + /// Gets the used to monitor the workflow instance to run + /// + protected IObservable>? Events { get; private set; } + /// /// Gets the service used to execute the workflow instance to run /// @@ -88,15 +95,19 @@ protected virtual async Task RunAsync(CancellationToken cancellationToken) var expressionEvaluator = this.ServiceProvider.GetRequiredService().GetEvaluator(expressionLanguage) ?? throw new NullReferenceException($"Failed to find an expression evaluator for the language '{expressionLanguage}' defined by workflow '{instance.Spec.Definition.Namespace}.{instance.Spec.Definition.Name}:{instance.Spec.Definition.Version}'"); var context = ActivatorUtilities.CreateInstance(this.ServiceProvider, expressionEvaluator, definition, instance); + this.Events = this.ApiClient.WorkflowInstances.MonitorAsync(this.Options.Workflow.GetInstanceName(), this.Options.Workflow.GetInstanceNamespace(), cancellationToken).ToObservable(); + this.Events + .Where(e => e.Type == ResourceWatchEventType.Updated && e.Resource.Status?.Phase != context.Instance.Status?.Phase) + .Select(e => e.Resource.Status?.Phase) + .SubscribeAsync(async phase => await this.OnHandleStatusPhaseChangedAsync(phase, cancellationToken).ConfigureAwait(false), cancellationToken: cancellationToken); this.Executor = ActivatorUtilities.CreateInstance(this.ServiceProvider, context); await this.Executor.ExecuteAsync(cancellationToken).ConfigureAwait(false); - this.ApplicationLifetime.StopApplication(); } catch(Exception ex) { this.Logger.LogError("An error occurred while running the specified workflow instance: {ex}", ex); - this.ApplicationLifetime.StopApplication(); } + this.ApplicationLifetime.StopApplication(); } /// @@ -109,6 +120,41 @@ public virtual async Task StopAsync(CancellationToken cancellationToken) } } + protected virtual async Task OnHandleStatusPhaseChangedAsync(string? phase, CancellationToken cancellationToken) + { + switch (phase) + { + case WorkflowInstanceStatusPhase.Waiting: + await this.OnSuspendAsync(cancellationToken).ConfigureAwait(false); + break; + case WorkflowInstanceStatusPhase.Cancelled: + await this.OnCancelAsync(cancellationToken).ConfigureAwait(false); + break; + default: + return; + } + } + + protected virtual async Task OnSuspendAsync(CancellationToken cancellationToken) + { + if (this.Executor == null) + { + this.ApplicationLifetime.StopApplication(); + return; + } + await this.Executor.SuspendAsync(cancellationToken).ConfigureAwait(false); + } + + protected virtual async Task OnCancelAsync(CancellationToken cancellationToken) + { + if (this.Executor == null) + { + this.ApplicationLifetime.StopApplication(); + return; + } + await this.Executor.CancelAsync(cancellationToken).ConfigureAwait(false); + } + /// /// Disposes of the /// diff --git a/src/runner/Synapse.Runner/Services/TaskExecutor.cs b/src/runner/Synapse.Runner/Services/TaskExecutor.cs index 7189a1bbd..78c7687c6 100644 --- a/src/runner/Synapse.Runner/Services/TaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/TaskExecutor.cs @@ -115,6 +115,7 @@ public abstract class TaskExecutor(IServiceProvider serviceProvider /// public virtual async Task InitializeAsync(CancellationToken cancellationToken = default) { + if (this.Task.Instance.Status != null && !this.Task.Instance.IsOperative) return; try { await this.DoInitializeAsync(cancellationToken).ConfigureAwait(false); @@ -157,8 +158,8 @@ await this.SetErrorAsync(new Error() [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "VSTHRD003:Avoid awaiting foreign Tasks", Justification = "")] public virtual async Task ExecuteAsync(CancellationToken cancellationToken = default) { - this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); if (this.Task.Instance.Status != null && !this.Task.Instance.IsOperative) return; + this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); if (this.Task.Definition.Timeout?.After != null) { var duration = this.Task.Definition.Timeout.After.ToTimeSpan(); @@ -286,11 +287,17 @@ protected virtual async Task AfterExecuteAsync(CancellationToken cancellationTok /// public virtual async Task SuspendAsync(CancellationToken cancellationToken = default) { + foreach (var executor in this.Executors) + { + await executor.SuspendAsync(cancellationToken).ConfigureAwait(false); + this.Executors.Remove(executor); + } this.Stopwatch.Stop(); await this.DoSuspendAsync(cancellationToken).ConfigureAwait(false); await this.Task.SuspendAsync(cancellationToken).ConfigureAwait(false); this.Subject.OnNext(new TaskLifeCycleEvent(TaskLifeCycleEventType.Suspended)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult(); + this.CancellationTokenSource?.Cancel(); } /// @@ -404,10 +411,17 @@ public virtual async Task SkipAsync(object? result, string? then = FlowDirective /// public virtual async Task CancelAsync(CancellationToken cancellationToken = default) { + foreach(var executor in this.Executors) + { + await executor.CancelAsync(cancellationToken).ConfigureAwait(false); + this.Executors.Remove(executor); + } + this.Stopwatch.Stop(); await this.Task.CancelAsync(cancellationToken).ConfigureAwait(false); await this.DoCancelAsync(cancellationToken).ConfigureAwait(false); this.Subject.OnNext(new TaskLifeCycleEvent(TaskLifeCycleEventType.Cancelled)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetCanceled(cancellationToken); + this.CancellationTokenSource?.Cancel(); } /// diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs index 3dc42d29e..af13cf913 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs @@ -20,6 +20,7 @@ using Synapse.Events.Tasks; using Synapse.Events.Workflows; using System.Net.Mime; +using System.Runtime.CompilerServices; namespace Synapse.Runner.Services; @@ -272,9 +273,9 @@ public virtual async Task ResumeAsync(CancellationToken cancellationToken = defa this.Instance.Status.StartedAt ??= DateTimeOffset.Now; this.Instance.Status.Runs ??= []; this.Instance.Status.Runs.Add(new() { StartedAt = DateTimeOffset.Now }); - this.Instance.Status.ContextReference ??= (await this.Documents.CreateAsync(this.Instance.GetQualifiedName(), this.ContextData, cancellationToken).ConfigureAwait(false)).Id; var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(originalInstance, this.Instance); this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false); + this.ContextData = (await this.Api.Documents.GetAsync(this.Instance.Status!.ContextReference, cancellationToken).ConfigureAwait(false)).Content.ConvertTo>()!; if (this.Options.CloudEvents.PublishLifecycleEvents) await this.Api.Events.PublishAsync(new CloudEvent() { SpecVersion = CloudEventSpecVersion.V1.Version, @@ -697,6 +698,7 @@ public virtual async Task SkipAsync(TaskInstance task, object? res /// public virtual async Task SuspendAsync(CancellationToken cancellationToken = default) { + if (this.Instance.Status?.Phase == WorkflowInstanceStatusPhase.Waiting) return; using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); var originalInstance = this.Instance.Clone(); this.Instance.Status ??= new(); @@ -757,6 +759,7 @@ public virtual async Task SuspendAsync(TaskInstance task, Cancella /// public virtual async Task CancelAsync(CancellationToken cancellationToken = default) { + if (this.Instance.Status?.Phase == WorkflowInstanceStatusPhase.Cancelled) return; using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); var originalInstance = this.Instance.Clone(); this.Instance.Status ??= new(); diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs index 7e0ead1d6..20a133c3c 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs @@ -12,6 +12,7 @@ // limitations under the License. using Neuroglia.Data.Infrastructure.ResourceOriented; +using ServerlessWorkflow.Sdk.Models; namespace Synapse.Runner.Services; @@ -63,7 +64,7 @@ public class WorkflowExecutor(IServiceProvider serviceProvider, ILogger /// Gets the used to stream s /// - protected Subject Subject { get; } = new(); + protected Subject LifeCycleEvents { get; } = new(); /// /// Gets a containing all child s @@ -131,7 +132,7 @@ public virtual async Task ExecuteAsync(CancellationToken cancellationToken = def protected virtual async Task StartAsync(CancellationToken cancellationToken) { await this.Workflow.StartAsync(cancellationToken).ConfigureAwait(false); - var taskDefinition = this.Workflow.Definition.Do.First(); //todo: we might add much more complex rules here (event based, etc) + var taskDefinition = this.Workflow.Definition.Do.First(); var task = await this.Workflow.CreateTaskAsync(taskDefinition.Value, taskDefinition.Key, this.Workflow.Instance.Spec.Input ?? [], cancellationToken: cancellationToken).ConfigureAwait(false); var executor = await this.CreateTaskExecutorAsync(task, taskDefinition.Value, this.Workflow.ContextData, this.Workflow.Arguments, cancellationToken).ConfigureAwait(false); await executor.ExecuteAsync(cancellationToken).ConfigureAwait(false); @@ -144,17 +145,27 @@ protected virtual async Task StartAsync(CancellationToken cancellationToken) /// A new awaitable protected virtual async Task ResumeAsync(CancellationToken cancellationToken) { - //todo await this.Workflow.ResumeAsync(cancellationToken).ConfigureAwait(false); + var task = this.Workflow.Instance.Status?.Tasks?.FirstOrDefault(t => string.IsNullOrWhiteSpace(t.ParentId) && (t.Status == null || t.IsOperative || t.Status == TaskInstanceStatus.Suspended)); + if (task == null) return; + var taskDefinition = this.Workflow.Definition.GetComponent(task.Reference.OriginalString); + var executor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Workflow.ContextData, this.Workflow.Arguments, cancellationToken).ConfigureAwait(false); + await executor.ExecuteAsync(cancellationToken).ConfigureAwait(false); } /// public virtual async Task SuspendAsync(CancellationToken cancellationToken = default) { + foreach (var executor in this.Executors) + { + await executor.SuspendAsync(cancellationToken).ConfigureAwait(false); + this.Executors.Remove(executor); + } this.Stopwatch.Stop(); await this.Workflow.SuspendAsync(cancellationToken).ConfigureAwait(false); - this.Subject.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Suspended)); + this.LifeCycleEvents.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Suspended)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult(); + this.CancellationTokenSource?.Cancel(); } /// @@ -167,8 +178,8 @@ protected virtual async Task SetErrorAsync(Error error, CancellationToken cancel { this.Stopwatch.Stop(); await this.Workflow.SetErrorAsync(error, cancellationToken).ConfigureAwait(false); - this.Subject.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Faulted)); - this.Subject.OnError(new ErrorRaisedException(error)); + this.LifeCycleEvents.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Faulted)); + this.LifeCycleEvents.OnError(new ErrorRaisedException(error)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult(); } @@ -184,21 +195,28 @@ protected virtual async Task SetResultAsync(object? result, CancellationToken ca this.Stopwatch.Stop(); var output = result; await this.Workflow.SetResultAsync(output, cancellationToken).ConfigureAwait(false); - this.Subject.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Completed)); - this.Subject.OnCompleted(); + this.LifeCycleEvents.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Completed)); + this.LifeCycleEvents.OnCompleted(); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult(); } /// public virtual async Task CancelAsync(CancellationToken cancellationToken = default) { + foreach(var executor in this.Executors) + { + await executor.CancelAsync(cancellationToken).ConfigureAwait(false); + this.Executors.Remove(executor); + } + this.Stopwatch.Stop(); await this.Workflow.CancelAsync(cancellationToken).ConfigureAwait(false); - this.Subject.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Cancelled)); + this.LifeCycleEvents.OnNext(new WorkflowLifeCycleEvent(WorkflowLifeCycleEventType.Cancelled)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetCanceled(cancellationToken); + this.CancellationTokenSource?.Cancel(); } /// - public virtual IDisposable Subscribe(IObserver observer) => this.Subject.Subscribe(observer); + public virtual IDisposable Subscribe(IObserver observer) => this.LifeCycleEvents.Subscribe(observer); /// /// Creates a new for the specified @@ -232,8 +250,6 @@ protected virtual async Task CreateTaskExecutorAsync(TaskInstance /// Handles the timeout of the to execute /// /// The timer's state - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "VSTHRD100:Avoid async void methods", Justification = "")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "VSTHRD200:Use \"Async\" suffix for async methods", Justification = "")] protected virtual async void OnTimeoutAsync(object? state) { await this.SetErrorAsync(new Error() @@ -295,7 +311,7 @@ protected virtual async ValueTask DisposeAsync(bool disposing) try { this.TaskCompletionSource.SetCanceled(); } catch { } this.CancellationTokenSource?.Dispose(); if (this.Timer != null) await this.Timer.DisposeAsync().ConfigureAwait(false); - this.Subject.Dispose(); + this.LifeCycleEvents.Dispose(); this._disposed = true; } @@ -318,7 +334,7 @@ protected virtual void Dispose(bool disposing) try { this.TaskCompletionSource.SetCanceled(); } catch { } this.CancellationTokenSource?.Dispose(); this.Timer?.Dispose(); - this.Subject.Dispose(); + this.LifeCycleEvents.Dispose(); this._disposed = true; } diff --git a/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs b/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs index 6cd2c82ae..6404566c2 100644 --- a/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs +++ b/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs @@ -24,32 +24,34 @@ internal class MockNamespacedResourceApiClient(IResourceRepository re where TResource : class, IResource, new() { - public Task CreateAsync(TResource resource, CancellationToken cancellationToken = default) => resources.AddAsync(resource, false, cancellationToken); + protected IResourceRepository Resources { get; } = resources; - public Task DeleteAsync(string name, string @namespace, CancellationToken cancellationToken = default) => resources.RemoveAsync(name, @namespace, false, cancellationToken); + public Task CreateAsync(TResource resource, CancellationToken cancellationToken = default) => this.Resources.AddAsync(resource, false, cancellationToken); - public Task> ListAsync(string? @namespace, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => Task.FromResult(resources.GetAllAsync(@namespace, labelSelectors, cancellationToken: cancellationToken)!); + public Task GetAsync(string name, string @namespace, CancellationToken cancellationToken = default) => this.Resources.GetAsync(name, @namespace, cancellationToken)!; + + public async Task GetDefinitionAsync(CancellationToken cancellationToken = default) => ((ResourceDefinition)(await this.Resources.GetDefinitionAsync(cancellationToken))!)!; + + public Task> ListAsync(string? @namespace, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => Task.FromResult(this.Resources.GetAllAsync(@namespace, labelSelectors, cancellationToken: cancellationToken)!); public async IAsyncEnumerable> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, [EnumeratorCancellation]CancellationToken cancellationToken = default) { - await foreach (var e in (await resources.WatchAsync(@namespace, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e; + await foreach (var e in (await this.Resources.WatchAsync(@namespace, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e; } public async IAsyncEnumerable> MonitorAsync(string name, string @namespace, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await foreach (var e in (await resources.MonitorAsync(name, @namespace, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e; + await foreach (var e in (await this.Resources.MonitorAsync(name, @namespace, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e; } - public Task GetAsync(string name, string @namespace, CancellationToken cancellationToken = default) => resources.GetAsync(name, @namespace, cancellationToken)!; - - public async Task GetDefinitionAsync(CancellationToken cancellationToken = default) => ((ResourceDefinition)(await resources.GetDefinitionAsync(cancellationToken))!)!; + public Task PatchAsync(string name, string @namespace, Patch patch, string? resourceVersion = null, CancellationToken cancellationToken = default) => this.Resources.PatchAsync(patch, name, @namespace, resourceVersion, false, cancellationToken); - public Task PatchAsync(string name, string @namespace, Patch patch, string? resourceVersion = null, CancellationToken cancellationToken = default) => resources.PatchAsync(patch, name, @namespace, resourceVersion, false, cancellationToken); + public Task PatchStatusAsync(string name, string @namespace, Patch patch, string? resourceVersion = null, CancellationToken cancellationToken = default) => this.Resources.PatchStatusAsync(patch, name, @namespace, resourceVersion, false, cancellationToken); - public Task PatchStatusAsync(string name, string @namespace, Patch patch, string? resourceVersion = null, CancellationToken cancellationToken = default) => resources.PatchStatusAsync(patch, name, @namespace, resourceVersion, false, cancellationToken); + public Task ReplaceAsync(TResource resource, CancellationToken cancellationToken = default) => this.Resources.ReplaceAsync(resource, false, cancellationToken); - public Task ReplaceAsync(TResource resource, CancellationToken cancellationToken = default) => resources.ReplaceAsync(resource, false, cancellationToken); + public Task ReplaceStatusAsync(TResource resource, CancellationToken cancellationToken = default) => this.Resources.ReplaceStatusAsync(resource, false, cancellationToken); - public Task ReplaceStatusAsync(TResource resource, CancellationToken cancellationToken = default) => resources.ReplaceStatusAsync(resource, false, cancellationToken); + public Task DeleteAsync(string name, string @namespace, CancellationToken cancellationToken = default) => this.Resources.RemoveAsync(name, @namespace, false, cancellationToken); } diff --git a/tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs b/tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs index 7e0301feb..49cf10a41 100644 --- a/tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs +++ b/tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs @@ -11,10 +11,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia.Data; +using Neuroglia; using Neuroglia.Data.Infrastructure; +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using Neuroglia.Data.Infrastructure.ResourceOriented; using Neuroglia.Data.Infrastructure.ResourceOriented.Services; using Neuroglia.Data.Infrastructure.Services; using Synapse.Api.Client.Services; +using System.Net; namespace Synapse.UnitTests.Services; @@ -22,8 +27,55 @@ internal class MockWorkflowInstanceApiClient(IResourceRepository resources, ITex : MockNamespacedResourceApiClient(resources), IWorkflowInstanceApiClient { + public async Task SuspendAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var workflowInstanceReference = new ResourceReference(name, @namespace); + var original = await this.Resources.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (original.Status?.Phase != WorkflowInstanceStatusPhase.Running) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Waiting; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), name, @namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + } + + public async Task ResumeAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var workflowInstanceReference = new ResourceReference(name, @namespace); + var original = await this.Resources.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Running; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), name, @namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + } + + public async Task CancelAsync(string name, string @namespace, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + var workflowInstanceReference = new ResourceReference(name, @namespace); + var original = await this.Resources.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false) + ?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString()))); + if (original.Status?.Phase != WorkflowInstanceStatusPhase.Pending && original.Status?.Phase != WorkflowInstanceStatusPhase.Running && original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) + throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'")); + var updated = original.Clone()!; + updated.Status ??= new(); + updated.Status.Phase = WorkflowInstanceStatusPhase.Cancelled; + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, jsonPatch), name, @namespace, cancellationToken: cancellationToken).ConfigureAwait(false); + } + public Task ReadLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default) => logs.ReadToEndAsync($"{name}.{@namespace}", cancellationToken); + public async Task> WatchLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default) => (await logs.WatchAsync($"{name}.{@namespace}", cancellationToken)).ToAsyncEnumerable(); } \ No newline at end of file