Skip to content

Commit

Permalink
Merge pull request #443 from serverlessworkflow/feat-await-process-task
Browse files Browse the repository at this point in the history
Implemented the feature to run processes asynchronously without awaiting their completion
  • Loading branch information
cdavernas authored Oct 25, 2024
2 parents d4c4ca7 + 83132aa commit 192bdf8
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.10" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
<PackageReference Include="System.Reactive" Version="6.0.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/cli/Synapse.Cli/Synapse.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" />
<PackageReference Include="moment.net" Version="1.3.4" />
<PackageReference Include="NetEscapades.Configuration.Yaml" Version="3.1.0" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
<PackageReference Include="System.CommandLine.NamingConventionBinder" Version="2.0.0-beta4.22272.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.15.8" />
<PackageReference Include="Neuroglia.Mediation" Version="4.15.8" />
<PackageReference Include="Neuroglia.Plugins" Version="4.15.8" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Synapse.Core/Synapse.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.15.8" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.15.8" />
<PackageReference Include="Semver" Version="2.3.0" />
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha5" />
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha5.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
try
{
await this.Container!.StartAsync(cancellationToken).ConfigureAwait(false);
if (this.Task.Definition.Run.Await != false)
{
await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
}
await this.Container.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
var standardOutput = (this.Container.StandardOutput == null ? null : await this.Container.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false))?.Trim();
if (this.Options.Containers.Platform == ContainerPlatform.Docker) standardOutput = standardOutput?[8..];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
var environment = this.ProcessDefinition.Environment == null
? null
: await this.ProcessDefinition.Environment.ToAsyncEnumerable().ToDictionaryAwaitAsync(kvp => ValueTask.FromResult(kvp.Key), async kvp => (await this.EvaluateAndSerializeAsync(kvp.Value, cancellationToken).ConfigureAwait(false))!, cancellationToken).ConfigureAwait(false);
using var process = await executor.ExecuteAsync(script, arguments, environment, cancellationToken).ConfigureAwait(false);
var process = await executor.ExecuteAsync(script, arguments, environment, cancellationToken).ConfigureAwait(false);
if (this.Task.Definition.Run.Await != false)
{
await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
}
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
var rawOutput = (await process.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim();
var errorMessage = (await process.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim();
Expand All @@ -81,6 +86,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
Detail = errorMessage,
Instance = this.Task.Instance.Reference
}, cancellationToken).ConfigureAwait(false);
process.Dispose();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
RedirectStandardError = true,
CreateNoWindow = true
};
using var process = Process.Start(startInfo) ?? throw new NullReferenceException($"Failed to create the shell process defined at '{this.Task.Instance.Reference}'");
var process = Process.Start(startInfo) ?? throw new NullReferenceException($"Failed to create the shell process defined at '{this.Task.Instance.Reference}'");
if (this.Task.Definition.Run.Await != false)
{
await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
}
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
var rawOutput = (await process.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim();
var errorMessage = (await process.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim();
Expand All @@ -62,6 +67,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
Detail = errorMessage,
Instance = this.Task.Instance.Reference
}, cancellationToken).ConfigureAwait(false);
process.Dispose();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,38 +111,42 @@ await this.SetErrorAsync(new()
};
this.Subflow = await this.Api.WorkflowInstances.CreateAsync(this.Subflow, cancellationToken).ConfigureAwait(false);
}
await foreach(var watchEvent in this.Api.WorkflowInstances.MonitorAsync(this.Subflow.GetName(), this.Subflow.GetNamespace()!, cancellationToken))
if (this.Task.Definition.Run.Await == false) await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
else
{
switch (watchEvent.Resource.Status?.Phase)
await foreach (var watchEvent in this.Api.WorkflowInstances.MonitorAsync(this.Subflow.GetName(), this.Subflow.GetNamespace()!, cancellationToken))
{
case WorkflowInstanceStatusPhase.Cancelled:
if (!this.Cancelling)
{
await this.SetErrorAsync(new()
switch (watchEvent.Resource.Status?.Phase)
{
case WorkflowInstanceStatusPhase.Cancelled:
if (!this.Cancelling)
{
Type = ErrorType.Runtime,
Status = ErrorStatus.Runtime,
Title = ErrorTitle.Runtime,
Detail = $"The execution of workflow instance '{this.Subflow.GetQualifiedName()}' has been cancelled"
}, cancellationToken).ConfigureAwait(false);
}
break;
case WorkflowInstanceStatusPhase.Faulted:
await this.SetErrorAsync(watchEvent.Resource.Status.Error!, cancellationToken).ConfigureAwait(false);
return;
case WorkflowInstanceStatusPhase.Completed:
var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content;
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
await this.SetErrorAsync(new()
{
Type = ErrorType.Runtime,
Status = ErrorStatus.Runtime,
Title = ErrorTitle.Runtime,
Detail = $"The execution of workflow instance '{this.Subflow.GetQualifiedName()}' has been cancelled"
}, cancellationToken).ConfigureAwait(false);
}
break;
case WorkflowInstanceStatusPhase.Faulted:
await this.SetErrorAsync(watchEvent.Resource.Status.Error!, cancellationToken).ConfigureAwait(false);
return;
case WorkflowInstanceStatusPhase.Completed:
var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content;
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
}
if (this.Cancelling) break;
}
if (this.Cancelling) break;
}
}

/// <inheritdoc/>
public override async Task CancelAsync(CancellationToken cancellationToken = default)
{
if(this.Subflow != null)
if (this.Subflow != null && this.Task.Definition.Run.Await != false)
{
try
{
Expand Down
2 changes: 1 addition & 1 deletion src/runner/Synapse.Runner/Synapse.Runner.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<ItemGroup>
<PackageReference Include="Docker.DotNet" Version="3.125.15" />
<PackageReference Include="DynamicGrpc" Version="1.4.0" />
<PackageReference Include="Google.Protobuf" Version="3.28.2" />
<PackageReference Include="Google.Protobuf" Version="3.28.3" />
<PackageReference Include="Grpc.Core" Version="2.46.6" />
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="8.0.10" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PackageReference Include="FluentAssertions" Version="6.12.1" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.10" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5" />
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5.1" />
<PackageReference Include="Testcontainers" Version="3.10.0" />
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
Expand Down
4 changes: 2 additions & 2 deletions tests/Synapse.UnitTests/Synapse.UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.15.8" />
<PackageReference Include="Neuroglia.Data.Infrastructure.Memory" Version="4.15.8" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.15.8" />
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5" />
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5.1" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
<PackageReference Include="Testcontainers" Version="3.10.0" />
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="Xunit.Gherkin.Quick" Version="4.5.0" />
Expand Down

0 comments on commit 192bdf8

Please sign in to comment.