Skip to content

Commit

Permalink
refactor all (#5)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting authored Nov 27, 2024
1 parent 4a96b17 commit 465f2ac
Show file tree
Hide file tree
Showing 12 changed files with 370 additions and 353 deletions.
257 changes: 0 additions & 257 deletions api/helpers/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,267 +4,10 @@ import (
"reflect"
"strconv"
"strings"
"time"

"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/dapr/durabletask-go/api/protos"
)

func NewExecutionTerminatedEvent(rawReason *wrapperspb.StringValue, recurse bool) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.Now(),
EventType: &protos.HistoryEvent_ExecutionTerminated{
ExecutionTerminated: &protos.ExecutionTerminatedEvent{
Input: rawReason,
Recurse: recurse,
},
},
}
}

func NewEventRaisedEvent(name string, rawInput *wrapperspb.StringValue) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_EventRaised{
EventRaised: &protos.EventRaisedEvent{Name: name, Input: rawInput},
},
}
}

func NewTaskScheduledEvent(
taskID int32,
name string,
version *wrapperspb.StringValue,
rawInput *wrapperspb.StringValue,
tc *protos.TraceContext,
) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: taskID,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_TaskScheduled{
TaskScheduled: &protos.TaskScheduledEvent{
Name: name,
Version: version,
Input: rawInput,
ParentTraceContext: tc,
},
},
}
}

func NewTaskCompletedEvent(taskID int32, result *wrapperspb.StringValue) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_TaskCompleted{
TaskCompleted: &protos.TaskCompletedEvent{
TaskScheduledId: taskID,
Result: result,
},
},
}
}

func NewTimerCreatedEvent(eventID int32, fireAt *timestamppb.Timestamp) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: eventID,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_TimerCreated{
TimerCreated: &protos.TimerCreatedEvent{FireAt: fireAt},
},
}
}

func NewTimerFiredEvent(
timerID int32,
fireAt *timestamppb.Timestamp,
parentTraceContext *protos.TraceContext,
) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_TimerFired{
TimerFired: &protos.TimerFiredEvent{
TimerId: timerID,
FireAt: fireAt,
},
},
}
}

func NewSubOrchestrationCreatedEvent(
eventID int32,
name string,
version *wrapperspb.StringValue,
rawInput *wrapperspb.StringValue,
instanceID string,
parentTraceContext *protos.TraceContext,
) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: eventID,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_SubOrchestrationInstanceCreated{
SubOrchestrationInstanceCreated: &protos.SubOrchestrationInstanceCreatedEvent{
Name: name,
Version: version,
Input: rawInput,
InstanceId: instanceID,
ParentTraceContext: parentTraceContext,
},
},
}
}

func NewSendEventEvent(eventID int32, instanceID string, name string, rawInput *wrapperspb.StringValue) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: eventID,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_EventSent{
EventSent: &protos.EventSentEvent{
InstanceId: instanceID,
Name: name,
Input: rawInput,
},
},
}
}

func NewSuspendOrchestrationEvent(reason string) *protos.HistoryEvent {
var input *wrapperspb.StringValue
if reason != "" {
input = wrapperspb.String(reason)
}
return &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_ExecutionSuspended{
ExecutionSuspended: &protos.ExecutionSuspendedEvent{
Input: input,
},
},
}
}

func NewResumeOrchestrationEvent(reason string) *protos.HistoryEvent {
var input *wrapperspb.StringValue
if reason != "" {
input = wrapperspb.String(reason)
}
return &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_ExecutionResumed{
ExecutionResumed: &protos.ExecutionResumedEvent{
Input: input,
},
},
}
}

func NewParentInfo(taskID int32, name string, iid string) *protos.ParentInstanceInfo {
return &protos.ParentInstanceInfo{
TaskScheduledId: taskID,
Name: wrapperspb.String(name),
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: iid},
}
}

func NewScheduleTaskAction(taskID int32, name string, input *wrapperspb.StringValue) *protos.OrchestratorAction {
return &protos.OrchestratorAction{
Id: taskID,
OrchestratorActionType: &protos.OrchestratorAction_ScheduleTask{
ScheduleTask: &protos.ScheduleTaskAction{Name: name, Input: input},
},
}
}

func NewCreateTimerAction(taskID int32, fireAt time.Time) *protos.OrchestratorAction {
return &protos.OrchestratorAction{
Id: taskID,
OrchestratorActionType: &protos.OrchestratorAction_CreateTimer{
CreateTimer: &protos.CreateTimerAction{FireAt: timestamppb.New(fireAt)},
},
}
}

func NewSendEventAction(iid string, name string, data *wrapperspb.StringValue) *protos.OrchestratorAction {
return &protos.OrchestratorAction{
Id: -1,
OrchestratorActionType: &protos.OrchestratorAction_SendEvent{
SendEvent: &protos.SendEventAction{
Instance: &protos.OrchestrationInstance{InstanceId: iid},
Name: name,
Data: data,
},
},
}
}

func NewCreateSubOrchestrationAction(
taskID int32,
name string,
iid string,
input *wrapperspb.StringValue,
) *protos.OrchestratorAction {
return &protos.OrchestratorAction{
Id: taskID,
OrchestratorActionType: &protos.OrchestratorAction_CreateSubOrchestration{
CreateSubOrchestration: &protos.CreateSubOrchestrationAction{
Name: name,
Input: input,
InstanceId: iid,
},
},
}
}

func NewCompleteOrchestrationAction(
taskID int32,
status protos.OrchestrationStatus,
rawResult *wrapperspb.StringValue,
carryoverEvents []*protos.HistoryEvent,
failureDetails *protos.TaskFailureDetails,
) *protos.OrchestratorAction {
return &protos.OrchestratorAction{
Id: taskID,
OrchestratorActionType: &protos.OrchestratorAction_CompleteOrchestration{
CompleteOrchestration: &protos.CompleteOrchestrationAction{
OrchestrationStatus: status,
Result: rawResult,
CarryoverEvents: carryoverEvents,
FailureDetails: failureDetails,
},
},
}
}

func NewTerminateOrchestrationAction(taskID int32, iid string, recurse bool, rawReason *wrapperspb.StringValue) *protos.OrchestratorAction {
return &protos.OrchestratorAction{
Id: taskID,
OrchestratorActionType: &protos.OrchestratorAction_TerminateOrchestration{
TerminateOrchestration: &protos.TerminateOrchestrationAction{
InstanceId: iid,
Recurse: recurse,
Reason: rawReason,
},
},
}
}

func NewTaskFailureDetails(err error) *protos.TaskFailureDetails {
if err == nil {
return nil
}
return &protos.TaskFailureDetails{
ErrorType: reflect.TypeOf(err).String(),
ErrorMessage: err.Error(),
}
}

func HistoryListSummary(list []*protos.HistoryEvent) string {
var sb strings.Builder
sb.WriteString("[")
Expand Down
13 changes: 11 additions & 2 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"fmt"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
Expand Down Expand Up @@ -169,7 +169,16 @@ func terminateSubOrchestrationInstances(ctx context.Context, be Backend, iid api
}
subOrchestrationInstances := getSubOrchestrationInstances(state.OldEvents(), state.NewEvents())
for _, subOrchestrationInstance := range subOrchestrationInstances {
e := helpers.NewExecutionTerminatedEvent(et.Input, et.Recurse)
e := &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.Now(),
EventType: &protos.HistoryEvent_ExecutionTerminated{
ExecutionTerminated: &protos.ExecutionTerminatedEvent{
Input: et.Input,
Recurse: et.Recurse,
},
},
}
// Adding terminate event to sub-orchestration instance
if err := be.AddNewOrchestrationEvent(ctx, subOrchestrationInstance, e); err != nil {
return fmt.Errorf("failed to submit termination request to sub-orchestration: %w", err)
Expand Down
47 changes: 43 additions & 4 deletions backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,16 @@ func (c *backendClient) TerminateOrchestration(ctx context.Context, id api.Insta
return fmt.Errorf("failed to configure termination request: %w", err)
}
}
e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
e := &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.Now(),
EventType: &protos.HistoryEvent_ExecutionTerminated{
ExecutionTerminated: &protos.ExecutionTerminatedEvent{
Input: req.Output,
Recurse: req.Recursive,
},
},
}
if err := c.be.AddNewOrchestrationEvent(ctx, id, e); err != nil {
return fmt.Errorf("failed to submit termination request:: %w", err)
}
Expand All @@ -179,7 +188,13 @@ func (c *backendClient) RaiseEvent(ctx context.Context, id api.InstanceID, event
}
}

e := helpers.NewEventRaisedEvent(req.Name, req.Input)
e := &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_EventRaised{
EventRaised: &protos.EventRaisedEvent{Name: req.Name, Input: req.Input},
},
}
if err := c.be.AddNewOrchestrationEvent(ctx, id, e); err != nil {
return fmt.Errorf("failed to raise event: %w", err)
}
Expand All @@ -190,7 +205,19 @@ func (c *backendClient) RaiseEvent(ctx context.Context, id api.InstanceID, event
//
// Note that suspended orchestrations are still considered to be "running" even though they will not process events.
func (c *backendClient) SuspendOrchestration(ctx context.Context, id api.InstanceID, reason string) error {
e := helpers.NewSuspendOrchestrationEvent(reason)
var input *wrapperspb.StringValue
if reason != "" {
input = wrapperspb.String(reason)
}
e := &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_ExecutionSuspended{
ExecutionSuspended: &protos.ExecutionSuspendedEvent{
Input: input,
},
},
}
if err := c.be.AddNewOrchestrationEvent(ctx, id, e); err != nil {
return fmt.Errorf("failed to suspend orchestration: %w", err)
}
Expand All @@ -199,7 +226,19 @@ func (c *backendClient) SuspendOrchestration(ctx context.Context, id api.Instanc

// ResumeOrchestration resumes an orchestration instance that was previously suspended.
func (c *backendClient) ResumeOrchestration(ctx context.Context, id api.InstanceID, reason string) error {
e := helpers.NewResumeOrchestrationEvent(reason)
var input *wrapperspb.StringValue
if reason != "" {
input = wrapperspb.String(reason)
}
e := &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_ExecutionResumed{
ExecutionResumed: &protos.ExecutionResumedEvent{
Input: input,
},
},
}
if err := c.be.AddNewOrchestrationEvent(ctx, id, e); err != nil {
return fmt.Errorf("failed to resume orchestration: %w", err)
}
Expand Down
Loading

0 comments on commit 465f2ac

Please sign in to comment.