From c649dffd49b893142bff8092aff7a7f2514dc440 Mon Sep 17 00:00:00 2001 From: Jithesh Poojary Date: Wed, 29 May 2024 08:26:55 +0530 Subject: [PATCH] Added Examples to test the Workflow Execution --- Conductor/Client/Models/Task.cs | 12 ++ Conductor/Examples/Copilot/OpenAICopilot.cs | 35 ++-- Conductor/Examples/DynamicWorkflow.cs | 62 ++++--- Conductor/Examples/ExampleConstant.cs | 7 + Conductor/Examples/GreetingsMain.cs | 34 +++- Conductor/Examples/Orkes/OpenAIChatGpt.cs | 1 - .../Examples/Orkes/OpenAIChatUserInput.cs | 1 - .../Examples/Orkes/OpenAIFunctionExample.cs | 6 +- Conductor/Examples/Orkes/OpenAIHelloworld.cs | 26 +-- .../Examples/Orkes/TaskStatusChangeAudit.cs | 75 +++++---- .../Examples/Orkes/VectorDbHelloWorld.cs | 3 +- Conductor/Examples/Orkes/WaitForWebhook.cs | 66 ++++---- .../Examples/Orkes/Workers/ChatWorkers.cs | 25 +-- .../Examples/Orkes/Workers/UserDetails.cs | 34 ++++ Conductor/Examples/ShellWorker.cs | 140 +++++++++++++++ Conductor/Examples/TaskConfigure.cs | 57 +++++++ Conductor/Examples/TaskWorkers.cs | 159 ++++++++++++++++++ Conductor/Examples/Utils/WorkerUtil.cs | 2 +- .../Examples/Workers/GreetingsWorkflow.cs | 65 ++++--- 19 files changed, 638 insertions(+), 172 deletions(-) create mode 100644 Conductor/Examples/Orkes/Workers/UserDetails.cs create mode 100644 Conductor/Examples/ShellWorker.cs create mode 100644 Conductor/Examples/TaskConfigure.cs create mode 100644 Conductor/Examples/TaskWorkers.cs diff --git a/Conductor/Client/Models/Task.cs b/Conductor/Client/Models/Task.cs index aee2bb8..e2537fc 100644 --- a/Conductor/Client/Models/Task.cs +++ b/Conductor/Client/Models/Task.cs @@ -801,6 +801,18 @@ public override int GetHashCode() } } + // Method to convert Task to TaskResult + public TaskResult ToTaskResult(TaskResult.StatusEnum status = TaskResult.StatusEnum.COMPLETED) + { + return new TaskResult + { + TaskId = this.TaskId, + WorkflowInstanceId = this.WorkflowInstanceId, + WorkerId = this.WorkerId, + Status = status + }; + } + /// /// To validate all properties of the instance /// diff --git a/Conductor/Examples/Copilot/OpenAICopilot.cs b/Conductor/Examples/Copilot/OpenAICopilot.cs index 365bd23..4c78418 100644 --- a/Conductor/Examples/Copilot/OpenAICopilot.cs +++ b/Conductor/Examples/Copilot/OpenAICopilot.cs @@ -1,15 +1,15 @@ /* - * Copyright 2024 Conductor Authors. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ using conductor.csharp.Client.Extensions; using conductor.Examples; using Conductor.Api; @@ -33,6 +33,7 @@ namespace Conductor.Examples.Copilot { + [WorkerTask] public class OpenAICopilot { private readonly WorkflowResourceApi _workflowClient; @@ -42,7 +43,6 @@ public class OpenAICopilot private readonly ILogger _logger; //consts - public const string FUNCTIONCHATBOX = "my_function_chatbot"; public const string FUNCTIONCHATBOXDESCRIPTION = "test_function_chatbot"; @@ -60,7 +60,7 @@ public OpenAICopilot() //_metaDataClient = _orkesApiClient.GetClient(); } - [WorkerTask("get_customer_list", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "get_customer_list", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public List GetCustomerList() { var customers = new List(); @@ -80,7 +80,7 @@ public List GetCustomerList() return customers; } - [WorkerTask("get_top_n", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "get_top_n", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public List GetTopNCustomers(int n, List customers) { var sortedCustomers = customers.OrderByDescending(c => c.AnnualSpend).ToList(); @@ -88,7 +88,7 @@ public List GetTopNCustomers(int n, List customers) return sortedCustomers.GetRange(1, end - 1); } - [WorkerTask("generate_promo_code", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "generate_promo_code", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public string GeneratePromoCode() { var random = new Random(); @@ -96,13 +96,13 @@ public string GeneratePromoCode() return promoCode; } - [WorkerTask("send_email", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "send_email", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public string SendEmail(List customers, string promoCode) { return $"Sent {promoCode} to {customers.Count} customers"; } - [WorkerTask(taskType: "create_workflow", 5, "taskDomain", 520, "workerId")] + [WorkerTask(taskType: "create_workflow", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] public Dictionary CreateWorkflow(List steps, Dictionary inputs) { var workflow = new ConductorWorkflow() @@ -169,7 +169,6 @@ public void OpenAICopilotTest() Tasks subWorkFlow = new SubWorkflowTask("execute_workflow", new SubWorkflowParams(ExampleConstants.COPILOTEXECUTION)); - //Pass task reference name once the annotation is in place var registerWorkFlow = CreateWorkflow(steps: new List { chatComplete.Output("function_parameters.steps") }, inputs: new Dictionary { { "step", "function_parameters.inputs" } diff --git a/Conductor/Examples/DynamicWorkflow.cs b/Conductor/Examples/DynamicWorkflow.cs index 9c6f1e0..3ad64f4 100644 --- a/Conductor/Examples/DynamicWorkflow.cs +++ b/Conductor/Examples/DynamicWorkflow.cs @@ -1,15 +1,16 @@ /* - * Copyright 2024 Conductor Authors. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ +using conductor.Examples; using Conductor.Api; using Conductor.Client; using Conductor.Client.Extensions; @@ -17,13 +18,13 @@ using Conductor.Client.Worker; using Conductor.Definition; using Conductor.Definition.TaskType; -using Conductor.Examples.Workers; using Conductor.Executor; using System.Collections.Generic; using System.Threading; namespace Conductor.Examples { + [WorkerTask] public class DynamicWorkflow { private readonly WorkflowResourceApi _workflowClient; @@ -47,13 +48,13 @@ public DynamicWorkflow() //_metaDataClient = _orkesApiClient.GetClient(); } - [WorkerTask(taskType: "GetEmail", 5, "taskDomain", 520, "workerId")] + [WorkerTask(taskType: ExampleConstants.GetEmail, batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] public string GetUserEmail(string userId) { return $"{userId}@example.com"; } - [WorkerTask(taskType: "SendEmail", 5, "taskDomain", 520, "workerId")] + [WorkerTask(taskType: ExampleConstants.SendEmail, batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] public string SendEmail(string email, string subject, string body) { return $"sending email to {email} with subject {subject} and body {body}"; @@ -68,39 +69,44 @@ public void DynamicWorkFlowMain() workflow.WithInputParameter("userId"); - //Once the annotator is ready we have to remove this piece of code as the task creation will be taken care inside the wrapper method - var getEmailTask = new SimpleTask("GetEmail", "GetEmail").WithInput("InputVaraible", workflow.Input("userId")); - getEmailTask.Description = "Test Get email"; + var getEmailTask = new SimpleTask(ExampleConstants.GetEmail, ExampleConstants.GetEmail).WithInput("userId", workflow.Input("userId")); + getEmailTask.Description = ExampleConstants.GetEmailDescription; workflow.WithTask(getEmailTask); - var SendEmailTask = new SimpleTask("SendEmail", "Send_Email_refTask").WithInput("InputVaraible", workflow.Input("userId")); - SendEmailTask.Description = "Test Get email"; + var SendEmailTask = new SimpleTask(ExampleConstants.SendEmail, ExampleConstants.SendEmail).WithInput("email", workflow.Input("email")).WithInput("subject", workflow.Input("subject")).WithInput("body", workflow.Input("body")); + SendEmailTask.Description = ExampleConstants.SendEmailDescription; workflow.WithTask(SendEmailTask); - TaskDef taskDef = new TaskDef() { Description = "test", Name = "dynamic_workflow-task" }; + List taskDefs = new List() +{ +new TaskDef{Description = ExampleConstants.GetEmailDescription, Name = ExampleConstants.GetEmail }, +new TaskDef{Description = ExampleConstants.SendEmailDescription,Name = ExampleConstants.SendEmail} +}; - _metaDataClient.RegisterTaskDef(new List() { taskDef }); - _workflowExecutor.RegisterWorkflow(workflow, true); + _metaDataClient.RegisterTaskDef(taskDefs); + _metaDataClient.UpdateWorkflowDefinitions(new List(1) { workflow }); var testInput = new Dictionary { -{ "userId", "Test" } +{ "userId", "Test" }, +{ "email", "email@gmail.com" }, +{ "subject", "SubjectTest" }, +{ "body" , "BodyDescription" } }; - StartWorkflowRequest startWorkflow = new StartWorkflowRequest() + StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest() { Name = workflow.Name, Input = testInput, Version = workflow.Version, WorkflowDef = workflow, - CreatedBy = Constants.OWNER_EMAIL + CreatedBy = Constants.OWNER_EMAIL, }; - var workflowTask = _workflowExecutor.StartWorkflow(startWorkflow); + _workflowClient.StartWorkflow(startWorkflowRequest); var waitHandle = new ManualResetEvent(false); - //For testing purpose the worker is created manually for now. Once the annotation logic is in place we can getrid of this - var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle, new DynamicWorker("GetEmail"))); + var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle)); waitHandle.WaitOne(); } } diff --git a/Conductor/Examples/ExampleConstant.cs b/Conductor/Examples/ExampleConstant.cs index e3b4970..80043b9 100644 --- a/Conductor/Examples/ExampleConstant.cs +++ b/Conductor/Examples/ExampleConstant.cs @@ -133,6 +133,13 @@ 2. get_price_from_amazon(str: item) -> float (useful to get the price of an item ""type"": ""function"", ""function"": ""ACTUAL_.NET_FUNCTION_NAME_TO_CALL_WITHOUT_PARAMETERS"" ""function_parameters"": ""PARAMETERS FOR THE FUNCTION as a JSON map with key as parameter name and value as parameter value"""; + + public const string GetEmail = "GetEmail"; + public const string SendEmail = "SendEmail"; + public const string GetEmailDescription = "Test Get email"; + public const string SendEmailDescription = "Test send email"; + public const string GreetDescription = "Greet Test"; + public const string GreetTask = "Greet"; } } diff --git a/Conductor/Examples/GreetingsMain.cs b/Conductor/Examples/GreetingsMain.cs index 7f11592..9e57478 100644 --- a/Conductor/Examples/GreetingsMain.cs +++ b/Conductor/Examples/GreetingsMain.cs @@ -10,10 +10,13 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ +using Conductor.Api; using Conductor.Client; +using Conductor.Client.Extensions; +using Conductor.Client.Models; using Conductor.Definition; -using Conductor.Examples.Workers; using Conductor.Executor; +using System.Collections.Generic; using System.Threading; namespace Conductor.Examples @@ -21,16 +24,36 @@ namespace Conductor.Examples public class GreetingsMain { private readonly Configuration _configuration; + private readonly WorkflowResourceApi workflowClient; + private readonly MetadataResourceApi _metaDataClient; + public GreetingsMain() { - _configuration = new Client.Configuration(); + _metaDataClient = ApiExtensions.GetClient(); + + //dev local testing + //_configuration = new Client.Configuration(); + //var _orkesApiClient = new OrkesApiClient(_configuration, new OrkesAuthenticationSettings(Constants.KEY_ID, Constants.KEY_SECRET)); + //_metaDataClient = _orkesApiClient.GetClient(); } public (ConductorWorkflow, string workflowId) RegisterWorkflow(WorkflowExecutor workflowExecutor) { GreetingsWorkflow greetingsWorkflow = new GreetingsWorkflow(); var workflow = greetingsWorkflow.CreateGreetingsWorkflow(); - workflowExecutor.RegisterWorkflow(workflow, true); - var workflowId = workflowExecutor.StartWorkflow(workflow); + _metaDataClient.UpdateWorkflowDefinitions(new List(1) { workflow }); + var testInput = new Dictionary +{ +{ "name","test" } +}; + StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest() + { + Name = workflow.Name, + Input = testInput, + Version = workflow.Version, + WorkflowDef = workflow, + CreatedBy = Constants.OWNER_EMAIL + }; + var workflowId = workflowExecutor.StartWorkflow(startWorkflowRequest); return (workflow, workflowId); } @@ -40,8 +63,7 @@ public void GreetingsMainMethod() (ConductorWorkflow workflow, string workflowId) = RegisterWorkflow(workflowExecutor); var waitHandle = new ManualResetEvent(false); - //Remove DynamicWorker once the annotation is in place - var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle, new DynamicWorker("greetings_task_test"))); + var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle)); waitHandle.WaitOne(); } } diff --git a/Conductor/Examples/Orkes/OpenAIChatGpt.cs b/Conductor/Examples/Orkes/OpenAIChatGpt.cs index df7422a..ad9cf4e 100644 --- a/Conductor/Examples/Orkes/OpenAIChatGpt.cs +++ b/Conductor/Examples/Orkes/OpenAIChatGpt.cs @@ -90,7 +90,6 @@ public void OpenAIChatGPTTest() string collectorJs = ConversationCollector.GetConversation(); var collect = new JavascriptTask(taskReferenceName: "collect_ref", script: collectorJs); - //we have to add collector collectHistoryTask once the annotation implementation is done WorkflowTask[] loopTasks = new WorkflowTask[] { chatComplete, followUpGen }; var chatLoop = new LoopTask(taskReferenceName: "loop", iterations: 3, loopOver: loopTasks); diff --git a/Conductor/Examples/Orkes/OpenAIChatUserInput.cs b/Conductor/Examples/Orkes/OpenAIChatUserInput.cs index e713540..633b101 100644 --- a/Conductor/Examples/Orkes/OpenAIChatUserInput.cs +++ b/Conductor/Examples/Orkes/OpenAIChatUserInput.cs @@ -77,7 +77,6 @@ public void OpenAIChatGPTTest() string collectorJs = ConversationCollector.GetConversation(); var collect = new JavascriptTask(taskReferenceName: "collect_ref", script: collectorJs); - //we have to add collector collectHistoryTask once the annotation implementation is done WorkflowTask[] loopTasks = new WorkflowTask[] { userInput, chatComplete }; var chatLoop = new LoopTask(taskReferenceName: "loop", iterations: 5, loopOver: loopTasks); diff --git a/Conductor/Examples/Orkes/OpenAIFunctionExample.cs b/Conductor/Examples/Orkes/OpenAIFunctionExample.cs index 6599ba7..c769ac3 100644 --- a/Conductor/Examples/Orkes/OpenAIFunctionExample.cs +++ b/Conductor/Examples/Orkes/OpenAIFunctionExample.cs @@ -31,6 +31,7 @@ namespace Conductor.Examples.Orkes { + [WorkerTask] public class OpenAIFunctionExample { private readonly Client.Configuration _configuration; @@ -65,13 +66,13 @@ public OpenAIFunctionExample() } - [WorkerTask(taskType: "getWeather", batchSize: 5, domain: "taskDomain", pollIntervalMs: 200, workerId: "workerId")] + [WorkerTask(taskType: "getWeather", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public string GetWeather(string city) { return $"Weather in {city} today is rainy"; } - [WorkerTask(taskType: "getPriceFromAmazon", batchSize: 5, domain: "taskDomain", pollIntervalMs: 200, workerId: "workerId")] + [WorkerTask(taskType: "getPriceFromAmazon", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public float GetPriceFromAmazon(string products) { return 42.42F; @@ -123,7 +124,6 @@ public void OpenAIFunctionExampleTest() functionCall.InputParameters["inputs"] = chatComplete.Output("function_parameters"); functionCall.InputParameters[ExampleConstants.DYNAMICTASKINPUTPARAM] = ExampleConstants.INPUTS; - //we have to add collectHistoryTask once the annotation implementation is done WorkflowTask[] loop_tasks = new WorkflowTask[] { user_input, chatComplete, functionCall }; var chat_loop = new LoopTask("loop", 3, loop_tasks); diff --git a/Conductor/Examples/Orkes/OpenAIHelloworld.cs b/Conductor/Examples/Orkes/OpenAIHelloworld.cs index c41b529..530e1e3 100644 --- a/Conductor/Examples/Orkes/OpenAIHelloworld.cs +++ b/Conductor/Examples/Orkes/OpenAIHelloworld.cs @@ -1,15 +1,15 @@ /* - * Copyright 2024 Conductor Authors. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ using conductor.csharp.Client.Extensions; using conductor.Examples; using Conductor.Api; @@ -29,6 +29,7 @@ namespace Conductor.Examples.Orkes { + [WorkerTask] public class OpenAIHelloworld { private readonly Client.Configuration _configuration; @@ -57,7 +58,7 @@ public OpenAIHelloworld() //_workflowClient = _orkesApiClient.GetClient(); } - [WorkerTask("get_friends_name", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "get_friends_name", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public string GetFriendName() { string name = Environment.UserName; @@ -79,7 +80,6 @@ public void OpenAIHelloworldTest() promptTemplateTestRequest.Prompt = ExampleConstants.OPENAIPROMPTTEXT; _orchestrator.TestPromptTemplate(promptTemplateTestRequest); - //Update this logic to use GetFriendName() once the annotaion is in place var getName = new SimpleTask("get_friends_name", "get_friends_name_ref"); var textComplete = new LlmTextComplete(taskRefName: "say_hi_ref", llmProvider: llmProvider, model: textCompleteModel, promptName: ExampleConstants.OPENAIPROMPTNAME); textComplete.PromptVariable("friend_name", getName.Output("result")); diff --git a/Conductor/Examples/Orkes/TaskStatusChangeAudit.cs b/Conductor/Examples/Orkes/TaskStatusChangeAudit.cs index cad4013..1f7156c 100644 --- a/Conductor/Examples/Orkes/TaskStatusChangeAudit.cs +++ b/Conductor/Examples/Orkes/TaskStatusChangeAudit.cs @@ -1,15 +1,15 @@ /* - * Copyright 2024 Conductor Authors. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ using conductor.csharp.Client.Extensions; using Conductor.Api; using Conductor.Client.Extensions; @@ -23,6 +23,7 @@ namespace csharp_examples { + [WorkerTask] public class TaskStatusChangeAudit { private readonly Conductor.Client.Configuration _configuration; @@ -52,19 +53,19 @@ public TaskStatusChangeAudit() //_metaDataClient = _orkesApiClient.GetClient(); } - [WorkerTask("audit_log", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "audit_log", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public void AuditLog(object workflowInput, string status, string name) { _logger.LogInformation($"task {name} is in {status} status, with workflow input as {workflowInput}"); } - [WorkerTask("simple_task_1", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "simple_task_1", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public static string SimpleTask1(Task task) { return "OK"; } - [WorkerTask("simple_task_2", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "simple_task_2", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public static TaskResult SimpleTask2(Task task) { return new TaskResult { Status = TaskResult.StatusEnum.FAILEDWITHTERMINALERROR }; @@ -72,15 +73,15 @@ public static TaskResult SimpleTask2(Task task) public void TaskStatusChangeAuditTest() { - var workflowDef = new WorkflowDef() { Name = WORKFLOW_DEF_NAME, Version = 1 }; + var workflowDef = new WorkflowDef() { Name = WORKFLOW_DEF_NAME, Version = 1, Description = "test" }; // Create an instance of StateChangeEvent StateChangeEvent stateChangeEvent = new StateChangeEvent( type: TYPE, payload: new Dictionary { - { "workflow_input", "${workflow.input}" }, - { "status", "${simple_task_1_ref.status}" }, - { "name", SIMPLE_TASK1_REF_NAME } +{ "workflow_input", "${workflow.input}" }, +{ "status", "${simple_task_1_ref.status}" }, +{ "name", SIMPLE_TASK1_REF_NAME } }); var task1 = new WorkflowTask() @@ -89,10 +90,10 @@ public void TaskStatusChangeAuditTest() Name = SIMPLETASK1, TaskReferenceName = SIMPLE_TASK1_REF_NAME, OnStateChange = new Dictionary(){ - { - "", new StateChangeConfig(eventType: new List { StateChangeEventType.OnStart }, events: new List() { stateChangeEvent }) - } - } +{ +"", new StateChangeConfig(eventType: new List { StateChangeEventType.OnStart }, events: new List() { stateChangeEvent }) +} +} }; var task_def = new TaskDef(); @@ -103,9 +104,9 @@ public void TaskStatusChangeAuditTest() type: TYPE, payload: new Dictionary { - { "workflow_input", "${workflow.input}" }, - { "status", "${simple_task_2_ref.status}" }, - { "name", SIMPLE_TASK2_REF_NAME } +{ "workflow_input", "${workflow.input}" }, +{ "status", "${simple_task_2_ref.status}" }, +{ "name", SIMPLE_TASK2_REF_NAME } }); var task2 = new WorkflowTask() { @@ -114,14 +115,14 @@ public void TaskStatusChangeAuditTest() TaskReferenceName = SIMPLE_TASK2_REF_NAME, TaskDefinition = task_def, OnStateChange = new Dictionary(){ - { - "", new StateChangeConfig(eventType: new List(){ - StateChangeEventType.OnStart, - StateChangeEventType.OnFailed, - StateChangeEventType.OnScheduled, - },events: new List() { stateChangeEvent2 }) - } - } +{ +"", new StateChangeConfig(eventType: new List(){ +StateChangeEventType.OnStart, +StateChangeEventType.OnFailed, +StateChangeEventType.OnScheduled, +},events: new List() { stateChangeEvent2 }) +} +} }; workflowDef.Tasks.Add(task1); @@ -138,10 +139,10 @@ public void TaskStatusChangeAuditTest() Name = workflowDef.Name, Version = workflowDef.Version, Input = { - { "a", "aa" }, - { "b", "bb" }, - { "c", 42 } - } +{ "a", "aa" }, +{ "b", "bb" }, +{ "c", 42 } +} }; var workflowId = executor.StartWorkflow(startReq); diff --git a/Conductor/Examples/Orkes/VectorDbHelloWorld.cs b/Conductor/Examples/Orkes/VectorDbHelloWorld.cs index 8d56bd6..cd78267 100644 --- a/Conductor/Examples/Orkes/VectorDbHelloWorld.cs +++ b/Conductor/Examples/Orkes/VectorDbHelloWorld.cs @@ -29,6 +29,7 @@ namespace Conductor.Examples.Orkes { + [WorkerTask] public class VectorDBHelloWorld { private readonly Conductor.Client.Configuration _configuration; @@ -53,7 +54,7 @@ public VectorDBHelloWorld() //_workflowClient = _orkesApiClient.GetClient(); } - [WorkerTask("get_friends_name", 5, "taskDomain", 200, "workerId")] + [WorkerTask(taskType: "get_friends_name", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")] public string GetFriendName() { string name = Environment.UserName; diff --git a/Conductor/Examples/Orkes/WaitForWebhook.cs b/Conductor/Examples/Orkes/WaitForWebhook.cs index 035eb6e..1e191dc 100644 --- a/Conductor/Examples/Orkes/WaitForWebhook.cs +++ b/Conductor/Examples/Orkes/WaitForWebhook.cs @@ -1,16 +1,17 @@ /* - * Copyright 2024 Conductor Authors. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ using conductor.csharp.Client.Extensions; +using conductor.Examples; using Conductor.Api; using Conductor.Client; using Conductor.Client.Ai; @@ -28,6 +29,7 @@ namespace Conductor.Examples { + [WorkerTask] public class WaitForWebhook { private readonly Client.Configuration _configuration; @@ -39,8 +41,8 @@ public class WaitForWebhook private readonly ILogger _logger; //Const - private const string WORKFLOWNAME = "dynamic_workflow"; - private const string WORKFLOWDESC = "test_dynamic_workflow"; + private const string WORKFLOWNAME = "wait_for_webHook"; + private const string WORKFLOWDESC = "test_wait for webhook"; public WaitForWebhook() { @@ -56,13 +58,13 @@ public WaitForWebhook() //_metaDataClient = _orkesApiClient.GetClient(); } - [WorkerTask(taskType: "GetEmail", 5, "taskDomain", 520, "workerId")] + [WorkerTask(taskType: ExampleConstants.GetEmail, batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] public string GetUserEmail(string userId) { return $"{userId}@example.com"; } - [WorkerTask(taskType: "SendEmail", 5, "taskDomain", 520, "workerId")] + [WorkerTask(taskType: ExampleConstants.SendEmail, batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] public string SendEmail(string email, string subject, string body) { return $"sending email to {email} with subject {subject} and body {body}"; @@ -71,37 +73,37 @@ public string SendEmail(string email, string subject, string body) public void WaitForWebhookTest() { ConductorWorkflow workflow = new ConductorWorkflow() - .WithName(WORKFLOWNAME) - .WithDescription(WORKFLOWDESC) - .WithVersion(1); + .WithName(WORKFLOWNAME) + .WithDescription(WORKFLOWDESC) + .WithVersion(1); workflow.WithInputParameter("userId"); - //Update this line to use GetUserEmail() once the annotation is in place - var getEmailTask = new SimpleTask("GetEmail", "GetEmail").WithInput("userId", workflow.Input("userId")); + var getEmailTask = new SimpleTask(ExampleConstants.GetEmail, ExampleConstants.GetEmail).WithInput("userId", workflow.Input("userId")); getEmailTask.Description = "Test Get email"; workflow.WithTask(getEmailTask); - ////Update this line to use SendEmail() once the annotation is in place - var SendEmailTask = new SimpleTask("SendEmail", "Send_Email_refTask") - .WithInput("email", getEmailTask.Output("userId")) - .WithInput("subject", "Hello from Orkes") - .WithInput("body", "Test Email"); + var SendEmailTask = new SimpleTask("SendEmail", "SendEmail") + .WithInput("email", workflow.Input("email")) + .WithInput("subject", workflow.Input("subject")) + .WithInput("body", workflow.Input("body")); workflow.WithTask(SendEmailTask); var WaitForWebhookTask = new WaitForWebHookTask("wait_ref", new Dictionary { { "type", "customer" }, { "id", workflow.Input("userId") } }); workflow.WithTask(WaitForWebhookTask); - _workflowExecutor.RegisterWorkflow(workflow, true); var testInput = new Dictionary - { - { "userId", "Test" } - }; +{ +{ "userId", "Test" }, +{"email","email test" }, +{"body","body test" }, +{"subject","subject test" } +}; - StartWorkflowRequest startWorkflow = new StartWorkflowRequest() + StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest() { Name = workflow.Name, Input = testInput, @@ -110,9 +112,9 @@ public void WaitForWebhookTest() CreatedBy = Constants.OWNER_EMAIL }; - var workflowRun = _workflowClient.ExecuteWorkflow(startWorkflow, "1234", startWorkflow.Name, 1); + var workflowRun = _workflowClient.ExecuteWorkflow(startWorkflowRequest, "1234", startWorkflowRequest.Name, 1); var waitHandle = new ManualResetEvent(false); - var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle, new DynamicWorker("GetEmail"))); + var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle)); waitHandle.WaitOne(); _logger.LogInformation($"\nworkflow execution {workflowRun.WorkflowId}\n"); } diff --git a/Conductor/Examples/Orkes/Workers/ChatWorkers.cs b/Conductor/Examples/Orkes/Workers/ChatWorkers.cs index c5f1616..6355ac7 100644 --- a/Conductor/Examples/Orkes/Workers/ChatWorkers.cs +++ b/Conductor/Examples/Orkes/Workers/ChatWorkers.cs @@ -1,27 +1,28 @@ /* - * Copyright 2024 Conductor Authors. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ using Conductor.Client.Worker; using Conductor.Definition.TaskType.LlmTasks; using System.Collections.Generic; namespace Conductor.Examples.Orkes.Workers { + [WorkerTask] public class ChatWorkers { public const string USERROLE = "user"; public const string ASSISTANTROLE = "assistant"; - [WorkerTask(taskType: "prep", 5, "taskDomain", 2000, "workerId")] + [WorkerTask(taskType: "prep", batchSize: 5, pollIntervalMs: 2000, workerId: "workerId")] public static List CollectHistory(string userInput, string seedQuestion, string assistantResponse, List history) { var allHistory = new List(); diff --git a/Conductor/Examples/Orkes/Workers/UserDetails.cs b/Conductor/Examples/Orkes/Workers/UserDetails.cs new file mode 100644 index 0000000..544d1aa --- /dev/null +++ b/Conductor/Examples/Orkes/Workers/UserDetails.cs @@ -0,0 +1,34 @@ +/* +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ +using System.Collections.Generic; + +namespace Conductor.Examples.Orkes.Workers +{ + public class UserDetails + { + public string Name { get; set; } + public string UserId { get; set; } + public string Phone { get; set; } + public string Email { get; set; } + public List Addresses { get; set; } + + public UserDetails(string name, string userId, List addresses, string phone = default, string email = default) + { + Name = name; + UserId = userId; + Phone = phone; + Email = email; + Addresses = addresses; + } + } +} diff --git a/Conductor/Examples/ShellWorker.cs b/Conductor/Examples/ShellWorker.cs new file mode 100644 index 0000000..510d4fc --- /dev/null +++ b/Conductor/Examples/ShellWorker.cs @@ -0,0 +1,140 @@ +/* +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ +using conductor.Examples; +using Conductor.Api; +using Conductor.Client; +using Conductor.Client.Extensions; +using Conductor.Client.Models; +using Conductor.Client.Worker; +using Conductor.Definition; +using Conductor.Definition.TaskType; +using Conductor.Executor; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; + +namespace Conductor.Examples +{ + [WorkerTask] + public class ShellWorker + { + private readonly WorkflowResourceApi _workflowClient; + private readonly MetadataResourceApi _metaDataClient; + private readonly WorkflowExecutor _workflowExecutor; + + //const + private const string WorkflowName = "shellWorker"; + private const string WorkflowDescription = "test_shell_worker"; + + public ShellWorker() + { + Configuration configuration = new Configuration(); + _workflowClient = ApiExtensions.GetClient(); + _metaDataClient = ApiExtensions.GetClient(); + + //For local testing + //var _orkesApiClient = new OrkesApiClient(configuration, new OrkesAuthenticationSettings(Constants.KEY_ID, Constants.KEY_SECRET)); + //_workflowClient = _orkesApiClient.GetClient(); + //_metaDataClient = _orkesApiClient.GetClient(); + } + + [WorkerTask(taskType: "ExecuteShell1", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] + public string Executeshell1(string command, string[] args) + { + if (command == null) + { + throw new ArgumentNullException(nameof(command), "Command cannot be null."); + } + + if (args == null) + { + throw new ArgumentNullException(nameof(args), "Arguments cannot be null."); + } + var processStartInfo = new ProcessStartInfo + { + FileName = "", // Add a application fileName based on OS. Ex: cmd.exe + RedirectStandardOutput = true, + UseShellExecute = false, + Arguments = $"/c dir {string.Join(" ", args)}" + }; + + var process = new Process + { + StartInfo = processStartInfo + }; + + process.Start(); + + StringBuilder output = new StringBuilder(); + while (!process.StandardOutput.EndOfStream) + { + output.AppendLine(process.StandardOutput.ReadLine()); + } + + return output.ToString(); + } + + [WorkerTask(taskType: "ExecuteShell", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] + public string ExecuteShell() + { + return "hello"; + } + + public void ShellWorkerMain() + { + ConductorWorkflow workflow = new ConductorWorkflow() + .WithName(WorkflowName) + .WithDescription(WorkflowDescription) + .WithVersion(1); + + var getEmailTask = new SimpleTask("ExecuteShell1", "ExecuteShell1").WithInput("command", workflow.Input("command")).WithInput("args", workflow.Input("args")); + getEmailTask.Description = ExampleConstants.GetEmailDescription; + workflow.WithTask(getEmailTask); + + var SendEmailTask = new SimpleTask("ExecuteShell", "ExecuteShell"); + SendEmailTask.Description = ExampleConstants.SendEmailDescription; + workflow.WithTask(SendEmailTask); + List taskDefs = new List() +{ +new TaskDef{Description = "test", Name = ExampleConstants.GetEmail }, +new TaskDef{Description = "test", Name = ExampleConstants.SendEmail } + +}; + + _metaDataClient.RegisterTaskDef(taskDefs); + _metaDataClient.UpdateWorkflowDefinitions(new List(1) { workflow }); + var testInput = new Dictionary + { + //{ "command", "ls" }, // uncomment this line and change the command according to the context. + //{ "args", new[] { "/c", "dir" } } // uncomment this line and change the args according to the given context. + }; + + StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest() + { + Name = workflow.Name, + Input = testInput, + Version = workflow.Version, + WorkflowDef = workflow, + CreatedBy = Constants.OWNER_EMAIL + }; + + _workflowClient.StartWorkflow(startWorkflowRequest); + var waitHandle = new ManualResetEvent(false); + + var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Conductor.Examples.Utils.WorkerUtil.StartBackGroundTask(waitHandle)); + waitHandle.WaitOne(); + } + } +} diff --git a/Conductor/Examples/TaskConfigure.cs b/Conductor/Examples/TaskConfigure.cs new file mode 100644 index 0000000..6771143 --- /dev/null +++ b/Conductor/Examples/TaskConfigure.cs @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +using conductor.csharp.Client.Extensions; +using Conductor.Api; +using Conductor.Client.Models; +using Microsoft.Extensions.Logging; +using System.Collections.Generic; + +namespace Conductor.Examples +{ + public class TaskConfigure + { + private readonly MetadataResourceApi _metaDataClient; + private readonly ILogger logger; + private const string url = "https://sdkdev.orkesconductor.io/taskDef/"; + public TaskConfigure() + { + //metaDataClient = ApiExtensions.GetClient(); + logger = ApplicationLogging.CreateLogger(); + + // dev local testing + //Configuration configuration = new Configuration(); + //var _orkesApiClient = new OrkesApiClient(configuration, new OrkesAuthenticationSettings(Constants.KEY_ID, Constants.KEY_SECRET)); + //_metaDataClient = _orkesApiClient.GetClient(); + } + TaskDef taskDef = new TaskDef() + { + Name = "task_with_retries", + Description = "task_with_retries description", + RetryCount = 3, + RetryLogic = TaskDef.RetryLogicEnum.LINEARBACKOFF, + RetryDelaySeconds = 1, + ConcurrentExecLimit = 3, + PollTimeoutSeconds = 60, + TimeoutSeconds = 120, + ResponseTimeoutSeconds = 60, + RateLimitPerFrequency = 100, + RateLimitFrequencyInSeconds = 10 + }; + public void RegisterTaskDef() + { + _metaDataClient.RegisterTaskDef(new List() { taskDef }); + logger.LogInformation($"Registered the task -- see the details in {url}/{taskDef.Name}"); + + } + } +} diff --git a/Conductor/Examples/TaskWorkers.cs b/Conductor/Examples/TaskWorkers.cs new file mode 100644 index 0000000..e99408d --- /dev/null +++ b/Conductor/Examples/TaskWorkers.cs @@ -0,0 +1,159 @@ +/* +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ +using Conductor.Client; +using Conductor.Client.Models; +using Conductor.Client.Worker; +using Conductor.Definition.TaskType; +using Conductor.Definition; +using Conductor.Examples.Orkes.Workers; +using System; +using System.Collections.Generic; +using System.Threading; +using Conductor.Api; +using Conductor.Client.Extensions; +using Conductor.Executor; +using Task = Conductor.Client.Models.Task; + +namespace Conductor.Examples +{ + public class OrderInfo + { + public int OrderId { get; set; } + public string Sku { get; set; } + public int Quantity { get; set; } + public float SkuPrice { get; set; } + } + + [WorkerTask] + public class TaskWorkers + { + private readonly MetadataResourceApi _metaDataClient; + private readonly WorkflowResourceApi _workflowClient; + private readonly WorkflowExecutor _workflowExecutor; + + //const + private const string WorkflowName = "Task_Workers"; + private const string WorkflowDescription = "Task workers decription"; + + public TaskWorkers() + { + var config = new Configuration(); + _workflowExecutor = new WorkflowExecutor(config); + _workflowClient = ApiExtensions.GetClient(); + _metaDataClient = ApiExtensions.GetClient(); + + //For local testing + //var _orkesApiClient = new OrkesApiClient(config, new OrkesAuthenticationSettings(Constants.KEY_ID, Constants.KEY_SECRET)); + //_workflowClient = _orkesApiClient.GetClient(); + //_metaDataClient = _orkesApiClient.GetClient(); + } + + [WorkerTask(taskType: "get_user_info", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] + public static UserDetails GetUserInfo(string userId) + { + if (userId == null) + { + userId = "none"; + } + return new UserDetails("user_" + userId, userId, new List +{ +new +{ +street = "21 jump street", +city = "new york" +} +}); + } + + [WorkerTask(taskType: "save_order", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] + public static OrderInfo SaveOrder(OrderInfo orderDetails) + { + orderDetails.SkuPrice = orderDetails.Quantity * orderDetails.SkuPrice; + return orderDetails; + } + + [WorkerTask(taskType: "process_task", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] + public static TaskResult ProcessTask(Task task) + { + { + TaskResult taskResult = task.ToTaskResult(TaskResult.StatusEnum.COMPLETED); + taskResult.OutputData["name"] = "orkes"; + taskResult.OutputData["complex"] = new UserDetails(name: "u1", addresses: new List(), userId: "5"); + taskResult.OutputData["time"] = DateTime.Now; + return taskResult; + } + } + + [WorkerTask(taskType: "failure", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] + public static void AlwaysFail() + { + // Raising NonRetryableException updates the task with FAILED_WITH_TERMINAL_ERROR status + throw new InvalidOperationException("This worker task will always have a terminal failure"); + } + + [WorkerTask(taskType: "fail_but_retry", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")] + public static int FailButRetry() + { + var random = new Random(); + var numx = random.Next(0, 11); + if (numx < 8) + { + // Raising NonRetryableException updates the task with FAILED_WITH_TERMINAL_ERROR status + throw new Exception($"Number {numx} is less than 8. I am going to fail this task and retry"); + } + return numx; + } + + public void TaskWorkersFlowMain() + { + ConductorWorkflow workflow = new ConductorWorkflow() + .WithName(WorkflowName) + .WithDescription(WorkflowDescription) + .WithVersion(1); + + workflow.WithInputParameter("userId"); + + var GetUserInfo = new SimpleTask("get_user_info", "get_user_info").WithInput("userId", workflow.Input("userId")); + GetUserInfo.Description = "Get User Info decription"; + workflow.WithTask(GetUserInfo); + + List taskDefs = new List() +{ +new TaskDef{Description = "Get user info", Name = "GetUserinfo" }, +}; + + _metaDataClient.RegisterTaskDef(taskDefs); + _metaDataClient.UpdateWorkflowDefinitions(new List(1) { workflow }); + + var testInput = new Dictionary +{ +{ "userId", "Test" } +}; + + StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest() + { + Name = workflow.Name, + Input = testInput, + Version = workflow.Version, + WorkflowDef = workflow, + CreatedBy = Constants.OWNER_EMAIL + }; + + _workflowClient.StartWorkflow(startWorkflowRequest); + var waitHandle = new ManualResetEvent(false); + + var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle)); + waitHandle.WaitOne(); + } + } +} diff --git a/Conductor/Examples/Utils/WorkerUtil.cs b/Conductor/Examples/Utils/WorkerUtil.cs index 3870be8..00bfe50 100644 --- a/Conductor/Examples/Utils/WorkerUtil.cs +++ b/Conductor/Examples/Utils/WorkerUtil.cs @@ -30,7 +30,7 @@ public static async Task StartBackGroundTask(ManualResetEvent waitHandle, { try { - var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Information, workers); + var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Information); await host.StartAsync(); Thread.Sleep(20000); waitHandle.Set(); diff --git a/Conductor/Examples/Workers/GreetingsWorkflow.cs b/Conductor/Examples/Workers/GreetingsWorkflow.cs index 66a2dbd..b9dfc8a 100644 --- a/Conductor/Examples/Workers/GreetingsWorkflow.cs +++ b/Conductor/Examples/Workers/GreetingsWorkflow.cs @@ -1,27 +1,44 @@ /* - * Copyright 2024 Conductor Authors. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ +* Copyright 2024 Conductor Authors. +*

+* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +*

+* http://www.apache.org/licenses/LICENSE-2.0 +*

+* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ +using conductor.Examples; +using Conductor.Api; +using Conductor.Client.Extensions; +using Conductor.Client.Models; using Conductor.Client.Worker; using Conductor.Definition; using Conductor.Definition.TaskType; +using System.Collections.Generic; namespace Conductor.Examples { + [WorkerTask] public class GreetingsWorkflow { + private readonly MetadataResourceApi _metaDataClient; private const string WorkflowName = "Test_Workflow_Greeting"; private const string WorkflowDescription = "test description"; - [WorkerTask("greetings_task", 5, "taskDomain", 420, "workerId")] + public GreetingsWorkflow() + { + _metaDataClient = ApiExtensions.GetClient(); + + //dev local testing + //Configuration configuration = new Configuration(); + //var _orkesApiClient = new OrkesApiClient(configuration, new OrkesAuthenticationSettings(Constants.KEY_ID, Constants.KEY_SECRET)); + //_metaDataClient = _orkesApiClient.GetClient(); + + } + [WorkerTask(taskType: "Greet", batchSize: 5, pollIntervalMs: 420, workerId: "workerId")] public string Greet(string name) { return $"Hello {name}"; @@ -29,13 +46,23 @@ public string Greet(string name) public ConductorWorkflow CreateGreetingsWorkflow() { - var wf = new ConductorWorkflow() - .WithName(WorkflowName) - .WithDescription(WorkflowDescription) - .WithVersion(1) - //Here call Greet() instead of creating SimpleTask manually. - .WithTask(new SimpleTask("greetings_task_test", "greet_ref_test")); - return wf; + var workflow = new ConductorWorkflow() + .WithName(WorkflowName) + .WithDescription(WorkflowDescription) + .WithVersion(1); + + var greetTask = new SimpleTask(ExampleConstants.GreetTask, ExampleConstants.GreetTask).WithInput("name", workflow.Input("name")); + greetTask.Description = ExampleConstants.GreetDescription; + workflow.WithTask(greetTask); + + TaskDef taskDefs = new TaskDef() + { + Description = "test", + Name = ExampleConstants.GreetTask + }; + + _metaDataClient.RegisterTaskDef(new List() { taskDefs }); + return workflow; } } } \ No newline at end of file