Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: worker issues with Orkes Conductor API & other issues #123

Merged
merged 12 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
name: CI Build

on: [push, pull_request,workflow_dispatch]
on:
push:
branches:
- main
pull_request:
branches:
- main
workflow_dispatch:

jobs:
lint:
Expand Down
8 changes: 6 additions & 2 deletions Conductor/Api/EnvironmentResourceApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,13 @@ public ApiResponse<Dictionary<string, string>> GetAllWithHttpInfo()
if (exception != null) throw exception;
}

var list = (List<Dictionary<string, string>>)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(List<Dictionary<string, string>>));
var dictionary = list
.Where(item => item.ContainsKey("name") && item.ContainsKey("value"))
.ToDictionary(item => item["name"], item => item["value"]);
return new ApiResponse<Dictionary<string, string>>(localVarStatusCode,
v1r3n marked this conversation as resolved.
Show resolved Hide resolved
localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)),
(Dictionary<string, string>)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Dictionary<string, string>)));
localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)),
dictionary);
}

/// <summary>
Expand Down
10 changes: 9 additions & 1 deletion Conductor/Client/Authentication/TokenHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ private string GetTokenFromServer(OrkesAuthenticationSettings authenticationSett
{
try
{
return tokenClient.GenerateToken(tokenRequest)._token;
var token = tokenClient.GenerateToken(tokenRequest);
return token._token;
}
catch (ApiException e)
{
if (e.ErrorCode == 405 || e.ErrorCode == 404)
{
throw new Exception($"Error while getting authentication token. Is the config BasePath correct? {tokenClient.Configuration.BasePath}");
}
}
catch (Exception e)
{
Expand Down
21 changes: 2 additions & 19 deletions Conductor/Client/Models/WorkflowTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
Expand Down Expand Up @@ -225,24 +224,8 @@ public enum WorkflowTaskTypeEnum
/// <param name="workflowTaskType">workflowTaskType.</param>
public WorkflowTask(bool? asyncComplete = default(bool?), string caseExpression = default(string), string caseValueParam = default(string), Dictionary<string, List<WorkflowTask>> decisionCases = default(Dictionary<string, List<WorkflowTask>>), List<WorkflowTask> defaultCase = default(List<WorkflowTask>), List<string> defaultExclusiveJoinTask = default(List<string>), string description = default(string), string dynamicForkJoinTasksParam = default(string), string dynamicForkTasksInputParamName = default(string), string dynamicForkTasksParam = default(string), string dynamicTaskNameParam = default(string), string evaluatorType = default(string), string expression = default(string), List<List<WorkflowTask>> forkTasks = default(List<List<WorkflowTask>>), Dictionary<string, Object> inputParameters = default(Dictionary<string, Object>), List<string> joinOn = default(List<string>), string loopCondition = default(string), List<WorkflowTask> loopOver = default(List<WorkflowTask>), string name = default(string), bool? optional = default(bool?), bool? rateLimited = default(bool?), int? retryCount = default(int?), string scriptExpression = default(string), string sink = default(string), int? startDelay = default(int?), SubWorkflowParams subWorkflowParam = default(SubWorkflowParams), TaskDef taskDefinition = default(TaskDef), string taskReferenceName = default(string), string type = default(string), WorkflowTaskTypeEnum? workflowTaskType = default(WorkflowTaskTypeEnum?), Dictionary<string, StateChangeConfig> onStateChange = default(Dictionary<string, StateChangeConfig>))
{
// to ensure "name" is required (not null)
if (name == null)
{
throw new InvalidDataException("name is a required property for WorkflowTask and cannot be null");
}
else
{
this.Name = name;
}
// to ensure "taskReferenceName" is required (not null)
if (taskReferenceName == null)
{
throw new InvalidDataException("taskReferenceName is a required property for WorkflowTask and cannot be null");
}
else
{
this.TaskReferenceName = taskReferenceName;
}
this.TaskReferenceName = taskReferenceName;
v1r3n marked this conversation as resolved.
Show resolved Hide resolved
this.Name = name;
this.AsyncComplete = asyncComplete;
this.CaseExpression = caseExpression;
this.CaseValueParam = caseValueParam;
Expand Down
2 changes: 1 addition & 1 deletion Conductor/conductor-csharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="System.ComponentModel.Annotations" Version="5.0.0" />
<PackageReference Include="System.Net.Http.Json" Version="6.0.0" />
<PackageReference Include="RestSharp.Serializers.NewtonsoftJson" Version="110.2.0" />
<PackageReference Include="RestSharp.Serializers.NewtonsoftJson" Version="110.2.0" />
<None Include="/package/Conductor/README.md" Pack="true" PackagePath="/" />
</ItemGroup>
</Project>
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ARG CONDUCTOR_SERVER_URL
ENV KEY=${KEY}
ENV SECRET=${SECRET}
ENV CONDUCTOR_SERVER_URL=${CONDUCTOR_SERVER_URL}
COPY /csharp-examples /package/csharp-examples
COPY /Tests /package/Tests
WORKDIR /package/Tests
RUN dotnet test -l "console;verbosity=normal"
Expand Down
2 changes: 1 addition & 1 deletion Tests/Worker/WorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async System.Threading.Tasks.Task TestWorkflowAsyncExecution()
var workflow = GetConductorWorkflow();
ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(workflow, true);
var workflowIdList = await StartWorkflows(workflow, quantity: 15);
await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20));
await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(40));
jmigueprieto marked this conversation as resolved.
Show resolved Hide resolved
await ValidateWorkflowCompletion(workflowIdList.ToArray());
}

Expand Down
10 changes: 5 additions & 5 deletions Tests/Worker/Workers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ static FunctionalWorkers()
_random = new Random();
}

// Polls for 5 task every 200ms
[WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")]
// Polls for 5 task every 100ms
[WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 100, "simpleWorker")]
public static TaskResult SimpleWorker(Conductor.Client.Models.Task task)
{
return task.Completed();
}

// Polls for 12 tasks every 420ms
[WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")]
// Polls for 5 tasks every 420ms
[WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 420, "lazyWorker")]
public TaskResult LazyWorker(Conductor.Client.Models.Task task)
{
var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048));
var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(100, 900));
System.Threading.Tasks.Task.Delay(timeSpan).GetAwaiter().GetResult();
return task.Completed();
}
Expand Down
4 changes: 2 additions & 2 deletions Tests/conductor-csharp.test.csproj
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp6.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../Conductor/conductor-csharp.csproj" />
<ProjectReference Include="../csharp-examples/csharp-examples.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="TestData\integration_data.json">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ public void AuditLog(object workflowInput, string status, string name)
}

[WorkerTask(taskType: "simple_task_1", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public static string SimpleTask1(Task task)
public static string SimpleTask1(Conductor.Client.Models.Task task)
{
return "OK";
}

[WorkerTask(taskType: "simple_task_2", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public static TaskResult SimpleTask2(Task task)
public static TaskResult SimpleTask2(Conductor.Client.Models.Task task)
{
return new TaskResult { Status = TaskResult.StatusEnum.FAILEDWITHTERMINALERROR };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static async Task<bool> StartBackGroundTask(ManualResetEvent waitHandle,
{
var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Information);
await host.StartAsync();
Thread.Sleep(20000);
Thread.Sleep(60000);
waitHandle.Set();
await host.StopAsync();
return true;
Expand Down
12 changes: 10 additions & 2 deletions csharp-examples/TestWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@

namespace csharp.examples;

public class TestWorker(string taskType) : IWorkflowTask
public class TestWorker : IWorkflowTask
{

private readonly Random rnd = new();

public string TaskType { get; } = taskType;
private readonly string taskType;

public TestWorker(string taskType)
{
this.taskType = taskType;
}

public string TaskType => taskType;
public WorkflowTaskExecutorConfiguration WorkerSettings { get; } = new WorkflowTaskExecutorConfiguration()
{
BatchSize = 20
Expand Down
4 changes: 2 additions & 2 deletions csharp-examples/csharp-examples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>csharp_examples</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
Expand All @@ -14,7 +14,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Conductor\conductor-csharp.csproj" />
<ProjectReference Include="../Conductor/conductor-csharp.csproj" />
</ItemGroup>

</Project>
Loading