Skip to content

Commit

Permalink
Added Examples to test the Workflow Execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Jithesh-poojary committed May 29, 2024
1 parent 99848f2 commit c649dff
Show file tree
Hide file tree
Showing 19 changed files with 638 additions and 172 deletions.
12 changes: 12 additions & 0 deletions Conductor/Client/Models/Task.cs
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,18 @@ public override int GetHashCode()
}
}

// Method to convert Task to TaskResult
public TaskResult ToTaskResult(TaskResult.StatusEnum status = TaskResult.StatusEnum.COMPLETED)
{
return new TaskResult
{
TaskId = this.TaskId,
WorkflowInstanceId = this.WorkflowInstanceId,
WorkerId = this.WorkerId,
Status = status
};
}

/// <summary>
/// To validate all properties of the instance
/// </summary>
Expand Down
35 changes: 17 additions & 18 deletions Conductor/Examples/Copilot/OpenAICopilot.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
using conductor.csharp.Client.Extensions;
using conductor.Examples;
using Conductor.Api;
Expand All @@ -33,6 +33,7 @@

namespace Conductor.Examples.Copilot
{
[WorkerTask]
public class OpenAICopilot
{
private readonly WorkflowResourceApi _workflowClient;
Expand All @@ -42,7 +43,6 @@ public class OpenAICopilot
private readonly ILogger _logger;

//consts

public const string FUNCTIONCHATBOX = "my_function_chatbot";
public const string FUNCTIONCHATBOXDESCRIPTION = "test_function_chatbot";

Expand All @@ -60,7 +60,7 @@ public OpenAICopilot()
//_metaDataClient = _orkesApiClient.GetClient<MetadataResourceApi>();
}

[WorkerTask("get_customer_list", 5, "taskDomain", 200, "workerId")]
[WorkerTask(taskType: "get_customer_list", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public List<Customer> GetCustomerList()
{
var customers = new List<Customer>();
Expand All @@ -80,29 +80,29 @@ public List<Customer> GetCustomerList()
return customers;
}

[WorkerTask("get_top_n", 5, "taskDomain", 200, "workerId")]
[WorkerTask(taskType: "get_top_n", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public List<Customer> GetTopNCustomers(int n, List<Customer> customers)
{
var sortedCustomers = customers.OrderByDescending(c => c.AnnualSpend).ToList();
var end = Math.Min(n + 1, sortedCustomers.Count);
return sortedCustomers.GetRange(1, end - 1);
}

[WorkerTask("generate_promo_code", 5, "taskDomain", 200, "workerId")]
[WorkerTask(taskType: "generate_promo_code", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public string GeneratePromoCode()
{
var random = new Random();
var promoCode = GenerateRandomString(random, ExampleConstants.RANDOMCHARACTERS, 5);
return promoCode;
}

[WorkerTask("send_email", 5, "taskDomain", 200, "workerId")]
[WorkerTask(taskType: "send_email", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public string SendEmail(List<Customer> customers, string promoCode)
{
return $"Sent {promoCode} to {customers.Count} customers";
}

[WorkerTask(taskType: "create_workflow", 5, "taskDomain", 520, "workerId")]
[WorkerTask(taskType: "create_workflow", batchSize: 5, pollIntervalMs: 520, workerId: "workerId")]
public Dictionary<string, object> CreateWorkflow(List<string> steps, Dictionary<string, object> inputs)
{
var workflow = new ConductorWorkflow()
Expand Down Expand Up @@ -169,7 +169,6 @@ public void OpenAICopilotTest()

Tasks subWorkFlow = new SubWorkflowTask("execute_workflow", new SubWorkflowParams(ExampleConstants.COPILOTEXECUTION));

//Pass task reference name once the annotation is in place
var registerWorkFlow = CreateWorkflow(steps: new List<string> { chatComplete.Output("function_parameters.steps") }, inputs: new Dictionary<string, object>
{
{ "step", "function_parameters.inputs" }
Expand Down
62 changes: 34 additions & 28 deletions Conductor/Examples/DynamicWorkflow.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
using conductor.Examples;
using Conductor.Api;
using Conductor.Client;
using Conductor.Client.Extensions;
using Conductor.Client.Models;
using Conductor.Client.Worker;
using Conductor.Definition;
using Conductor.Definition.TaskType;
using Conductor.Examples.Workers;
using Conductor.Executor;
using System.Collections.Generic;
using System.Threading;

namespace Conductor.Examples
{
[WorkerTask]
public class DynamicWorkflow
{
private readonly WorkflowResourceApi _workflowClient;
Expand All @@ -47,13 +48,13 @@ public DynamicWorkflow()
//_metaDataClient = _orkesApiClient.GetClient<MetadataResourceApi>();
}

[WorkerTask(taskType: "GetEmail", 5, "taskDomain", 520, "workerId")]
[WorkerTask(taskType: ExampleConstants.GetEmail, batchSize: 5, pollIntervalMs: 520, workerId: "workerId")]
public string GetUserEmail(string userId)
{
return $"{userId}@example.com";
}

[WorkerTask(taskType: "SendEmail", 5, "taskDomain", 520, "workerId")]
[WorkerTask(taskType: ExampleConstants.SendEmail, batchSize: 5, pollIntervalMs: 520, workerId: "workerId")]
public string SendEmail(string email, string subject, string body)
{
return $"sending email to {email} with subject {subject} and body {body}";
Expand All @@ -68,39 +69,44 @@ public void DynamicWorkFlowMain()

workflow.WithInputParameter("userId");

//Once the annotator is ready we have to remove this piece of code as the task creation will be taken care inside the wrapper method
var getEmailTask = new SimpleTask("GetEmail", "GetEmail").WithInput("InputVaraible", workflow.Input("userId"));
getEmailTask.Description = "Test Get email";
var getEmailTask = new SimpleTask(ExampleConstants.GetEmail, ExampleConstants.GetEmail).WithInput("userId", workflow.Input("userId"));
getEmailTask.Description = ExampleConstants.GetEmailDescription;
workflow.WithTask(getEmailTask);

var SendEmailTask = new SimpleTask("SendEmail", "Send_Email_refTask").WithInput("InputVaraible", workflow.Input("userId"));
SendEmailTask.Description = "Test Get email";
var SendEmailTask = new SimpleTask(ExampleConstants.SendEmail, ExampleConstants.SendEmail).WithInput("email", workflow.Input("email")).WithInput("subject", workflow.Input("subject")).WithInput("body", workflow.Input("body"));
SendEmailTask.Description = ExampleConstants.SendEmailDescription;
workflow.WithTask(SendEmailTask);

TaskDef taskDef = new TaskDef() { Description = "test", Name = "dynamic_workflow-task" };
List<TaskDef> taskDefs = new List<TaskDef>()
{
new TaskDef{Description = ExampleConstants.GetEmailDescription, Name = ExampleConstants.GetEmail },
new TaskDef{Description = ExampleConstants.SendEmailDescription,Name = ExampleConstants.SendEmail}
};

_metaDataClient.RegisterTaskDef(new List<TaskDef>() { taskDef });
_workflowExecutor.RegisterWorkflow(workflow, true);
_metaDataClient.RegisterTaskDef(taskDefs);
_metaDataClient.UpdateWorkflowDefinitions(new List<WorkflowDef>(1) { workflow });

var testInput = new Dictionary<string, object>
{
{ "userId", "Test" }
{ "userId", "Test" },
{ "email", "[email protected]" },
{ "subject", "SubjectTest" },
{ "body" , "BodyDescription" }
};

StartWorkflowRequest startWorkflow = new StartWorkflowRequest()
StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest()
{
Name = workflow.Name,
Input = testInput,
Version = workflow.Version,
WorkflowDef = workflow,
CreatedBy = Constants.OWNER_EMAIL
CreatedBy = Constants.OWNER_EMAIL,
};

var workflowTask = _workflowExecutor.StartWorkflow(startWorkflow);
_workflowClient.StartWorkflow(startWorkflowRequest);
var waitHandle = new ManualResetEvent(false);

//For testing purpose the worker is created manually for now. Once the annotation logic is in place we can getrid of this
var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle, new DynamicWorker("GetEmail")));
var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle));
waitHandle.WaitOne();
}
}
Expand Down
7 changes: 7 additions & 0 deletions Conductor/Examples/ExampleConstant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ 2. get_price_from_amazon(str: item) -> float (useful to get the price of an item
""type"": ""function"",
""function"": ""ACTUAL_.NET_FUNCTION_NAME_TO_CALL_WITHOUT_PARAMETERS""
""function_parameters"": ""PARAMETERS FOR THE FUNCTION as a JSON map with key as parameter name and value as parameter value""";

public const string GetEmail = "GetEmail";
public const string SendEmail = "SendEmail";
public const string GetEmailDescription = "Test Get email";
public const string SendEmailDescription = "Test send email";
public const string GreetDescription = "Greet Test";
public const string GreetTask = "Greet";
}
}

34 changes: 28 additions & 6 deletions Conductor/Examples/GreetingsMain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,50 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
using Conductor.Api;
using Conductor.Client;
using Conductor.Client.Extensions;
using Conductor.Client.Models;
using Conductor.Definition;
using Conductor.Examples.Workers;
using Conductor.Executor;
using System.Collections.Generic;
using System.Threading;

namespace Conductor.Examples
{
public class GreetingsMain
{
private readonly Configuration _configuration;
private readonly WorkflowResourceApi workflowClient;
private readonly MetadataResourceApi _metaDataClient;

public GreetingsMain()
{
_configuration = new Client.Configuration();
_metaDataClient = ApiExtensions.GetClient<MetadataResourceApi>();

//dev local testing
//_configuration = new Client.Configuration();
//var _orkesApiClient = new OrkesApiClient(_configuration, new OrkesAuthenticationSettings(Constants.KEY_ID, Constants.KEY_SECRET));
//_metaDataClient = _orkesApiClient.GetClient<MetadataResourceApi>();
}
public (ConductorWorkflow, string workflowId) RegisterWorkflow(WorkflowExecutor workflowExecutor)
{
GreetingsWorkflow greetingsWorkflow = new GreetingsWorkflow();
var workflow = greetingsWorkflow.CreateGreetingsWorkflow();
workflowExecutor.RegisterWorkflow(workflow, true);
var workflowId = workflowExecutor.StartWorkflow(workflow);
_metaDataClient.UpdateWorkflowDefinitions(new List<WorkflowDef>(1) { workflow });
var testInput = new Dictionary<string, object>
{
{ "name","test" }
};
StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest()
{
Name = workflow.Name,
Input = testInput,
Version = workflow.Version,
WorkflowDef = workflow,
CreatedBy = Constants.OWNER_EMAIL
};
var workflowId = workflowExecutor.StartWorkflow(startWorkflowRequest);
return (workflow, workflowId);
}

Expand All @@ -40,8 +63,7 @@ public void GreetingsMainMethod()
(ConductorWorkflow workflow, string workflowId) = RegisterWorkflow(workflowExecutor);

var waitHandle = new ManualResetEvent(false);
//Remove DynamicWorker once the annotation is in place
var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle, new DynamicWorker("greetings_task_test")));
var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Utils.WorkerUtil.StartBackGroundTask(waitHandle));
waitHandle.WaitOne();
}
}
Expand Down
1 change: 0 additions & 1 deletion Conductor/Examples/Orkes/OpenAIChatGpt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ public void OpenAIChatGPTTest()
string collectorJs = ConversationCollector.GetConversation();
var collect = new JavascriptTask(taskReferenceName: "collect_ref", script: collectorJs);

//we have to add collector collectHistoryTask once the annotation implementation is done
WorkflowTask[] loopTasks = new WorkflowTask[] { chatComplete, followUpGen };
var chatLoop = new LoopTask(taskReferenceName: "loop", iterations: 3, loopOver: loopTasks);

Expand Down
1 change: 0 additions & 1 deletion Conductor/Examples/Orkes/OpenAIChatUserInput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void OpenAIChatGPTTest()
string collectorJs = ConversationCollector.GetConversation();
var collect = new JavascriptTask(taskReferenceName: "collect_ref", script: collectorJs);

//we have to add collector collectHistoryTask once the annotation implementation is done
WorkflowTask[] loopTasks = new WorkflowTask[] { userInput, chatComplete };
var chatLoop = new LoopTask(taskReferenceName: "loop", iterations: 5, loopOver: loopTasks);

Expand Down
6 changes: 3 additions & 3 deletions Conductor/Examples/Orkes/OpenAIFunctionExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

namespace Conductor.Examples.Orkes
{
[WorkerTask]
public class OpenAIFunctionExample
{
private readonly Client.Configuration _configuration;
Expand Down Expand Up @@ -65,13 +66,13 @@ public OpenAIFunctionExample()

}

[WorkerTask(taskType: "getWeather", batchSize: 5, domain: "taskDomain", pollIntervalMs: 200, workerId: "workerId")]
[WorkerTask(taskType: "getWeather", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public string GetWeather(string city)
{
return $"Weather in {city} today is rainy";
}

[WorkerTask(taskType: "getPriceFromAmazon", batchSize: 5, domain: "taskDomain", pollIntervalMs: 200, workerId: "workerId")]
[WorkerTask(taskType: "getPriceFromAmazon", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public float GetPriceFromAmazon(string products)
{
return 42.42F;
Expand Down Expand Up @@ -123,7 +124,6 @@ public void OpenAIFunctionExampleTest()
functionCall.InputParameters["inputs"] = chatComplete.Output("function_parameters");
functionCall.InputParameters[ExampleConstants.DYNAMICTASKINPUTPARAM] = ExampleConstants.INPUTS;

//we have to add collectHistoryTask once the annotation implementation is done
WorkflowTask[] loop_tasks = new WorkflowTask[] { user_input, chatComplete, functionCall };
var chat_loop = new LoopTask("loop", 3, loop_tasks);

Expand Down
Loading

0 comments on commit c649dff

Please sign in to comment.