Skip to content

Commit

Permalink
Experimental user metadata support
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Aug 16, 2024
1 parent 1f0296c commit 0c43849
Show file tree
Hide file tree
Showing 20 changed files with 403 additions and 20 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ jobs:
env:
# TODO(bergundy): Remove this flag once server 1.25.0 is out.
DISABLE_NEXUS_TESTS: "1"
# TODO(cretz): Remove this flag once server 1.25.0 is out.
DISABLE_USER_METADATA_TESTS: "1"
working-directory: ./internal/cmd/build

cloud-test:
Expand Down
20 changes: 20 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ const (
// QueryTypeOpenSessions is the build in query type for Client.QueryWorkflow() call. Use this query type to get all open
// sessions in the workflow. The result will be a list of SessionInfo encoded in the EncodedValue.
QueryTypeOpenSessions string = "__open_sessions"

// QueryTypeWorkflowMetadata is the query name for the workflow metadata.
QueryTypeWorkflowMetadata string = "__temporal_workflow_metadata"
)

type (
Expand Down Expand Up @@ -707,6 +710,23 @@ type (
// Cannot be set the same time as a CronSchedule.
StartDelay time.Duration

// Summary - Single-line summary for this workflow execution that will appear in UI/CLI. This can be in
// single-line Temporal markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Summary string

// Details - General details for this workflow execution that will appear in UI/CLI. This can be in Temporal
// markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be updated.
// For details that can be updated, use SetCurrentDetails within the workflow.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Details string

// request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
requestID string
// workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
Expand Down
3 changes: 3 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ type WorkflowOutboundInterceptor interface {
// NewTimer intercepts workflow.NewTimer.
NewTimer(ctx Context, d time.Duration) Future

// NewTimer intercepts workflow.NewTimerWithOptions.
NewTimerWithOptions(ctx Context, d time.Duration, options TimerOptions) Future

// Sleep intercepts workflow.Sleep.
Sleep(ctx Context, d time.Duration) (err error)

Expand Down
9 changes: 9 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ func (w *WorkflowOutboundInterceptorBase) NewTimer(ctx Context, d time.Duration)
return w.Next.NewTimer(ctx, d)
}

// NewTimerWithOptions implements WorkflowOutboundInterceptor.NewTimerWithOptions.
func (w *WorkflowOutboundInterceptorBase) NewTimerWithOptions(
ctx Context,
d time.Duration,
options TimerOptions,
) Future {
return w.Next.NewTimerWithOptions(ctx, d, options)
}

// Sleep implements WorkflowOutboundInterceptor.Sleep.
func (w *WorkflowOutboundInterceptorBase) Sleep(ctx Context, d time.Duration) (err error) {
return w.Next.Sleep(ctx, d)
Expand Down
26 changes: 23 additions & 3 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/sdk/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/util"
Expand Down Expand Up @@ -90,6 +91,7 @@ type (
timerCommandStateMachine struct {
*commandStateMachineBase
attributes *commandpb.StartTimerCommandAttributes
summary *commonpb.Payload
}

cancelTimerCommandStateMachine struct {
Expand Down Expand Up @@ -385,11 +387,15 @@ func (h *commandsHelper) newRequestCancelNexusOperationStateMachine(attributes *
}
}

func (h *commandsHelper) newTimerCommandStateMachine(attributes *commandpb.StartTimerCommandAttributes) *timerCommandStateMachine {
func (h *commandsHelper) newTimerCommandStateMachine(
attributes *commandpb.StartTimerCommandAttributes,
summary *commonpb.Payload,
) *timerCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeTimer, attributes.GetTimerId())
return &timerCommandStateMachine{
commandStateMachineBase: base,
attributes: attributes,
summary: summary,
}
}

Expand Down Expand Up @@ -692,6 +698,9 @@ func (d *timerCommandStateMachine) getCommand() *commandpb.Command {
case commandStateCreated, commandStateCanceledBeforeSent:
command := createNewCommand(enumspb.COMMAND_TYPE_START_TIMER)
command.Attributes = &commandpb.Command_StartTimerCommandAttributes{StartTimerCommandAttributes: d.attributes}
if d.summary != nil {
command.UserMetadata = &sdk.UserMetadata{Summary: d.summary}
}
return command
default:
return nil
Expand Down Expand Up @@ -1556,8 +1565,19 @@ func (h *commandsHelper) getSignalID(initiatedEventID int64) string {
return signalID
}

func (h *commandsHelper) startTimer(attributes *commandpb.StartTimerCommandAttributes) commandStateMachine {
command := h.newTimerCommandStateMachine(attributes)
func (h *commandsHelper) startTimer(
attributes *commandpb.StartTimerCommandAttributes,
options TimerOptions,
dc converter.DataConverter,
) commandStateMachine {
var summary *commonpb.Payload
if options.Summary != "" {
var err error
if summary, err = dc.ToPayload(options.Summary); err != nil {
panic(err)
}
}
command := h.newTimerCommandStateMachine(attributes, summary)
h.addCommand(command)
return command
}
Expand Down
10 changes: 5 additions & 5 deletions internal/internal_command_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_TimerStateMachine_CancelBeforeSent(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
h.cancelTimer(TimerID{timerID})
require.Equal(t, commandStateCanceledBeforeSent, d.getState())
Expand All @@ -60,7 +60,7 @@ func Test_TimerStateMachine_CancelAfterInitiated(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand All @@ -86,7 +86,7 @@ func Test_TimerStateMachine_CompletedAfterCancel(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand Down Expand Up @@ -114,7 +114,7 @@ func Test_TimerStateMachine_CompleteWithoutCancel(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand All @@ -135,7 +135,7 @@ func Test_TimerCancelEventOrdering(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand Down
9 changes: 7 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func (wc *workflowEnvironmentImpl) Now() time.Time {
return wc.currentReplayTime
}

func (wc *workflowEnvironmentImpl) NewTimer(d time.Duration, callback ResultHandler) *TimerID {
func (wc *workflowEnvironmentImpl) NewTimer(d time.Duration, options TimerOptions, callback ResultHandler) *TimerID {
if d < 0 {
callback(nil, fmt.Errorf("negative duration provided %v", d))
return nil
Expand All @@ -839,7 +839,7 @@ func (wc *workflowEnvironmentImpl) NewTimer(d time.Duration, callback ResultHand
startTimerAttr.TimerId = timerID
startTimerAttr.StartToFireTimeout = durationpb.New(d)

command := wc.commandsHelper.startTimer(startTimerAttr)
command := wc.commandsHelper.startTimer(startTimerAttr, options, wc.GetDataConverter())
command.setData(&scheduledTimer{callback: callback})

wc.logger.Debug("NewTimer",
Expand Down Expand Up @@ -1392,6 +1392,11 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(
return weh.encodeArg(weh.StackTrace())
case QueryTypeOpenSessions:
return weh.encodeArg(weh.getOpenSessions())
case QueryTypeWorkflowMetadata:
// We are intentionally not handling this here but rather in the
// normal handler so it has access to the options/context as
// needed.
fallthrough
default:
result, err := weh.queryHandler(queryType, queryArgs, header)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,11 @@ func convertToPBScheduleAction(
return nil, err
}

userMetadata, err := buildUserMetadata(action.Summary, action.Details, dataConverter)
if err != nil {
return nil, err
}

return &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
Expand All @@ -651,6 +656,7 @@ func convertToPBScheduleAction(
Memo: memo,
SearchAttributes: searchAttrs,
Header: header,
UserMetadata: userMetadata,
},
},
}, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type (

// NewTimer - Creates a new timer that will fire callback after d(resolution is in seconds).
// The callback indicates the error(TimerCanceledError) if the timer is canceled.
NewTimer(d time.Duration, callback ResultHandler) *TimerID
NewTimer(d time.Duration, options TimerOptions, callback ResultHandler) *TimerID

// RequestCancelTimer - Requests cancel of a timer, this one doesn't wait for cancellation request
// to complete, instead invokes the ResultHandler with TimerCanceledError
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type (
validateFn interface{}
name string
unfinishedPolicy HandlerUnfinishedPolicy
description string
}
)

Expand Down Expand Up @@ -372,6 +373,7 @@ func newUpdateHandler(
validateFn: validateFn,
name: updateName,
unfinishedPolicy: opts.UnfinishedPolicy,
description: opts.Description,
}, nil
}

Expand Down
90 changes: 89 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"fmt"
"reflect"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
Expand All @@ -42,6 +43,7 @@ import (

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/sdk/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
Expand Down Expand Up @@ -226,6 +228,8 @@ type (
// runningUpdatesHandles is a map of update handlers that are currently running.
runningUpdatesHandles map[string]UpdateInfo
VersioningIntent VersioningIntent
// currentDetails is the user-set string returned on metadata query.
currentDetails string
}

// ExecuteWorkflowParams parameters of the workflow invocation
Expand Down Expand Up @@ -599,12 +603,23 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common
return nil, err
}

// As a special case, we handle __temporal_workflow_metadata query
// here instead of in workflowExecutionEventHandlerImpl.ProcessQuery
// because we need the context environment to do so.
if queryType == QueryTypeWorkflowMetadata {
if result, err := getWorkflowMetadata(rootCtx); err != nil {
return nil, err
} else {
return encodeArg(getDataConverterFromWorkflowContext(rootCtx), result)
}
}

eo := getWorkflowEnvOptions(rootCtx)
// A handler must be present since it is needed for argument decoding,
// even if the interceptor intercepts query handling
handler, ok := eo.queryHandlers[queryType]
if !ok {
keys := []string{QueryTypeStackTrace, QueryTypeOpenSessions}
keys := []string{QueryTypeStackTrace, QueryTypeOpenSessions, QueryTypeWorkflowMetadata}
for k := range eo.queryHandlers {
keys = append(keys, k)
}
Expand Down Expand Up @@ -1567,6 +1582,79 @@ func GetUnhandledSignalNames(ctx Context) []string {
return getWorkflowEnvOptions(ctx).getUnhandledSignalNames()
}

// GetCurrentDetails gets the previously-set current details.
//
// NOTE: Experimental
func GetCurrentDetails(ctx Context) string {
return getWorkflowEnvOptions(ctx).currentDetails
}

// SetCurrentDetails sets the current details.
//
// NOTE: Experimental
func SetCurrentDetails(ctx Context, details string) {
getWorkflowEnvOptions(ctx).currentDetails = details
}

func getWorkflowMetadata(ctx Context) (*sdk.WorkflowMetadata, error) {
info := GetWorkflowInfo(ctx)
eo := getWorkflowEnvOptions(ctx)
ret := &sdk.WorkflowMetadata{
Definition: &sdk.WorkflowDefinition{
Type: info.WorkflowType.Name,
QueryDefinitions: []*sdk.WorkflowInteractionDefinition{
{
Name: QueryTypeStackTrace,
Description: "Current stack trace",
},
{
Name: QueryTypeOpenSessions,
Description: "Open sessions on the workflow",
},
{
Name: QueryTypeWorkflowMetadata,
Description: "Metadata about the workflow",
},
},
},
CurrentDetails: eo.currentDetails,
}
// Queries
for k := range eo.queryHandlers {
ret.Definition.QueryDefinitions = append(ret.Definition.QueryDefinitions, &sdk.WorkflowInteractionDefinition{
Name: k,
// TODO(cretz): Allow query descriptions?
// Description: ,
})
}
// Signals
// TODO(cretz): This is all signal channels asked for in workflow _and_ all ever sent, is that what we want? Or do
// we only want all ever asked for?
for k := range eo.signalChannels {
ret.Definition.SignalDefinitions = append(ret.Definition.SignalDefinitions, &sdk.WorkflowInteractionDefinition{
Name: k,
// TODO(cretz): Allow signal descriptions?
// Description: ,
})
}
// Updates
for k, v := range eo.updateHandlers {
ret.Definition.UpdateDefinitions = append(ret.Definition.UpdateDefinitions, &sdk.WorkflowInteractionDefinition{
Name: k,
Description: v.description,
})
}
// Sort interaction definitions
sortWorkflowInteractionDefinitions(ret.Definition.QueryDefinitions)
sortWorkflowInteractionDefinitions(ret.Definition.SignalDefinitions)
sortWorkflowInteractionDefinitions(ret.Definition.UpdateDefinitions)
return ret, nil
}

func sortWorkflowInteractionDefinitions(defns []*sdk.WorkflowInteractionDefinition) {
sort.Slice(defns, func(i, j int) bool { return defns[i].Name < defns[j].Name })
}

// getUnhandledSignalNames returns signal names that have unconsumed signals.
func (w *WorkflowOptions) getUnhandledSignalNames() []string {
var unhandledSignals []string
Expand Down
Loading

0 comments on commit 0c43849

Please sign in to comment.