diff --git a/internal/interceptor.go b/internal/interceptor.go index c7848ad2b..41d3fb0c2 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -213,6 +213,11 @@ type WorkflowOutboundInterceptor interface { // AwaitWithTimeout intercepts workflow.AwaitWithTimeout. AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error) + // AwaitWithOptions intercepts workflow.AwaitWithOptions. + // + // NOTE: Experimental + AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (bool, error) + // ExecuteActivity intercepts workflow.ExecuteActivity. // interceptor.WorkflowHeader will return a non-nil map for this context. ExecuteActivity(ctx Context, activityType string, args ...interface{}) Future diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index f99295707..7ce4758ed 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -205,6 +205,13 @@ func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout return w.Next.AwaitWithTimeout(ctx, timeout, condition) } +// AwaitWithOptions implements WorkflowOutboundInterceptor.AwaitWithOptions. +// +// NOTE: Experimental +func (w *WorkflowOutboundInterceptorBase) AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (bool, error) { + return w.Next.AwaitWithOptions(ctx, options, condition) +} + // ExecuteLocalActivity implements WorkflowOutboundInterceptor.ExecuteLocalActivity. func (w *WorkflowOutboundInterceptorBase) ExecuteLocalActivity( ctx Context, diff --git a/internal/workflow.go b/internal/workflow.go index 55c27c543..0f9b3e542 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -458,6 +458,20 @@ type ( // NOTE: Experimental Summary string } + + // AwaitOptions are options set when creating an await. + // + // NOTE: Experimental + AwaitOptions struct { + // Timeout is the await timeout if the await condition is not met. + // + // NOTE: Experimental + Timeout time.Duration + // TimerOptions are options set for the underlying timer created. + // + // NOTE: Experimental + TimerOptions TimerOptions + } ) // Await blocks the calling thread until condition() returns true @@ -485,34 +499,51 @@ func (wc *workflowEnvironmentInterceptor) Await(ctx Context, condition func() bo return nil } -// AwaitWithTimeout blocks the calling thread until condition() returns true -// Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled. -func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { - assertNotInReadOnlyState(ctx) - state := getState(ctx) - return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, condition) -} - -func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { +func (wc *workflowEnvironmentInterceptor) awaitWithOptions(ctx Context, options AwaitOptions, condition func() bool, functionName string) (ok bool, err error) { state := getState(ctx) defer state.unblocked() - timer := NewTimer(ctx, timeout) + timer := NewTimerWithOptions(ctx, options.Timeout, options.TimerOptions) for !condition() { doneCh := ctx.Done() // TODO: Consider always returning a channel if doneCh != nil { if _, more := doneCh.ReceiveAsyncWithMoreFlag(nil); !more { - return false, NewCanceledError("AwaitWithTimeout context canceled") + return false, NewCanceledError("%s context canceled", functionName) } } if timer.IsReady() { return false, nil } - state.yield("AwaitWithTimeout") + state.yield(functionName) } return true, nil } +// AwaitWithTimeout blocks the calling thread until condition() returns true +// Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled. +func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { + assertNotInReadOnlyState(ctx) + state := getState(ctx) + return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, condition) +} + +func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { + options := AwaitOptions{Timeout: timeout, TimerOptions: TimerOptions{Summary: "AwaitWithTimeout"}} + return wc.awaitWithOptions(ctx, options, condition, "AwaitWithTimeout") +} + +// AwaitWithOptions blocks the calling thread until condition() returns true +// Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled. +func AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) { + assertNotInReadOnlyState(ctx) + state := getState(ctx) + return state.dispatcher.interceptor.AwaitWithOptions(ctx, options, condition) +} + +func (wc *workflowEnvironmentInterceptor) AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) { + return wc.awaitWithOptions(ctx, options, condition, "AwaitWithOptions") +} + // NewChannel create new Channel instance func NewChannel(ctx Context) Channel { state := getState(ctx) @@ -1331,7 +1362,7 @@ func Sleep(ctx Context, d time.Duration) (err error) { } func (wc *workflowEnvironmentInterceptor) Sleep(ctx Context, d time.Duration) (err error) { - t := NewTimer(ctx, d) + t := NewTimerWithOptions(ctx, d, TimerOptions{Summary: "Sleep"}) err = t.Get(ctx, nil) return } diff --git a/test/integration_test.go b/test/integration_test.go index a163b04f8..fb64f2093 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6266,6 +6266,37 @@ func (ts *IntegrationTestSuite) TestUserMetadata() { ts.Equal("my-timer", str) } +func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + var str string + + // Start workflow + opts := ts.startWorkflowOptions("test-await-options" + uuid.New()) + run, err := ts.client.ExecuteWorkflow(ctx, opts, + ts.workflows.AwaitWithOptions) + ts.NoError(err) + + // Confirm workflow has completed + ts.NoError(run.Get(ctx, nil)) + + // Confirm AwaitWithOptions's underlying timer has fired properly + iter := ts.client.GetWorkflowHistory(ctx, opts.ID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + var timerEvent *historypb.HistoryEvent + for iter.HasNext() { + event, err1 := iter.Next() + ts.NoError(err1) + if event.GetTimerStartedEventAttributes() != nil { + ts.Nil(timerEvent) + timerEvent = event + } + } + ts.NotNil(timerEvent) + ts.NoError(converter.GetDefaultDataConverter().FromPayload( + timerEvent.UserMetadata.Summary, &str)) + ts.Equal("await-timer", str) +} + // executeWorkflow executes a given workflow and waits for the result func (ts *IntegrationTestSuite) executeWorkflow( wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}, diff --git a/test/workflow_test.go b/test/workflow_test.go index 04153a834..aa042b05f 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -3119,6 +3119,18 @@ func (w *Workflows) UserMetadata(ctx workflow.Context) error { ).Get(ctx, nil) } +func (w *Workflows) AwaitWithOptions(ctx workflow.Context) (bool, error) { + options := workflow.AwaitOptions{ + Timeout: 1 * time.Millisecond, + TimerOptions: workflow.TimerOptions{Summary: "await-timer"}, + } + + return workflow.AwaitWithOptions(ctx, options, func() bool { + return true + }) + +} + func (w *Workflows) RunsLocalAndNonlocalActsWithRetries(ctx workflow.Context, numOfEachActKind int, actFailTimes int) error { var activities *Activities futures := make([]workflow.Future, 0) @@ -3268,6 +3280,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.UpdateWithMutex) worker.RegisterWorkflow(w.UpdateWithSemaphore) worker.RegisterWorkflow(w.UserMetadata) + worker.RegisterWorkflow(w.AwaitWithOptions) worker.RegisterWorkflow(w.WorkflowWithRejectableUpdate) worker.RegisterWorkflow(w.child) diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index dbe8dde3c..8e3f75026 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -72,6 +72,11 @@ type ( // // NOTE: Experimental TimerOptions = internal.TimerOptions + + // AwaitOptions are options for [AwaitWithOptions] + // + // NOTE: Experimental + AwaitOptions = internal.AwaitOptions ) // Await blocks the calling thread until condition() returns true. @@ -111,6 +116,21 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) return internal.AwaitWithTimeout(ctx, timeout, condition) } +// AwaitWithOptions blocks the calling thread until condition() returns true +// or blocking time exceeds the passed timeout value. +// Returns ok=false if timed out, and err CanceledError if the ctx is canceled. +// The following code will block until the captured count +// variable is set to 5, or one hour passes. +// +// workflow.AwaitWithOptions(ctx, AwaitOptions{Timeout: time.Hour, TimerOptions: TimerOptions{Summary:"Example"}}, func() bool { +// return count == 5 +// }) +// +// NOTE: Experimental +func AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) { + return internal.AwaitWithOptions(ctx, options, condition) +} + // NewChannel creates a new Channel instance func NewChannel(ctx Context) Channel { return internal.NewChannel(ctx)