Skip to content

Commit

Permalink
fix(Runner): Fixed an issue with the correlation handler, which was n…
Browse files Browse the repository at this point in the history
…ot filtering events based on their data

fix(Runner): Fixed an issue with the correlation handler, which failed to correlate two events when no keys were defined in the active correlation context
fix(Runner): Fixed critical issues with task executors,which were disposing of child executors before they could complete their execution, resulting in withholding the execution of completed tasks and flows
  • Loading branch information
cdavernas committed Aug 12, 2024
1 parent dc1b1be commit 9eec5a4
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 17 deletions.
28 changes: 24 additions & 4 deletions src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
if (contextCorrelationResult == default)
{
this.Logger.LogInformation("Failed to find a matching correlation context");
if (this.Correlation.Resource.Status.Contexts.Count != 0) throw new Exception("Failed to correlate event"); //should not happen
if (this.Correlation.Resource.Status.Contexts.Count != 0) throw new Exception("Failed to correlate event: a context with unmatched keys has already been associated with the ephemeral correlation");
this.Logger.LogInformation("Creating a new correlation context...");
context = new CorrelationContext()
{
Expand Down Expand Up @@ -202,9 +202,28 @@ protected virtual async Task<bool> TryFilterEventAsync(EventFilterDefinition fil
foreach(var attribute in filter.With)
{
if (!e.TryGetAttribute(attribute.Key, out var value) || value == null) return false;
var valueStr = attribute.Value.ToString();
if (valueStr?.IsRuntimeExpression() == true && !await this.ExpressionEvaluator.EvaluateConditionAsync(valueStr, e, cancellationToken: cancellationToken)) return false;
else if (!string.IsNullOrWhiteSpace(valueStr) && !Regex.IsMatch(value.ToString() ?? string.Empty, valueStr, RegexOptions.IgnoreCase)) return false;
if (attribute.Key == CloudEventAttributes.Data && value != null && value is not string)
{
var valueDictionary = attribute.Value.ConvertTo<IDictionary<string, object>>()!;
foreach(var property in valueDictionary)
{
var valueStr = property.Value.ToString();
if (valueStr?.IsRuntimeExpression() == true)
{
if (!await this.ExpressionEvaluator.EvaluateConditionAsync(valueStr, e, cancellationToken: cancellationToken)) return false;
}
else if (!string.IsNullOrWhiteSpace(valueStr) && !Regex.IsMatch(value.ToString() ?? string.Empty, valueStr, RegexOptions.IgnoreCase)) return false;
}
}
else
{
var valueStr = attribute.Value.ToString();
if (valueStr?.IsRuntimeExpression() == true)
{
if (!await this.ExpressionEvaluator.EvaluateConditionAsync(valueStr, e, cancellationToken: cancellationToken)) return false;
}
else if (!string.IsNullOrWhiteSpace(valueStr) && !Regex.IsMatch(value.ToString() ?? string.Empty, valueStr, RegexOptions.IgnoreCase)) return false;
}
}
}
if (filter.Correlate?.Count > 0)
Expand Down Expand Up @@ -255,6 +274,7 @@ protected virtual async Task<bool> TryFilterEventAsync(EventFilterDefinition fil
}
break;
}
else return (context, true, correlationKeys, filter.Key);
}
return (context, false, null, null);
}
Expand Down
3 changes: 0 additions & 3 deletions src/operator/Synapse.Operator/Services/WorkflowController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
// limitations under the License.

using Json.Patch;
using Neuroglia.Data.Infrastructure.ResourceOriented;
using Neuroglia.Reactive;
using System.Threading;

namespace Synapse.Operator.Services;

Expand Down
3 changes: 2 additions & 1 deletion src/runner/Synapse.Runner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//if (args.Length != 0 && args.Contains("--debug") && !Debugger.IsAttached) Debugger.Launch(); //todo: uncomment
if (args.Length != 0 && args.Contains("--debug") && !Debugger.IsAttached) Debugger.Launch(); //todo: uncomment

var builder = Host.CreateDefaultBuilder()
.ConfigureAppConfiguration((context, config) =>
Expand Down Expand Up @@ -65,6 +65,7 @@
services.AddSingleton<ITaskExecutorFactory, TaskExecutorFactory>();
services.AddSingleton<ISchemaHandlerProvider, SchemaHandlerProvider>();
services.AddSingleton<ISchemaHandler, JsonSchemaHandler>();
services.AddSingleton<IExternalResourceProvider, ExternalResourceProvider>();
services.AddHostedService<RunnerApplication>();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ protected virtual async Task OnSubTaskFaultAsync(ITaskExecutor executor, Cancell
ArgumentNullException.ThrowIfNull(executor);
var error = executor.Task.Instance.Error ?? throw new NullReferenceException();
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -104,7 +103,6 @@ protected virtual async Task OnSubtaskCompletedAsync(ITaskExecutor executor, Can
var output = executor.Task.Output!;
var nextDefinition = this.Task.Definition.Do.GetTaskAfter(last);
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
if (nextDefinition == null || nextDefinition.Value == null)
{
await this.SetResultAsync(output, last.Next == FlowDirective.End || last.Next == FlowDirective.Exit ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ protected virtual async Task OnTryFaultedAsync(ITaskExecutor executor, Exception
return;
}
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
if (this.Task.Definition.Catch.Retry != null)
{
var limit = this.Task.Definition.Catch.Retry.Limit;
Expand Down Expand Up @@ -162,7 +161,6 @@ protected virtual async Task OnTryCompletedAsync(ITaskExecutor executor, Cancell
var last = executor.Task.Instance;
var output = executor.Task.Output!;
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
await this.SetResultAsync(output, last.Next == FlowDirective.End ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -178,7 +176,6 @@ protected virtual async Task OnHandlerFaultAsync(ITaskExecutor executor, Excepti
ArgumentNullException.ThrowIfNull(executor);
var error = executor.Task.Instance.Error ?? throw new NullReferenceException();
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -194,7 +191,6 @@ protected virtual async Task OnHandlerCompletedAsync(ITaskExecutor executor, Can
var last = executor.Task.Instance;
var output = executor.Task.Output!;
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
await this.SetResultAsync(output, last.Next == FlowDirective.End ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
}

Expand Down
4 changes: 2 additions & 2 deletions src/runner/Synapse.Runner/Services/TaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public virtual async Task ExecuteAsync(CancellationToken cancellationToken = def
if (this.Task.Definition.Timeout?.After != null)
{
var duration = this.Task.Definition.Timeout.After.ToTimeSpan();
this.CancellationTokenSource.CancelAfter(duration);
this.Timer = new(this.OnTimeoutAsync, null, duration, Timeout.InfiniteTimeSpan);
}
try
Expand Down Expand Up @@ -275,6 +274,7 @@ public virtual async Task SetErrorAsync(Error error, CancellationToken cancellat
this.Subject.OnNext(new TaskLifeCycleEvent(TaskLifeCycleEventType.Faulted));
this.Subject.OnError(new ErrorRaisedException(error));
if (!this.TaskCompletionSource.Task.IsCompleted) this.TaskCompletionSource.SetResult();
if (this.CancellationTokenSource != null) await this.CancellationTokenSource.CancelAsync().ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -416,7 +416,7 @@ await this.SetErrorAsync(new Error()
Status = (int)HttpStatusCode.RequestTimeout,
Type = ErrorType.Timeout,
Title = ErrorTitle.Timeout,
Detail = $"The task with name '{this.Task.Instance.Name}' at '{this.Task.Instance.Reference}' has timed out after {this.Task.Definition.Timeout!.After.Milliseconds} milliseconds"
Detail = $"The task with name '{this.Task.Instance.Name}' at '{this.Task.Instance.Reference}' has timed out after {this.Task.Definition.Timeout!.After.TotalMilliseconds} milliseconds"
}, this.CancellationTokenSource?.Token ?? default).ConfigureAwait(false);
}

Expand Down
1 change: 0 additions & 1 deletion src/runner/Synapse.Runner/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ protected virtual async Task OnTaskFaultedAsync(ITaskExecutor executor, Cancella
{
await this.SetErrorAsync(executor.Task.Instance.Error ?? throw new Exception("Faulted tasks must document an error"), cancellationToken).ConfigureAwait(false);
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
}

/// <summary>
Expand Down

0 comments on commit 9eec5a4

Please sign in to comment.