Skip to content

Commit

Permalink
bug: don't create multiple runs when rerunning workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuildthecloud committed Nov 21, 2024
1 parent e5fc89a commit b920c70
Show file tree
Hide file tree
Showing 14 changed files with 508 additions and 66 deletions.
10 changes: 9 additions & 1 deletion apiclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,11 @@ func toStream[T any](resp *http.Response) chan T {
go func() {
defer resp.Body.Close()
defer close(ch)
var eventName string
lines := bufio.NewScanner(resp.Body)
for lines.Scan() {
var obj T
if data, ok := strings.CutPrefix(lines.Text(), "data: "); ok {
if data, ok := strings.CutPrefix(lines.Text(), "data: "); ok && eventName == "" || eventName == "message" {
if log.IsDebug() {
log.Fields("data", data).Debugf("Received data")
}
Expand All @@ -169,6 +170,13 @@ func toStream[T any](resp *http.Response) chan T {
ch <- obj
}
}
} else if event, ok := strings.CutPrefix(lines.Text(), "event: "); ok {
if log.IsDebug() {
log.Fields("event", event).Debugf("Received event")
}
eventName = event
} else if strings.TrimSpace(lines.Text()) == "" {
eventName = ""
}
}
}()
Expand Down
26 changes: 26 additions & 0 deletions apiclient/types/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package types

type Task struct {
Metadata
TaskManifest
}

type TaskList List[Task]

type TaskManifest struct {
Name string `json:"name"`
Description string `json:"description"`
Steps []TaskStep `json:"steps"`
}

type TaskStep struct {
ID string `json:"id,omitempty"`
If *TaskIf `json:"if,omitempty"`
Step string `json:"step,omitempty"`
}

type TaskIf struct {
Condition string `json:"condition,omitempty"`
Steps []TaskStep `json:"steps,omitempty"`
Else []TaskStep `json:"else,omitempty"`
}
12 changes: 6 additions & 6 deletions pkg/api/handlers/assistants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func NewAssistantHandler(invoker *invoke.Invoker, events *events.Emitter, gptScr
}
}

func getAgent(req api.Context, id string) (*v1.Agent, error) {
func getAssistant(req api.Context, id string) (*v1.Agent, error) {
var agent v1.Agent
if err := alias.Get(req.Context(), req.Storage, &agent, req.Namespace(), id); err != nil {
if err := alias.Get(req.Context(), req.Storage, &agent, "", id); err != nil {
return nil, err
}
return &agent, nil
Expand All @@ -47,7 +47,7 @@ func (a *AssistantHandler) Invoke(req api.Context) error {
id = req.PathValue("id")
)

agent, err := getAgent(req, id)
agent, err := getAssistant(req, id)
if err != nil {
return err
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func getUserThread(req api.Context, agentID string) (*v1.Thread, error) {
return &thread, nil
}

agent, err := getAgent(req, agentID)
agent, err := getAssistant(req, agentID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -373,7 +373,7 @@ func (a *AssistantHandler) AddTool(req api.Context) error {
tool = req.PathValue("tool")
)

agent, err := getAgent(req, id)
agent, err := getAssistant(req, id)
if err != nil {
return err
}
Expand Down Expand Up @@ -430,7 +430,7 @@ func (a *AssistantHandler) Tools(req api.Context) error {
id = req.PathValue("id")
)

agent, err := getAgent(req, id)
agent, err := getAssistant(req, id)
if err != nil {
return err
}
Expand Down
293 changes: 293 additions & 0 deletions pkg/api/handlers/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package handlers

import (
"net/http"
"slices"

"github.com/otto8-ai/otto8/apiclient/types"
"github.com/otto8-ai/otto8/pkg/api"
"github.com/otto8-ai/otto8/pkg/events"
"github.com/otto8-ai/otto8/pkg/invoke"
v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1"
"github.com/otto8-ai/otto8/pkg/system"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type TaskHandler struct {
invoker *invoke.Invoker
events *events.Emitter
}

func NewTaskHandler(invoker *invoke.Invoker, events *events.Emitter) *TaskHandler {
return &TaskHandler{
invoker: invoker,
events: events,
}
}

func (t *TaskHandler) Events(req api.Context) error {
var (
follow = req.URL.Query().Get("follow") == "true"
)

workflow, err := t.getTask(req)
if err != nil {
return err
}

var thread v1.Thread
if err := req.Get(&thread, req.PathValue("thread_id")); kclient.IgnoreNotFound(err) != nil {
return err
}

if thread.Spec.WorkflowName != workflow.Name {
return types.NewErrHttp(http.StatusForbidden, "thread does not belong to the task")
}

_, events, err := t.events.Watch(req.Context(), req.Namespace(), events.WatchOptions{
History: true,
MaxRuns: 100,
ThreadName: thread.Name,
Follow: true,
FollowWorkflowExecutions: follow,
})
if err != nil {
return err
}

return req.WriteEvents(events)
}

func (t *TaskHandler) Run(req api.Context) error {
var (
threadID = req.Request.URL.Query().Get("thread")
stepID = req.Request.URL.Query().Get("step")
)

workflow, err := t.getTask(req)
if err != nil {
return err
}

resp, err := t.invoker.Workflow(req.Context(), req.Storage, workflow, "", invoke.WorkflowOptions{
ThreadName: threadID,
StepID: stepID,
})
if err != nil {
return err
}

return req.WriteCreated(map[string]any{
"threadID": resp.Thread.Name,
})
}

func (t *TaskHandler) Delete(req api.Context) error {
workflow, err := t.getTask(req)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

return req.Delete(workflow)
}

func (t *TaskHandler) Update(req api.Context) error {
workflow, err := t.getTask(req)
if err != nil {
return err
}

_, manifest, err := t.getAssistantAndManifestFromRequest(req)
if err != nil {
return err
}

workflow.Spec.Manifest = manifest
if err := req.Update(workflow); err != nil {
return err
}

return req.Write(convertTask(*workflow))
}

func (t *TaskHandler) getAssistantAndManifestFromRequest(req api.Context) (*v1.Agent, types.WorkflowManifest, error) {
assistantID := req.PathValue("assistant_id")

assistant, err := getAssistant(req, assistantID)
if err != nil {
return nil, types.WorkflowManifest{}, err
}

thread, err := getUserThread(req, assistantID)
if err != nil {
return nil, types.WorkflowManifest{}, err
}

var manifest types.TaskManifest
if err := req.Read(&manifest); err != nil {
return nil, types.WorkflowManifest{}, err
}

if manifest.Name == "" {
manifest.Name = "New Task"
}

return assistant, toWorkflowManifest(assistant, thread, manifest), nil
}

func (t *TaskHandler) Create(req api.Context) error {
assistant, workflowManifest, err := t.getAssistantAndManifestFromRequest(req)
if err != nil {
return err
}

workflow := v1.Workflow{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.WorkflowPrefix,
Namespace: req.Namespace(),
},
Spec: v1.WorkflowSpec{
AgentName: assistant.Name,
UserID: req.User.GetUID(),
Manifest: workflowManifest,
},
}

if err := req.Create(&workflow); err != nil {
return err
}

return req.WriteCreated(convertTask(workflow))
}

func toWorkflowManifest(agent *v1.Agent, thread *v1.Thread, manifest types.TaskManifest) types.WorkflowManifest {
workflowManifest := types.WorkflowManifest{
AgentManifest: agent.Spec.Manifest,
}

for _, tool := range thread.Spec.Manifest.Tools {
if !slices.Contains(workflowManifest.Tools, tool) {
workflowManifest.Tools = append(workflowManifest.Tools, tool)
}
}

workflowManifest.Steps = toWorkflowSteps(manifest.Steps)
workflowManifest.Name = manifest.Name
workflowManifest.Description = manifest.Description
return workflowManifest
}

func toWorkflowSteps(steps []types.TaskStep) []types.Step {
workflowSteps := make([]types.Step, 0, len(steps))
for _, step := range steps {
workflowSteps = append(workflowSteps, types.Step{
ID: step.ID,
Step: step.Step,
If: toWorkflowIf(step.If),
})
}
return workflowSteps
}

func toWorkflowIf(ifStep *types.TaskIf) *types.If {
if ifStep == nil {
return nil
}
return &types.If{
Condition: ifStep.Condition,
Steps: toWorkflowSteps(ifStep.Steps),
Else: toWorkflowSteps(ifStep.Else),
}
}

func (t *TaskHandler) Get(req api.Context) error {
task, err := t.getTask(req)
if err != nil {
return err
}

return req.Write(convertTask(*task))
}

func (t *TaskHandler) getTask(req api.Context) (*v1.Workflow, error) {
assistantID := req.PathValue("assistant_id")

var workflow v1.Workflow
if err := req.Get(&workflow, req.PathValue("id")); err != nil {
return nil, err
}

assistant, err := getAssistant(req, assistantID)
if err != nil {
return nil, err
}

if workflow.Spec.AgentName != assistant.Name || workflow.Spec.UserID != req.User.GetUID() {
return nil, types.NewErrHttp(http.StatusForbidden, "task does not belong to the user")
}

return &workflow, nil
}

func (t *TaskHandler) List(req api.Context) error {
assistant, err := getAssistant(req, req.PathValue("assistant_id"))
if err != nil {
return err
}

var workflows v1.WorkflowList
if err := req.List(&workflows, kclient.MatchingFields{
"spec.agentName": assistant.Name,
"spec.userID": req.User.GetUID(),
}); err != nil {
return err
}

taskList := types.TaskList{Items: make([]types.Task, 0, len(workflows.Items))}

for _, workflow := range workflows.Items {
taskList.Items = append(taskList.Items, convertTask(workflow))
}

return req.Write(taskList)
}

func convertTask(workflow v1.Workflow) types.Task {
task := types.Task{
Metadata: MetadataFrom(&workflow),
TaskManifest: types.TaskManifest{
Name: workflow.Spec.Manifest.Name,
Description: workflow.Spec.Manifest.Description,
},
}
task.Steps = toTaskSteps(workflow.Spec.Manifest.Steps)
return task
}

func toTaskSteps(steps []types.Step) []types.TaskStep {
taskSteps := make([]types.TaskStep, 0, len(steps))
for _, step := range steps {
taskSteps = append(taskSteps, types.TaskStep{
ID: step.ID,
Step: step.Step,
If: toIf(step.If),
})
}
return taskSteps
}

func toIf(ifStep *types.If) *types.TaskIf {
if ifStep == nil {
return nil
}
return &types.TaskIf{
Condition: ifStep.Condition,
Steps: toTaskSteps(ifStep.Steps),
Else: toTaskSteps(ifStep.Else),
}
}
Loading

0 comments on commit b920c70

Please sign in to comment.