Skip to content

Commit

Permalink
Testing enhancements (#98)
Browse files Browse the repository at this point in the history
Fixes and enhancements based on the testing
  • Loading branch information
v1r3n authored Jan 2, 2024
1 parent ac99179 commit aaf6405
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 161 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: Continuous Integration
name: CI Build

on: pull_request
on: [push, pull_request,workflow_dispatch]

jobs:
lint:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ build
**/*.jfm
**/*.user
**/key-*.xml
csharp-examples/csharp-examples.sln
4 changes: 2 additions & 2 deletions Conductor/Client/Authentication/TokenHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ private string GetTokenFromServer(OrkesAuthenticationSettings authenticationSett
{
return tokenClient.GenerateToken(tokenRequest)._token;
}
catch (ApiException e)
catch (Exception e)
{
Console.WriteLine($"Failed to refresh authentication token, reason: {e}, request: {tokenRequest.ToJson()}");
Console.WriteLine($"Failed to refresh authentication token, attempt = {attempt}, error = {e.Message}");
}
}
throw new Exception("Failed to refresh authentication token");
Expand Down
9 changes: 1 addition & 8 deletions Conductor/Client/Extensions/DependencyInjectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,10 @@ namespace Conductor.Client.Extensions
{
public static class DependencyInjectionExtensions
{
public static IServiceCollection AddConductorWorkflowTask<T>(this IServiceCollection services) where T : IWorkflowTask
{
services.AddTransient(typeof(IWorkflowTask), typeof(T));
services.AddTransient(typeof(T));
return services;
}

public static IServiceCollection AddConductorWorkflowTask<T>(this IServiceCollection services, T worker) where T : IWorkflowTask
{
services.AddTransient(typeof(IWorkflowTask), typeof(T));
services.AddTransient(typeof(T));
services.AddSingleton<IWorkflowTask>(worker);
return services;
}

Expand Down
98 changes: 64 additions & 34 deletions Conductor/Client/Worker/WorkflowTaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public WorkflowTaskExecutor(

public System.Threading.Tasks.Task Start(CancellationToken token)
{

if (token != CancellationToken.None)
token.ThrowIfCancellationRequested();

Expand All @@ -68,7 +67,6 @@ public System.Threading.Tasks.Task Start(CancellationToken token)

private void Work4Ever(CancellationToken token)
{

while (true)
{
try
Expand All @@ -78,9 +76,21 @@ private void Work4Ever(CancellationToken token)

WorkOnce(token);
}
catch (System.OperationCanceledException canceledException)
{
//Do nothing the operation was cancelled
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Operation Cancelled: {canceledException.Message}"
+ $", taskName: {_worker.TaskType}"
+ $", domain: {_worker.WorkerSettings.Domain}"
+ $", batchSize: {_workerSettings.BatchSize}"
);
Sleep(SLEEP_FOR_TIME_SPAN_ON_WORKER_ERROR);
}
catch (Exception e)
{
_logger.LogDebug(

_logger.LogError(
$"[{_workerSettings.WorkerId}] worker error: {e.Message}"
+ $", taskName: {_worker.TaskType}"
+ $", domain: {_worker.WorkerSettings.Domain}"
Expand All @@ -102,18 +112,19 @@ private async void WorkOnce(CancellationToken token)
Sleep(_workerSettings.PollInterval);
return;
}

var uniqueBatchId = Guid.NewGuid();
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Processing tasks batch"
+ $", Task batch unique Id: {uniqueBatchId}"
);
$"[{_workerSettings.WorkerId}] Processing tasks batch"
+ $", Task batch unique Id: {uniqueBatchId}"
);

await System.Threading.Tasks.Task.Run(() => ProcessTasks(tasks, token));

_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Completed tasks batch"
+ $", Task batch unique Id: {uniqueBatchId}"
);
$"[{_workerSettings.WorkerId}] Completed tasks batch"
+ $", Task batch unique Id: {uniqueBatchId}"
);
}

private List<Models.Task> PollTasks()
Expand All @@ -127,30 +138,47 @@ private async void WorkOnce(CancellationToken token)
var availableWorkerCounter = _workerSettings.BatchSize - _workflowTaskMonitor.GetRunningWorkers();
if (availableWorkerCounter < 1)
{
throw new Exception("no worker available");
_logger.LogDebug("All workers are busy");
return new List<Task>();
}
var tasks = _taskClient.PollTask(_worker.TaskType, _workerSettings.WorkerId, _workerSettings.Domain, availableWorkerCounter);
if (tasks == null)

try
{
tasks = new List<Models.Task>();
var tasks = _taskClient.PollTask(_worker.TaskType, _workerSettings.WorkerId, _workerSettings.Domain,
availableWorkerCounter);
if (tasks == null)
{
tasks = new List<Models.Task>();
}

_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Polled {tasks.Count} tasks"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", batchSize: {_workerSettings.BatchSize}"
);
return tasks;
}
catch (Exception e)
{
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Polling error: {e.Message} "
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", batchSize: {_workerSettings.BatchSize}"
);
return new List<Task>();
}
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Polled {tasks.Count} tasks"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", batchSize: {_workerSettings.BatchSize}"
);
return tasks;
}

private async void ProcessTasks(List<Models.Task> tasks, CancellationToken token)
{

List<System.Threading.Tasks.Task> threads = new List<System.Threading.Tasks.Task>();
if (tasks == null || tasks.Count == 0)
{
return;
}

foreach (var task in tasks)
{
if (token != CancellationToken.None)
Expand All @@ -159,6 +187,7 @@ private async void ProcessTasks(List<Models.Task> tasks, CancellationToken token
_workflowTaskMonitor.IncrementRunningWorker();
threads.Add(System.Threading.Tasks.Task.Run(() => ProcessTask(task, token)));
}

await System.Threading.Tasks.Task.WhenAll(threads);
}

Expand All @@ -178,25 +207,26 @@ private async void ProcessTask(Models.Task task, CancellationToken token)

try
{
TaskResult taskResult = new TaskResult(taskId: task.TaskId, workflowInstanceId: task.WorkflowInstanceId);
TaskResult taskResult =
new TaskResult(taskId: task.TaskId, workflowInstanceId: task.WorkflowInstanceId);

if (token == CancellationToken.None)
taskResult = _worker.Execute(task);
else
taskResult = await _worker.Execute(task, token);
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Done processing task for worker"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
+ $", CancelToken: {token}"
);
$"[{_workerSettings.WorkerId}] Done processing task for worker"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
+ $", CancelToken: {token}"
);
UpdateTask(taskResult);
}
catch (Exception e)
{
_logger.LogDebug(
_logger.LogError(
$"[{_workerSettings.WorkerId}] Failed to process task for worker, reason: {e.Message}"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
Expand All @@ -206,7 +236,6 @@ private async void ProcessTask(Models.Task task, CancellationToken token)
);
var taskResult = task.Failed(e.Message);
UpdateTask(taskResult);

}
finally
{
Expand All @@ -228,6 +257,7 @@ private void UpdateTask(Models.TaskResult taskResult)
{
Sleep(TimeSpan.FromSeconds(1 << attemptCounter));
}

_taskClient.UpdateTask(taskResult);
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Done updating task"
Expand All @@ -240,7 +270,7 @@ private void UpdateTask(Models.TaskResult taskResult)
}
catch (Exception e)
{
_logger.LogTrace(
_logger.LogError(
$"[{_workerSettings.WorkerId}] Failed to update task, reason: {e.Message}"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
Expand All @@ -249,6 +279,7 @@ private void UpdateTask(Models.TaskResult taskResult)
);
}
}

throw new Exception("Failed to update task after retries");
}

Expand All @@ -260,7 +291,6 @@ private void Sleep(TimeSpan timeSpan)

private void LogInfo()
{

}
}
}
}
7 changes: 3 additions & 4 deletions Tests/Worker/WorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async System.Threading.Tasks.Task TestWorkflowAsyncExecution()
{
var workflow = GetConductorWorkflow();
ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(workflow, true);
var workflowIdList = await StartWorkflows(workflow, quantity: 35);
var workflowIdList = await StartWorkflows(workflow, quantity: 15);
await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20));
await ValidateWorkflowCompletion(workflowIdList.ToArray());
}
Expand All @@ -50,16 +50,15 @@ private async System.Threading.Tasks.Task<ConcurrentBag<string>> StartWorkflows(
var startedWorkflows = await WorkflowExtensions.StartWorkflows(
_workflowClient,
startWorkflowRequest,
maxAllowedInParallel: 10,
maxAllowedInParallel: 3,
total: quantity
);
return startedWorkflows;
}

private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout)
{
var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug);
host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug, new ClassWorker());
var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Information, new ClassWorker());
await host.StartAsync();
Thread.Sleep(workflowCompletionTimeout);
await host.StopAsync();
Expand Down
1 change: 0 additions & 1 deletion Tests/Worker/Workers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public static TaskResult SimpleWorker(Conductor.Client.Models.Task task)
public TaskResult LazyWorker(Conductor.Client.Models.Task task)
{
var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048));
Console.WriteLine($"Lazy worker is going to rest for {timeSpan.Milliseconds} ms");
System.Threading.Tasks.Task.Delay(timeSpan).GetAwaiter().GetResult();
return task.Completed();
}
Expand Down
11 changes: 6 additions & 5 deletions csharp-examples/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
FROM mcr.microsoft.com/dotnet/runtime:8.0-nanoserver-1809 AS base
FROM mcr.microsoft.com/dotnet/runtime:8.0.0-alpine3.18-amd64 AS base
RUN mkdir /app
WORKDIR /app

FROM mcr.microsoft.com/dotnet/sdk:8.0-nanoserver-1809 AS build
FROM mcr.microsoft.com/dotnet/sdk:8.0.100-1-alpine3.18-amd64 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src

COPY ["csharp-examples/csharp-examples.csproj", "csharp-examples/"]
COPY ["Conductor/conductor-csharp.csproj", "Conductor/"]
RUN dotnet restore "./csharp-examples/./csharp-examples.csproj"
COPY . .
WORKDIR "/src/csharp-examples"
WORKDIR "csharp-examples"
RUN dotnet build "./csharp-examples.csproj" -c %BUILD_CONFIGURATION% -o /app/build

FROM build AS publish
Expand All @@ -18,4 +19,4 @@ RUN dotnet publish "./csharp-examples.csproj" -c %BUILD_CONFIGURATION% -o /app/p
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "csharp-examples.dll"]
ENTRYPOINT ["dotnet", "csharp-examples.dll"]
22 changes: 22 additions & 0 deletions csharp-examples/DockerfileMacArm
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM mcr.microsoft.com/dotnet/runtime:8.0.0-alpine3.18-arm64v8 AS base
RUN mkdir /app
WORKDIR /app

FROM mcr.microsoft.com/dotnet/sdk:8.0.100-alpine3.18-arm64v8 AS build
ARG BUILD_CONFIGURATION=Release

COPY ["csharp-examples/csharp-examples.csproj", "csharp-examples/"]
COPY ["Conductor/conductor-csharp.csproj", "Conductor/"]
RUN dotnet restore "./csharp-examples/./csharp-examples.csproj"
COPY . .
WORKDIR "csharp-examples"
RUN dotnet build "./csharp-examples.csproj" -c %BUILD_CONFIGURATION% -o /app/build

FROM build AS publish
ARG BUILD_CONFIGURATION=Release
RUN dotnet publish "./csharp-examples.csproj" -c %BUILD_CONFIGURATION% -o /app/publish /p:UseAppHost=false

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "csharp-examples.dll"]
11 changes: 4 additions & 7 deletions csharp-examples/Program.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
using csharp_examples;

class Program
internal class Program
{
static void Main()
private static void Main()
{
Runner runner = new Runner();

// call specific method based on testing scenario
runner.RunMultiSimpleTask();
//runner.RunSimpleTask();
var runner = new Runner();
runner.StartTasks();
}
}
Loading

0 comments on commit aaf6405

Please sign in to comment.