From 9eec5a41f8a5648c5ea64cbf5da2ad305fde761e Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Mon, 12 Aug 2024 15:35:12 +0200 Subject: [PATCH] fix(Runner): Fixed an issue with the correlation handler, which was not filtering events based on their data fix(Runner): Fixed an issue with the correlation handler, which failed to correlate two events when no keys were defined in the active correlation context fix(Runner): Fixed critical issues with task executors,which were disposing of child executors before they could complete their execution, resulting in withholding the execution of completed tasks and flows --- .../Services/CorrelationHandler.cs | 28 ++++++++++++++++--- .../Services/WorkflowController.cs | 3 -- src/runner/Synapse.Runner/Program.cs | 3 +- .../Services/Executors/DoTaskExecutor.cs | 2 -- .../Services/Executors/TryTaskExecutor.cs | 4 --- .../Synapse.Runner/Services/TaskExecutor.cs | 4 +-- .../Services/WorkflowExecutor.cs | 1 - 7 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs index b0f329b59..6e8136558 100644 --- a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs +++ b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs @@ -117,7 +117,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken if (contextCorrelationResult == default) { this.Logger.LogInformation("Failed to find a matching correlation context"); - if (this.Correlation.Resource.Status.Contexts.Count != 0) throw new Exception("Failed to correlate event"); //should not happen + if (this.Correlation.Resource.Status.Contexts.Count != 0) throw new Exception("Failed to correlate event: a context with unmatched keys has already been associated with the ephemeral correlation"); this.Logger.LogInformation("Creating a new correlation context..."); context = new CorrelationContext() { @@ -202,9 +202,28 @@ protected virtual async Task TryFilterEventAsync(EventFilterDefinition fil foreach(var attribute in filter.With) { if (!e.TryGetAttribute(attribute.Key, out var value) || value == null) return false; - var valueStr = attribute.Value.ToString(); - if (valueStr?.IsRuntimeExpression() == true && !await this.ExpressionEvaluator.EvaluateConditionAsync(valueStr, e, cancellationToken: cancellationToken)) return false; - else if (!string.IsNullOrWhiteSpace(valueStr) && !Regex.IsMatch(value.ToString() ?? string.Empty, valueStr, RegexOptions.IgnoreCase)) return false; + if (attribute.Key == CloudEventAttributes.Data && value != null && value is not string) + { + var valueDictionary = attribute.Value.ConvertTo>()!; + foreach(var property in valueDictionary) + { + var valueStr = property.Value.ToString(); + if (valueStr?.IsRuntimeExpression() == true) + { + if (!await this.ExpressionEvaluator.EvaluateConditionAsync(valueStr, e, cancellationToken: cancellationToken)) return false; + } + else if (!string.IsNullOrWhiteSpace(valueStr) && !Regex.IsMatch(value.ToString() ?? string.Empty, valueStr, RegexOptions.IgnoreCase)) return false; + } + } + else + { + var valueStr = attribute.Value.ToString(); + if (valueStr?.IsRuntimeExpression() == true) + { + if (!await this.ExpressionEvaluator.EvaluateConditionAsync(valueStr, e, cancellationToken: cancellationToken)) return false; + } + else if (!string.IsNullOrWhiteSpace(valueStr) && !Regex.IsMatch(value.ToString() ?? string.Empty, valueStr, RegexOptions.IgnoreCase)) return false; + } } } if (filter.Correlate?.Count > 0) @@ -255,6 +274,7 @@ protected virtual async Task TryFilterEventAsync(EventFilterDefinition fil } break; } + else return (context, true, correlationKeys, filter.Key); } return (context, false, null, null); } diff --git a/src/operator/Synapse.Operator/Services/WorkflowController.cs b/src/operator/Synapse.Operator/Services/WorkflowController.cs index 7cd888013..01ed47305 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowController.cs @@ -12,9 +12,6 @@ // limitations under the License. using Json.Patch; -using Neuroglia.Data.Infrastructure.ResourceOriented; -using Neuroglia.Reactive; -using System.Threading; namespace Synapse.Operator.Services; diff --git a/src/runner/Synapse.Runner/Program.cs b/src/runner/Synapse.Runner/Program.cs index 4424e2413..87d6c080a 100644 --- a/src/runner/Synapse.Runner/Program.cs +++ b/src/runner/Synapse.Runner/Program.cs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//if (args.Length != 0 && args.Contains("--debug") && !Debugger.IsAttached) Debugger.Launch(); //todo: uncomment +if (args.Length != 0 && args.Contains("--debug") && !Debugger.IsAttached) Debugger.Launch(); //todo: uncomment var builder = Host.CreateDefaultBuilder() .ConfigureAppConfiguration((context, config) => @@ -65,6 +65,7 @@ services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddHostedService(); }); diff --git a/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs index 5401caf88..6522ae9b5 100644 --- a/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs @@ -87,7 +87,6 @@ protected virtual async Task OnSubTaskFaultAsync(ITaskExecutor executor, Cancell ArgumentNullException.ThrowIfNull(executor); var error = executor.Task.Instance.Error ?? throw new NullReferenceException(); this.Executors.Remove(executor); - await executor.DisposeAsync().ConfigureAwait(false); await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false); } @@ -104,7 +103,6 @@ protected virtual async Task OnSubtaskCompletedAsync(ITaskExecutor executor, Can var output = executor.Task.Output!; var nextDefinition = this.Task.Definition.Do.GetTaskAfter(last); this.Executors.Remove(executor); - await executor.DisposeAsync().ConfigureAwait(false); if (nextDefinition == null || nextDefinition.Value == null) { await this.SetResultAsync(output, last.Next == FlowDirective.End || last.Next == FlowDirective.Exit ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); diff --git a/src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs index 596739708..374c083e1 100644 --- a/src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs @@ -111,7 +111,6 @@ protected virtual async Task OnTryFaultedAsync(ITaskExecutor executor, Exception return; } this.Executors.Remove(executor); - await executor.DisposeAsync().ConfigureAwait(false); if (this.Task.Definition.Catch.Retry != null) { var limit = this.Task.Definition.Catch.Retry.Limit; @@ -162,7 +161,6 @@ protected virtual async Task OnTryCompletedAsync(ITaskExecutor executor, Cancell var last = executor.Task.Instance; var output = executor.Task.Output!; this.Executors.Remove(executor); - await executor.DisposeAsync().ConfigureAwait(false); await this.SetResultAsync(output, last.Next == FlowDirective.End ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } @@ -178,7 +176,6 @@ protected virtual async Task OnHandlerFaultAsync(ITaskExecutor executor, Excepti ArgumentNullException.ThrowIfNull(executor); var error = executor.Task.Instance.Error ?? throw new NullReferenceException(); this.Executors.Remove(executor); - await executor.DisposeAsync().ConfigureAwait(false); await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false); } @@ -194,7 +191,6 @@ protected virtual async Task OnHandlerCompletedAsync(ITaskExecutor executor, Can var last = executor.Task.Instance; var output = executor.Task.Output!; this.Executors.Remove(executor); - await executor.DisposeAsync().ConfigureAwait(false); await this.SetResultAsync(output, last.Next == FlowDirective.End ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } diff --git a/src/runner/Synapse.Runner/Services/TaskExecutor.cs b/src/runner/Synapse.Runner/Services/TaskExecutor.cs index 6e108b9b2..0114c942e 100644 --- a/src/runner/Synapse.Runner/Services/TaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/TaskExecutor.cs @@ -134,7 +134,6 @@ public virtual async Task ExecuteAsync(CancellationToken cancellationToken = def if (this.Task.Definition.Timeout?.After != null) { var duration = this.Task.Definition.Timeout.After.ToTimeSpan(); - this.CancellationTokenSource.CancelAfter(duration); this.Timer = new(this.OnTimeoutAsync, null, duration, Timeout.InfiniteTimeSpan); } try @@ -275,6 +274,7 @@ public virtual async Task SetErrorAsync(Error error, CancellationToken cancellat this.Subject.OnNext(new TaskLifeCycleEvent(TaskLifeCycleEventType.Faulted)); this.Subject.OnError(new ErrorRaisedException(error)); if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult(); + if (this.CancellationTokenSource != null) await this.CancellationTokenSource.CancelAsync().ConfigureAwait(false); } /// @@ -416,7 +416,7 @@ await this.SetErrorAsync(new Error() Status = (int)HttpStatusCode.RequestTimeout, Type = ErrorType.Timeout, Title = ErrorTitle.Timeout, - Detail = $"The task with name '{this.Task.Instance.Name}' at '{this.Task.Instance.Reference}' has timed out after {this.Task.Definition.Timeout!.After.Milliseconds} milliseconds" + Detail = $"The task with name '{this.Task.Instance.Name}' at '{this.Task.Instance.Reference}' has timed out after {this.Task.Definition.Timeout!.After.TotalMilliseconds} milliseconds" }, this.CancellationTokenSource?.Token ?? default).ConfigureAwait(false); } diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs index dd034b699..92fcbff76 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs @@ -253,7 +253,6 @@ protected virtual async Task OnTaskFaultedAsync(ITaskExecutor executor, Cancella { await this.SetErrorAsync(executor.Task.Instance.Error ?? throw new Exception("Faulted tasks must document an error"), cancellationToken).ConfigureAwait(false); this.Executors.Remove(executor); - await executor.DisposeAsync().ConfigureAwait(false); } ///