Skip to content

Commit

Permalink
Merge pull request #441 from serverlessworkflow/feat-suspend-resume-c…
Browse files Browse the repository at this point in the history
…ancel

Added means to suspend, resume and cancel workflow instances
  • Loading branch information
JBBianchi authored Oct 25, 2024
2 parents 71d9adb + 803f66d commit 86a437a
Show file tree
Hide file tree
Showing 23 changed files with 585 additions and 129 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented.Properties;
using Synapse.Resources;

namespace Synapse.Api.Application.Commands.WorkflowInstances;

/// <summary>
/// Represents the <see cref="Command"/> used to cancel the execution of a <see cref="WorkflowInstance"/>
/// </summary>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to cancel the execution of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to cancel the execution of belongs to</param>
public class CancelWorkflowInstanceCommand(string name, string @namespace)
: Command
{

/// <summary>
/// Gets the name of the <see cref="WorkflowInstance"/> to cancel the execution of
/// </summary>
public string Name { get; } = name;

/// <summary>
/// Gets the namespace the <see cref="WorkflowInstance"/> to cancel the execution of belongs to
/// </summary>
public string Namespace { get; } = @namespace;

}

/// <summary>
/// Represents the service used to handle <see cref="CancelWorkflowInstanceCommand"/>s
/// </summary>
/// <param name="resources">The service used to manage <see cref="IResource"/>s</param>
public class CancelWorkflowInstanceCommandHandler(IResourceRepository resources)
: ICommandHandler<CancelWorkflowInstanceCommand>
{

/// <inheritdoc/>
public virtual async Task<IOperationResult> HandleAsync(CancelWorkflowInstanceCommand command, CancellationToken cancellationToken = default)
{
var workflowInstanceReference = new ResourceReference<WorkflowInstance>(command.Name, command.Namespace);
var original = await resources.GetAsync<WorkflowInstance>(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false)
?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString())));
if (!string.IsNullOrWhiteSpace(original.Status?.Phase) && original.Status?.Phase != WorkflowInstanceStatusPhase.Pending && original.Status?.Phase != WorkflowInstanceStatusPhase.Running && original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting)
throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'"));
var updated = original.Clone()!;
updated.Status ??= new();
updated.Status.Phase = WorkflowInstanceStatusPhase.Cancelled;
var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated);
await resources.PatchStatusAsync<WorkflowInstance>(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false);
return this.Ok();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented.Properties;
using Synapse.Resources;

namespace Synapse.Api.Application.Commands.WorkflowInstances;

/// <summary>
/// Represents the <see cref="Command"/> used to resume the execution of a <see cref="WorkflowInstance"/>
/// </summary>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to resume the execution of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to resume the execution of belongs to</param>
public class ResumeWorkflowInstanceCommand(string name, string @namespace)
: Command
{

/// <summary>
/// Gets the name of the <see cref="WorkflowInstance"/> to resume the execution of
/// </summary>
public string Name { get; } = name;

/// <summary>
/// Gets the namespace the <see cref="WorkflowInstance"/> to resume the execution of belongs to
/// </summary>
public string Namespace { get; } = @namespace;

}

/// <summary>
/// Represents the service used to handle <see cref="ResumeWorkflowInstanceCommand"/>s
/// </summary>
/// <param name="resources">The service used to manage <see cref="IResource"/>s</param>
public class ResumeWorkflowInstanceCommandHandler(IResourceRepository resources)
: ICommandHandler<ResumeWorkflowInstanceCommand>
{

/// <inheritdoc/>
public virtual async Task<IOperationResult> HandleAsync(ResumeWorkflowInstanceCommand command, CancellationToken cancellationToken = default)
{
var workflowInstanceReference = new ResourceReference<WorkflowInstance>(command.Name, command.Namespace);
var original = await resources.GetAsync<WorkflowInstance>(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false)
?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString())));
if (!string.IsNullOrWhiteSpace(original.Status?.Phase) && original.Status?.Phase != WorkflowInstanceStatusPhase.Waiting) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'"));
var updated = original.Clone()!;
updated.Status ??= new();
updated.Status.Phase = WorkflowInstanceStatusPhase.Running;
var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated);
await resources.PatchStatusAsync<WorkflowInstance>(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false);
return this.Ok();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented.Properties;
using Synapse.Resources;

namespace Synapse.Api.Application.Commands.WorkflowInstances;

/// <summary>
/// Represents the <see cref="Command"/> used to suspend the execution of a <see cref="WorkflowInstance"/>
/// </summary>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to suspend the execution of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to suspend the execution of belongs to</param>
public class SuspendWorkflowInstanceCommand(string name, string @namespace)
: Command
{

/// <summary>
/// Gets the name of the <see cref="WorkflowInstance"/> to suspend the execution of
/// </summary>
public string Name { get; } = name;

/// <summary>
/// Gets the namespace the <see cref="WorkflowInstance"/> to suspend the execution of belongs to
/// </summary>
public string Namespace { get; } = @namespace;

}

/// <summary>
/// Represents the service used to handle <see cref="SuspendWorkflowInstanceCommand"/>s
/// </summary>
/// <param name="resources">The service used to manage <see cref="IResource"/>s</param>
public class SuspendWorkflowInstanceCommandHandler(IResourceRepository resources)
: ICommandHandler<SuspendWorkflowInstanceCommand>
{

/// <inheritdoc/>
public virtual async Task<IOperationResult> HandleAsync(SuspendWorkflowInstanceCommand command, CancellationToken cancellationToken = default)
{
var workflowInstanceReference = new ResourceReference<WorkflowInstance>(command.Name, command.Namespace);
var original = await resources.GetAsync<WorkflowInstance>(command.Name, command.Namespace, cancellationToken).ConfigureAwait(false)
?? throw new ProblemDetailsException(new(ProblemTypes.NotFound, ProblemTitles.NotFound, (int)HttpStatusCode.NotFound, ProblemDescriptions.ResourceNotFound.Format(workflowInstanceReference.ToString())));
if (original.Status?.Phase != WorkflowInstanceStatusPhase.Running) throw new ProblemDetailsException(new(ProblemTypes.AdmissionFailed, ProblemTitles.AdmissionFailed, (int)HttpStatusCode.BadRequest, $"The workflow instance '{workflowInstanceReference}' is in an expected phase '{original.Status?.Phase}'"));
var updated = original.Clone()!;
updated.Status ??= new();
updated.Status.Phase = WorkflowInstanceStatusPhase.Waiting;
var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated);
await resources.PatchStatusAsync<WorkflowInstance>(new(PatchType.JsonPatch, jsonPatch), command.Name, command.Namespace, cancellationToken: cancellationToken).ConfigureAwait(false);
return this.Ok();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Synapse.Api.Application.Commands.Documents;
using Synapse.Api.Application.Commands.Events;
using Synapse.Api.Application.Commands.Resources.Generic;
using Synapse.Api.Application.Commands.WorkflowInstances;
using Synapse.Api.Application.Queries.Documents;
using Synapse.Api.Application.Queries.Resources.Generic;
using Synapse.Api.Application.Queries.Users;
Expand Down Expand Up @@ -159,6 +160,9 @@ public static IServiceCollection AddApiCommands(this IServiceCollection services
services.Add(new ServiceDescriptor(handlerServiceType, handlerImplementationType, serviceLifetime));

}
services.AddScoped<IRequestHandler<SuspendWorkflowInstanceCommand, IOperationResult>, SuspendWorkflowInstanceCommandHandler>();
services.AddScoped<IRequestHandler<ResumeWorkflowInstanceCommand, IOperationResult>, ResumeWorkflowInstanceCommandHandler>();
services.AddScoped<IRequestHandler<CancelWorkflowInstanceCommand, IOperationResult>, CancelWorkflowInstanceCommandHandler>();
services.AddScoped<IRequestHandler<CreateDocumentCommand, IOperationResult<Document>>, CreateDocumentCommandHandler>();
services.AddScoped<IRequestHandler<UpdateDocumentCommand, IOperationResult<Document>>, UpdateDocumentCommandHandler>();
return services;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,21 @@ namespace Synapse.Api.Application.Queries.WorkflowInstances;
/// <summary>
/// Represents the query used to read the logs of the specified workflow instance
/// </summary>
public class ReadWorkflowInstanceLogsQuery
: Query<string>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to read the logs of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to read the logs of belongs to</param>
public class ReadWorkflowInstanceLogsQuery(string name, string @namespace)
: Query<string>
{

/// <summary>
/// Initializes a new <see cref="ReadWorkflowInstanceLogsQuery"/>
/// </summary>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to read the logs of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to read the logs of belongs to</param>
public ReadWorkflowInstanceLogsQuery(string name, string? @namespace)
{
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name));
this.Name = name;
this.Namespace = @namespace;
}

/// <summary>
/// Gets the name of the <see cref="WorkflowInstance"/> to read the logs of
/// </summary>
public string Name { get; }
public string Name { get; } = name;

/// <summary>
/// Gets the namespace the <see cref="WorkflowInstance"/> to read the logs of belongs to
/// </summary>
public string? Namespace { get; }
public string Namespace { get; } = @namespace;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,21 @@ namespace Synapse.Api.Application.Queries.WorkflowInstances;
/// <summary>
/// Represents the query used to watch the logs of a specified <see cref="WorkflowInstance"/>
/// </summary>
public class WatchWorkflowInstanceLogsQuery
: Query<IAsyncEnumerable<ITextDocumentWatchEvent>>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to watch the logs of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to watch the logs of belongs to</param>
public class WatchWorkflowInstanceLogsQuery(string name, string @namespace)
: Query<IAsyncEnumerable<ITextDocumentWatchEvent>>
{

/// <summary>
/// Initializes a new <see cref="WatchWorkflowInstanceLogsQuery"/>
/// </summary>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to watch the logs of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to watch the logs of belongs to</param>
public WatchWorkflowInstanceLogsQuery(string name, string? @namespace)
{
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name));
this.Name = name;
this.Namespace = @namespace;
}

/// <summary>
/// Gets the name of the <see cref="WorkflowInstance"/> to watch the logs of
/// </summary>
public string Name { get; }
public string Name { get; } = name;

/// <summary>
/// Gets the namespace the <see cref="WorkflowInstance"/> to watch the logs of belongs to
/// </summary>
public string? Namespace { get; }
public string? Namespace { get; } = @namespace;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,33 @@ public interface IWorkflowInstanceApiClient
: INamespacedResourceApiClient<WorkflowInstance>
{

/// <summary>
/// Suspends the execution of the specified workflow instance
/// </summary>
/// <param name="name">The name of the workflow instance to suspend the execution of</param>
/// <param name="namespace">The namespace the workflow instance to suspend the execution of belongs to</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
Task SuspendAsync(string name, string @namespace, CancellationToken cancellationToken = default);

/// <summary>
/// Resumes the execution of the specified workflow instance
/// </summary>
/// <param name="name">The name of the workflow instance to resume the execution of</param>
/// <param name="namespace">The namespace the workflow instance to resume the execution of belongs to</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
Task ResumeAsync(string name, string @namespace, CancellationToken cancellationToken = default);

/// <summary>
/// Cancels the execution of the specified workflow instance
/// </summary>
/// <param name="name">The name of the workflow instance to cancel the execution of</param>
/// <param name="namespace">The namespace the workflow instance to cancel the execution of belongs to</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
Task CancelAsync(string name, string @namespace, CancellationToken cancellationToken = default);

/// <summary>
/// Reads the logs of the specified <see cref="WorkflowInstance"/>
/// </summary>
Expand Down
Loading

0 comments on commit 86a437a

Please sign in to comment.