From aaf64058e86ef28cdc07d052a0fc8bac0098d9a2 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Mon, 1 Jan 2024 21:19:45 -0800 Subject: [PATCH] Testing enhancements (#98) Fixes and enhancements based on the testing --- .github/workflows/pull_request.yml | 4 +- .gitignore | 1 + .../Client/Authentication/TokenHandler.cs | 4 +- .../DependencyInjectionExtensions.cs | 9 +- .../Client/Worker/WorkflowTaskExecutor.cs | 98 ++++++++++++------- Tests/Worker/WorkerTests.cs | 7 +- Tests/Worker/Workers.cs | 1 - csharp-examples/Dockerfile | 11 ++- csharp-examples/DockerfileMacArm | 22 +++++ csharp-examples/Program.cs | 11 +-- csharp-examples/Runner.cs | 67 +++++++------ csharp-examples/SimpleTask1.cs | 32 ------ csharp-examples/SimpleTask2.cs | 32 ------ csharp-examples/TestWorker.cs | 58 +++++++++++ 14 files changed, 196 insertions(+), 161 deletions(-) create mode 100644 csharp-examples/DockerfileMacArm delete mode 100644 csharp-examples/SimpleTask1.cs delete mode 100644 csharp-examples/SimpleTask2.cs create mode 100644 csharp-examples/TestWorker.cs diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 8ac8f365..9ce8a6ee 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -1,6 +1,6 @@ -name: Continuous Integration +name: CI Build -on: pull_request +on: [push, pull_request,workflow_dispatch] jobs: lint: diff --git a/.gitignore b/.gitignore index 16d0138b..2289566a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ build **/*.jfm **/*.user **/key-*.xml +csharp-examples/csharp-examples.sln diff --git a/Conductor/Client/Authentication/TokenHandler.cs b/Conductor/Client/Authentication/TokenHandler.cs index bdbc6869..01ff275d 100644 --- a/Conductor/Client/Authentication/TokenHandler.cs +++ b/Conductor/Client/Authentication/TokenHandler.cs @@ -48,9 +48,9 @@ private string GetTokenFromServer(OrkesAuthenticationSettings authenticationSett { return tokenClient.GenerateToken(tokenRequest)._token; } - catch (ApiException e) + catch (Exception e) { - Console.WriteLine($"Failed to refresh authentication token, reason: {e}, request: {tokenRequest.ToJson()}"); + Console.WriteLine($"Failed to refresh authentication token, attempt = {attempt}, error = {e.Message}"); } } throw new Exception("Failed to refresh authentication token"); diff --git a/Conductor/Client/Extensions/DependencyInjectionExtensions.cs b/Conductor/Client/Extensions/DependencyInjectionExtensions.cs index 5a18a243..0d89366f 100644 --- a/Conductor/Client/Extensions/DependencyInjectionExtensions.cs +++ b/Conductor/Client/Extensions/DependencyInjectionExtensions.cs @@ -9,17 +9,10 @@ namespace Conductor.Client.Extensions { public static class DependencyInjectionExtensions { - public static IServiceCollection AddConductorWorkflowTask(this IServiceCollection services) where T : IWorkflowTask - { - services.AddTransient(typeof(IWorkflowTask), typeof(T)); - services.AddTransient(typeof(T)); - return services; - } public static IServiceCollection AddConductorWorkflowTask(this IServiceCollection services, T worker) where T : IWorkflowTask { - services.AddTransient(typeof(IWorkflowTask), typeof(T)); - services.AddTransient(typeof(T)); + services.AddSingleton(worker); return services; } diff --git a/Conductor/Client/Worker/WorkflowTaskExecutor.cs b/Conductor/Client/Worker/WorkflowTaskExecutor.cs index 6599b50b..c9b54473 100644 --- a/Conductor/Client/Worker/WorkflowTaskExecutor.cs +++ b/Conductor/Client/Worker/WorkflowTaskExecutor.cs @@ -47,7 +47,6 @@ public WorkflowTaskExecutor( public System.Threading.Tasks.Task Start(CancellationToken token) { - if (token != CancellationToken.None) token.ThrowIfCancellationRequested(); @@ -68,7 +67,6 @@ public System.Threading.Tasks.Task Start(CancellationToken token) private void Work4Ever(CancellationToken token) { - while (true) { try @@ -78,9 +76,21 @@ private void Work4Ever(CancellationToken token) WorkOnce(token); } + catch (System.OperationCanceledException canceledException) + { + //Do nothing the operation was cancelled + _logger.LogTrace( + $"[{_workerSettings.WorkerId}] Operation Cancelled: {canceledException.Message}" + + $", taskName: {_worker.TaskType}" + + $", domain: {_worker.WorkerSettings.Domain}" + + $", batchSize: {_workerSettings.BatchSize}" + ); + Sleep(SLEEP_FOR_TIME_SPAN_ON_WORKER_ERROR); + } catch (Exception e) { - _logger.LogDebug( + + _logger.LogError( $"[{_workerSettings.WorkerId}] worker error: {e.Message}" + $", taskName: {_worker.TaskType}" + $", domain: {_worker.WorkerSettings.Domain}" @@ -102,18 +112,19 @@ private async void WorkOnce(CancellationToken token) Sleep(_workerSettings.PollInterval); return; } + var uniqueBatchId = Guid.NewGuid(); _logger.LogTrace( - $"[{_workerSettings.WorkerId}] Processing tasks batch" - + $", Task batch unique Id: {uniqueBatchId}" - ); + $"[{_workerSettings.WorkerId}] Processing tasks batch" + + $", Task batch unique Id: {uniqueBatchId}" + ); await System.Threading.Tasks.Task.Run(() => ProcessTasks(tasks, token)); _logger.LogTrace( - $"[{_workerSettings.WorkerId}] Completed tasks batch" - + $", Task batch unique Id: {uniqueBatchId}" - ); + $"[{_workerSettings.WorkerId}] Completed tasks batch" + + $", Task batch unique Id: {uniqueBatchId}" + ); } private List PollTasks() @@ -127,30 +138,47 @@ private async void WorkOnce(CancellationToken token) var availableWorkerCounter = _workerSettings.BatchSize - _workflowTaskMonitor.GetRunningWorkers(); if (availableWorkerCounter < 1) { - throw new Exception("no worker available"); + _logger.LogDebug("All workers are busy"); + return new List(); } - var tasks = _taskClient.PollTask(_worker.TaskType, _workerSettings.WorkerId, _workerSettings.Domain, availableWorkerCounter); - if (tasks == null) + + try { - tasks = new List(); + var tasks = _taskClient.PollTask(_worker.TaskType, _workerSettings.WorkerId, _workerSettings.Domain, + availableWorkerCounter); + if (tasks == null) + { + tasks = new List(); + } + + _logger.LogTrace( + $"[{_workerSettings.WorkerId}] Polled {tasks.Count} tasks" + + $", taskType: {_worker.TaskType}" + + $", domain: {_workerSettings.Domain}" + + $", batchSize: {_workerSettings.BatchSize}" + ); + return tasks; + } + catch (Exception e) + { + _logger.LogTrace( + $"[{_workerSettings.WorkerId}] Polling error: {e.Message} " + + $", taskType: {_worker.TaskType}" + + $", domain: {_workerSettings.Domain}" + + $", batchSize: {_workerSettings.BatchSize}" + ); + return new List(); } - _logger.LogTrace( - $"[{_workerSettings.WorkerId}] Polled {tasks.Count} tasks" - + $", taskType: {_worker.TaskType}" - + $", domain: {_workerSettings.Domain}" - + $", batchSize: {_workerSettings.BatchSize}" - ); - return tasks; } private async void ProcessTasks(List tasks, CancellationToken token) { - List threads = new List(); if (tasks == null || tasks.Count == 0) { return; } + foreach (var task in tasks) { if (token != CancellationToken.None) @@ -159,6 +187,7 @@ private async void ProcessTasks(List tasks, CancellationToken token _workflowTaskMonitor.IncrementRunningWorker(); threads.Add(System.Threading.Tasks.Task.Run(() => ProcessTask(task, token))); } + await System.Threading.Tasks.Task.WhenAll(threads); } @@ -178,25 +207,26 @@ private async void ProcessTask(Models.Task task, CancellationToken token) try { - TaskResult taskResult = new TaskResult(taskId: task.TaskId, workflowInstanceId: task.WorkflowInstanceId); + TaskResult taskResult = + new TaskResult(taskId: task.TaskId, workflowInstanceId: task.WorkflowInstanceId); if (token == CancellationToken.None) taskResult = _worker.Execute(task); else taskResult = await _worker.Execute(task, token); _logger.LogTrace( - $"[{_workerSettings.WorkerId}] Done processing task for worker" - + $", taskType: {_worker.TaskType}" - + $", domain: {_workerSettings.Domain}" - + $", taskId: {task.TaskId}" - + $", workflowId: {task.WorkflowInstanceId}" - + $", CancelToken: {token}" - ); + $"[{_workerSettings.WorkerId}] Done processing task for worker" + + $", taskType: {_worker.TaskType}" + + $", domain: {_workerSettings.Domain}" + + $", taskId: {task.TaskId}" + + $", workflowId: {task.WorkflowInstanceId}" + + $", CancelToken: {token}" + ); UpdateTask(taskResult); } catch (Exception e) { - _logger.LogDebug( + _logger.LogError( $"[{_workerSettings.WorkerId}] Failed to process task for worker, reason: {e.Message}" + $", taskType: {_worker.TaskType}" + $", domain: {_workerSettings.Domain}" @@ -206,7 +236,6 @@ private async void ProcessTask(Models.Task task, CancellationToken token) ); var taskResult = task.Failed(e.Message); UpdateTask(taskResult); - } finally { @@ -228,6 +257,7 @@ private void UpdateTask(Models.TaskResult taskResult) { Sleep(TimeSpan.FromSeconds(1 << attemptCounter)); } + _taskClient.UpdateTask(taskResult); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Done updating task" @@ -240,7 +270,7 @@ private void UpdateTask(Models.TaskResult taskResult) } catch (Exception e) { - _logger.LogTrace( + _logger.LogError( $"[{_workerSettings.WorkerId}] Failed to update task, reason: {e.Message}" + $", taskType: {_worker.TaskType}" + $", domain: {_workerSettings.Domain}" @@ -249,6 +279,7 @@ private void UpdateTask(Models.TaskResult taskResult) ); } } + throw new Exception("Failed to update task after retries"); } @@ -260,7 +291,6 @@ private void Sleep(TimeSpan timeSpan) private void LogInfo() { - } } -} +} \ No newline at end of file diff --git a/Tests/Worker/WorkerTests.cs b/Tests/Worker/WorkerTests.cs index 6695b9b9..e67546b5 100644 --- a/Tests/Worker/WorkerTests.cs +++ b/Tests/Worker/WorkerTests.cs @@ -30,7 +30,7 @@ public async System.Threading.Tasks.Task TestWorkflowAsyncExecution() { var workflow = GetConductorWorkflow(); ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(workflow, true); - var workflowIdList = await StartWorkflows(workflow, quantity: 35); + var workflowIdList = await StartWorkflows(workflow, quantity: 15); await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20)); await ValidateWorkflowCompletion(workflowIdList.ToArray()); } @@ -50,7 +50,7 @@ private async System.Threading.Tasks.Task> StartWorkflows( var startedWorkflows = await WorkflowExtensions.StartWorkflows( _workflowClient, startWorkflowRequest, - maxAllowedInParallel: 10, + maxAllowedInParallel: 3, total: quantity ); return startedWorkflows; @@ -58,8 +58,7 @@ private async System.Threading.Tasks.Task> StartWorkflows( private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout) { - var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug); - host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug, new ClassWorker()); + var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Information, new ClassWorker()); await host.StartAsync(); Thread.Sleep(workflowCompletionTimeout); await host.StopAsync(); diff --git a/Tests/Worker/Workers.cs b/Tests/Worker/Workers.cs index 9bb7808d..0eb68054 100644 --- a/Tests/Worker/Workers.cs +++ b/Tests/Worker/Workers.cs @@ -30,7 +30,6 @@ public static TaskResult SimpleWorker(Conductor.Client.Models.Task task) public TaskResult LazyWorker(Conductor.Client.Models.Task task) { var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048)); - Console.WriteLine($"Lazy worker is going to rest for {timeSpan.Milliseconds} ms"); System.Threading.Tasks.Task.Delay(timeSpan).GetAwaiter().GetResult(); return task.Completed(); } diff --git a/csharp-examples/Dockerfile b/csharp-examples/Dockerfile index 1c67d39c..d7c7e733 100644 --- a/csharp-examples/Dockerfile +++ b/csharp-examples/Dockerfile @@ -1,14 +1,15 @@ -FROM mcr.microsoft.com/dotnet/runtime:8.0-nanoserver-1809 AS base +FROM mcr.microsoft.com/dotnet/runtime:8.0.0-alpine3.18-amd64 AS base +RUN mkdir /app WORKDIR /app -FROM mcr.microsoft.com/dotnet/sdk:8.0-nanoserver-1809 AS build +FROM mcr.microsoft.com/dotnet/sdk:8.0.100-1-alpine3.18-amd64 AS build ARG BUILD_CONFIGURATION=Release -WORKDIR /src + COPY ["csharp-examples/csharp-examples.csproj", "csharp-examples/"] COPY ["Conductor/conductor-csharp.csproj", "Conductor/"] RUN dotnet restore "./csharp-examples/./csharp-examples.csproj" COPY . . -WORKDIR "/src/csharp-examples" +WORKDIR "csharp-examples" RUN dotnet build "./csharp-examples.csproj" -c %BUILD_CONFIGURATION% -o /app/build FROM build AS publish @@ -18,4 +19,4 @@ RUN dotnet publish "./csharp-examples.csproj" -c %BUILD_CONFIGURATION% -o /app/p FROM base AS final WORKDIR /app COPY --from=publish /app/publish . -ENTRYPOINT ["dotnet", "csharp-examples.dll"] \ No newline at end of file +ENTRYPOINT ["dotnet", "csharp-examples.dll"] diff --git a/csharp-examples/DockerfileMacArm b/csharp-examples/DockerfileMacArm new file mode 100644 index 00000000..1ad58a3e --- /dev/null +++ b/csharp-examples/DockerfileMacArm @@ -0,0 +1,22 @@ +FROM mcr.microsoft.com/dotnet/runtime:8.0.0-alpine3.18-arm64v8 AS base +RUN mkdir /app +WORKDIR /app + +FROM mcr.microsoft.com/dotnet/sdk:8.0.100-alpine3.18-arm64v8 AS build +ARG BUILD_CONFIGURATION=Release + +COPY ["csharp-examples/csharp-examples.csproj", "csharp-examples/"] +COPY ["Conductor/conductor-csharp.csproj", "Conductor/"] +RUN dotnet restore "./csharp-examples/./csharp-examples.csproj" +COPY . . +WORKDIR "csharp-examples" +RUN dotnet build "./csharp-examples.csproj" -c %BUILD_CONFIGURATION% -o /app/build + +FROM build AS publish +ARG BUILD_CONFIGURATION=Release +RUN dotnet publish "./csharp-examples.csproj" -c %BUILD_CONFIGURATION% -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "csharp-examples.dll"] diff --git a/csharp-examples/Program.cs b/csharp-examples/Program.cs index d5ea4630..db2a02dc 100644 --- a/csharp-examples/Program.cs +++ b/csharp-examples/Program.cs @@ -1,13 +1,10 @@ using csharp_examples; -class Program +internal class Program { - static void Main() + private static void Main() { - Runner runner = new Runner(); - - // call specific method based on testing scenario - runner.RunMultiSimpleTask(); - //runner.RunSimpleTask(); + var runner = new Runner(); + runner.StartTasks(); } } \ No newline at end of file diff --git a/csharp-examples/Runner.cs b/csharp-examples/Runner.cs index 1f604023..43ed4fbd 100644 --- a/csharp-examples/Runner.cs +++ b/csharp-examples/Runner.cs @@ -1,46 +1,45 @@ -using Conductor.Client.Authentication; -using Conductor.Client.Extensions; +using System.Collections; using Conductor.Client; +using Conductor.Client.Authentication; +using Conductor.Client.Extensions; using csharp.examples; using Microsoft.Extensions.Logging; -namespace csharp_examples +namespace csharp_examples; + +public class Runner { - public class Runner + /// + /// Running multiple task as background services + /// + public async void StartTasks() { + var key = Environment.GetEnvironmentVariable("KEY"); + var secret = Environment.GetEnvironmentVariable("SECRET"); + var url = Environment.GetEnvironmentVariable("CONDUCTOR_SERVER_URL"); - /// - /// Running multiple task as background services - /// - public async void RunMultiSimpleTask() + var configuration = new Configuration { - var configuration = new Configuration() - { - AuthenticationSettings = new OrkesAuthenticationSettings("8bea4294-e16d-4437-85f0-327c29a1378c", "skZjhMzNNNm9EJDLLxD6JKYs6ADHqQ0xh1iUCqm6mOoCktHn") - }; - var host = WorkflowTaskHost.CreateWorkerHost(configuration, LogLevel.Information, new SimpleTask1()); - var ct = new CancellationTokenSource(); - await host.StartAsync(ct.Token); - var host1 = WorkflowTaskHost.CreateWorkerHost(configuration, LogLevel.Information, new SimpleTask2()); - var ct1 = new CancellationTokenSource(); - await host1.StartAsync(ct.Token); - Thread.Sleep(TimeSpan.FromSeconds(100)); // after 100 seconds will stop the service - } - - /// - /// Run single task as background service - /// - public async void RunSimpleTask() + BasePath = url, + AuthenticationSettings = new OrkesAuthenticationSettings(key, secret) + }; + var num = 5; + for (var i = 1; i <= num; i++) { - var configuration = new Configuration() - { - AuthenticationSettings = new OrkesAuthenticationSettings("8bea4294-e16d-4437-85f0-327c29a1378c", "skZjhMzNNNm9EJDLLxD6JKYs6ADHqQ0xh1iUCqm6mOoCktHn") - }; - - var host = WorkflowTaskHost.CreateWorkerHost(configuration, LogLevel.Information, new SimpleTask1()); + var host = WorkflowTaskHost.CreateWorkerHost(configuration, LogLevel.Information, + new TestWorker("csharp_task_" + i)); var ct = new CancellationTokenSource(); - await host.StartAsync(); - Thread.Sleep(TimeSpan.FromSeconds(100)); // after 100 seconds will stop the service + try + { + await host.StartAsync(ct.Token); + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } } + + while (true) Thread.Sleep(TimeSpan.FromDays(1)); // after 1 year will stop the service } -} +} \ No newline at end of file diff --git a/csharp-examples/SimpleTask1.cs b/csharp-examples/SimpleTask1.cs deleted file mode 100644 index 4efee01a..00000000 --- a/csharp-examples/SimpleTask1.cs +++ /dev/null @@ -1,32 +0,0 @@ -using Conductor.Client.Interfaces; -using Conductor.Client.Models; -using Conductor.Client.Worker; -using Conductor.Client.Extensions; - -namespace csharp.examples -{ - public class SimpleTask1 : IWorkflowTask - { - public string TaskType { get; } - public WorkflowTaskExecutorConfiguration WorkerSettings { get; } - - public SimpleTask1(string taskType = "SimpleTask_1") - { - TaskType = taskType; - WorkerSettings = new WorkflowTaskExecutorConfiguration(); - } - - public async Task Execute(Conductor.Client.Models.Task task, CancellationToken token) - { - if (token != CancellationToken.None && token.IsCancellationRequested) - return task.Failed("Token request Cancel"); - - return await System.Threading.Tasks.Task.Run(() => task.Completed()); - } - - public TaskResult Execute(Conductor.Client.Models.Task task) - { - throw new NotImplementedException(); - } - } -} diff --git a/csharp-examples/SimpleTask2.cs b/csharp-examples/SimpleTask2.cs deleted file mode 100644 index 568caab8..00000000 --- a/csharp-examples/SimpleTask2.cs +++ /dev/null @@ -1,32 +0,0 @@ -using Conductor.Client.Interfaces; -using Conductor.Client.Models; -using Conductor.Client.Worker; -using Conductor.Client.Extensions; - -namespace csharp.examples -{ - public class SimpleTask2 : IWorkflowTask - { - public string TaskType { get; } - public WorkflowTaskExecutorConfiguration WorkerSettings { get; } - - public SimpleTask2(string taskType = "SimpleTask_2") - { - TaskType = taskType; - WorkerSettings = new WorkflowTaskExecutorConfiguration(); - } - - public async Task Execute(Conductor.Client.Models.Task task, CancellationToken token) - { - if (token != CancellationToken.None && token.IsCancellationRequested) - return task.Failed("Token request Cancel"); - - return await System.Threading.Tasks.Task.Run(() => task.Completed()); - } - - public TaskResult Execute(Conductor.Client.Models.Task task) - { - throw new NotImplementedException(); - } - } -} diff --git a/csharp-examples/TestWorker.cs b/csharp-examples/TestWorker.cs new file mode 100644 index 00000000..477760dc --- /dev/null +++ b/csharp-examples/TestWorker.cs @@ -0,0 +1,58 @@ +using Conductor.Client.Extensions; +using Conductor.Client.Interfaces; +using Conductor.Client.Models; +using Conductor.Client.Worker; +using Task = Conductor.Client.Models.Task; + +namespace csharp.examples; + +public class TestWorker(string taskType) : IWorkflowTask +{ + private readonly Random rnd = new(); + + public string TaskType { get; } = taskType; + public WorkflowTaskExecutorConfiguration WorkerSettings { get; } = new WorkflowTaskExecutorConfiguration() + { + BatchSize = 20 + }; + + public async Task Execute(Task task, CancellationToken token) + { + if (token != CancellationToken.None && token.IsCancellationRequested) + return task.Failed("Token request Cancel"); + + + return await System.Threading.Tasks.Task.Run(() => + { + var num = rnd.Next(5, 20); + var result = task.Completed(); + result.OutputData = new Dictionary(); + for (var i = 0; i < num; i++) + { + result.OutputData["Key_" + i] = CreateString(12); + result.OutputData["Num_" + i] = rnd.NextDouble(); + } + + //Simulate work - once in a while + var sleepTime = rnd.Next(0, 2); + Thread.Sleep(TimeSpan.FromSeconds(sleepTime)); + + return result; + }); + } + + public TaskResult Execute(Task task) + { + throw new NotImplementedException(); + } + + internal string CreateString(int stringLength) + { + const string allowedChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz0123456789!@$?_-"; + var chars = new char[stringLength]; + + for (var i = 0; i < stringLength; i++) chars[i] = allowedChars[rnd.Next(0, allowedChars.Length)]; + + return new string(chars); + } +} \ No newline at end of file