Skip to content

Commit

Permalink
Update internal timers to use NewTimerWithOptions (#1618)
Browse files Browse the repository at this point in the history
* switch internal timers to use options, expose StaticSummary and StaticDetails

* plumb options through AwaitWithTimeout

* create new API, don't break existing API

* missed a spot to remove API change

* add experimental tag, fix test

* take out StaticSummary StaticDetails changes

* missed a few spots

* remove duplicate code

* cleaner code share

* AwaitOptions

* alias AwaitOptions in public package

* add unit test

* wip

* test works with prettifyString logging

* clean up

* no need for unit test now that we have better E2E test

* remove print
  • Loading branch information
yuandrew authored Sep 10, 2024
1 parent fa54195 commit 0fc83e9
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 13 deletions.
5 changes: 5 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ type WorkflowOutboundInterceptor interface {
// AwaitWithTimeout intercepts workflow.AwaitWithTimeout.
AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error)

// AwaitWithOptions intercepts workflow.AwaitWithOptions.
//
// NOTE: Experimental
AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (bool, error)

// ExecuteActivity intercepts workflow.ExecuteActivity.
// interceptor.WorkflowHeader will return a non-nil map for this context.
ExecuteActivity(ctx Context, activityType string, args ...interface{}) Future
Expand Down
7 changes: 7 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout
return w.Next.AwaitWithTimeout(ctx, timeout, condition)
}

// AwaitWithOptions implements WorkflowOutboundInterceptor.AwaitWithOptions.
//
// NOTE: Experimental
func (w *WorkflowOutboundInterceptorBase) AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (bool, error) {
return w.Next.AwaitWithOptions(ctx, options, condition)
}

// ExecuteLocalActivity implements WorkflowOutboundInterceptor.ExecuteLocalActivity.
func (w *WorkflowOutboundInterceptorBase) ExecuteLocalActivity(
ctx Context,
Expand Down
57 changes: 44 additions & 13 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,20 @@ type (
// NOTE: Experimental
Summary string
}

// AwaitOptions are options set when creating an await.
//
// NOTE: Experimental
AwaitOptions struct {
// Timeout is the await timeout if the await condition is not met.
//
// NOTE: Experimental
Timeout time.Duration
// TimerOptions are options set for the underlying timer created.
//
// NOTE: Experimental
TimerOptions TimerOptions
}
)

// Await blocks the calling thread until condition() returns true
Expand Down Expand Up @@ -485,34 +499,51 @@ func (wc *workflowEnvironmentInterceptor) Await(ctx Context, condition func() bo
return nil
}

// AwaitWithTimeout blocks the calling thread until condition() returns true
// Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled.
func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) {
assertNotInReadOnlyState(ctx)
state := getState(ctx)
return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, condition)
}

func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) {
func (wc *workflowEnvironmentInterceptor) awaitWithOptions(ctx Context, options AwaitOptions, condition func() bool, functionName string) (ok bool, err error) {
state := getState(ctx)
defer state.unblocked()
timer := NewTimer(ctx, timeout)
timer := NewTimerWithOptions(ctx, options.Timeout, options.TimerOptions)
for !condition() {
doneCh := ctx.Done()
// TODO: Consider always returning a channel
if doneCh != nil {
if _, more := doneCh.ReceiveAsyncWithMoreFlag(nil); !more {
return false, NewCanceledError("AwaitWithTimeout context canceled")
return false, NewCanceledError("%s context canceled", functionName)
}
}
if timer.IsReady() {
return false, nil
}
state.yield("AwaitWithTimeout")
state.yield(functionName)
}
return true, nil
}

// AwaitWithTimeout blocks the calling thread until condition() returns true
// Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled.
func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) {
assertNotInReadOnlyState(ctx)
state := getState(ctx)
return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, condition)
}

func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) {
options := AwaitOptions{Timeout: timeout, TimerOptions: TimerOptions{Summary: "AwaitWithTimeout"}}
return wc.awaitWithOptions(ctx, options, condition, "AwaitWithTimeout")
}

// AwaitWithOptions blocks the calling thread until condition() returns true
// Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled.
func AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) {
assertNotInReadOnlyState(ctx)
state := getState(ctx)
return state.dispatcher.interceptor.AwaitWithOptions(ctx, options, condition)
}

func (wc *workflowEnvironmentInterceptor) AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) {
return wc.awaitWithOptions(ctx, options, condition, "AwaitWithOptions")
}

// NewChannel create new Channel instance
func NewChannel(ctx Context) Channel {
state := getState(ctx)
Expand Down Expand Up @@ -1331,7 +1362,7 @@ func Sleep(ctx Context, d time.Duration) (err error) {
}

func (wc *workflowEnvironmentInterceptor) Sleep(ctx Context, d time.Duration) (err error) {
t := NewTimer(ctx, d)
t := NewTimerWithOptions(ctx, d, TimerOptions{Summary: "Sleep"})
err = t.Get(ctx, nil)
return
}
Expand Down
31 changes: 31 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6266,6 +6266,37 @@ func (ts *IntegrationTestSuite) TestUserMetadata() {
ts.Equal("my-timer", str)
}

func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var str string

// Start workflow
opts := ts.startWorkflowOptions("test-await-options" + uuid.New())
run, err := ts.client.ExecuteWorkflow(ctx, opts,
ts.workflows.AwaitWithOptions)
ts.NoError(err)

// Confirm workflow has completed
ts.NoError(run.Get(ctx, nil))

// Confirm AwaitWithOptions's underlying timer has fired properly
iter := ts.client.GetWorkflowHistory(ctx, opts.ID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
var timerEvent *historypb.HistoryEvent
for iter.HasNext() {
event, err1 := iter.Next()
ts.NoError(err1)
if event.GetTimerStartedEventAttributes() != nil {
ts.Nil(timerEvent)
timerEvent = event
}
}
ts.NotNil(timerEvent)
ts.NoError(converter.GetDefaultDataConverter().FromPayload(
timerEvent.UserMetadata.Summary, &str))
ts.Equal("await-timer", str)
}

// executeWorkflow executes a given workflow and waits for the result
func (ts *IntegrationTestSuite) executeWorkflow(
wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{},
Expand Down
13 changes: 13 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3119,6 +3119,18 @@ func (w *Workflows) UserMetadata(ctx workflow.Context) error {
).Get(ctx, nil)
}

func (w *Workflows) AwaitWithOptions(ctx workflow.Context) (bool, error) {
options := workflow.AwaitOptions{
Timeout: 1 * time.Millisecond,
TimerOptions: workflow.TimerOptions{Summary: "await-timer"},
}

return workflow.AwaitWithOptions(ctx, options, func() bool {
return true
})

}

func (w *Workflows) RunsLocalAndNonlocalActsWithRetries(ctx workflow.Context, numOfEachActKind int, actFailTimes int) error {
var activities *Activities
futures := make([]workflow.Future, 0)
Expand Down Expand Up @@ -3268,6 +3280,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.UpdateWithMutex)
worker.RegisterWorkflow(w.UpdateWithSemaphore)
worker.RegisterWorkflow(w.UserMetadata)
worker.RegisterWorkflow(w.AwaitWithOptions)
worker.RegisterWorkflow(w.WorkflowWithRejectableUpdate)

worker.RegisterWorkflow(w.child)
Expand Down
20 changes: 20 additions & 0 deletions workflow/deterministic_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ type (
//
// NOTE: Experimental
TimerOptions = internal.TimerOptions

// AwaitOptions are options for [AwaitWithOptions]
//
// NOTE: Experimental
AwaitOptions = internal.AwaitOptions
)

// Await blocks the calling thread until condition() returns true.
Expand Down Expand Up @@ -111,6 +116,21 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool)
return internal.AwaitWithTimeout(ctx, timeout, condition)
}

// AwaitWithOptions blocks the calling thread until condition() returns true
// or blocking time exceeds the passed timeout value.
// Returns ok=false if timed out, and err CanceledError if the ctx is canceled.
// The following code will block until the captured count
// variable is set to 5, or one hour passes.
//
// workflow.AwaitWithOptions(ctx, AwaitOptions{Timeout: time.Hour, TimerOptions: TimerOptions{Summary:"Example"}}, func() bool {
// return count == 5
// })
//
// NOTE: Experimental
func AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) {
return internal.AwaitWithOptions(ctx, options, condition)
}

// NewChannel creates a new Channel instance
func NewChannel(ctx Context) Channel {
return internal.NewChannel(ctx)
Expand Down

0 comments on commit 0fc83e9

Please sign in to comment.