Skip to content

Commit

Permalink
Fix timer firing even after signal has been received
Browse files Browse the repository at this point in the history
There should not be any impact from the timer firing,
but for a cleaner event history, we should short-circuit,
and clear any pending futures.

The workflow (parent / caller) should also define a child context
that can be cancelled should the timer fire / timeout (though,
this is not necessary).

We are using `ReceiveAsync` after the `Select` to handle potential
race conditions in case the worker dies, and both branches passes.
  • Loading branch information
MrSaints committed Nov 17, 2020
1 parent 8aaafc4 commit e532120
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 14 deletions.
20 changes: 12 additions & 8 deletions channel/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,26 @@ import (
func ReceiveWithTimeout(ctx workflow.Context, sigCh workflow.ReceiveChannel, valuePtr interface{}, timeout time.Duration) bool {
selector := workflow.NewSelector(ctx)

var hasTimedOut bool
childCtx, cancel := workflow.WithCancel(ctx)
defer cancel()

selector.AddFuture(
workflow.NewTimer(ctx, timeout),
func(f workflow.Future) {
hasTimedOut = true
},
workflow.NewTimer(childCtx, timeout),
func(f workflow.Future) {},
)
selector.AddReceive(
sigCh,
func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, valuePtr)
},
func(c workflow.ReceiveChannel, more bool) {},
)

selector.Select(ctx)

var hasTimedOut bool
if sigCh.ReceiveAsync(valuePtr) {
hasTimedOut = false
} else {
hasTimedOut = true
}

return hasTimedOut
}
30 changes: 26 additions & 4 deletions channel/signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestUnitTestSuite(t *testing.T) {
suite.Run(t, new(UnitTestSuite))
}

func (s *UnitTestSuite) Test_BasicReceiveWithTimeoutWorkflow_Success() {
func (s *UnitTestSuite) Test_BasicReceiveWithTimeoutWorkflow__WithPayload__Success() {
env := s.NewTestWorkflowEnvironment()

env.RegisterDelayedCallback(
Expand All @@ -27,7 +27,7 @@ func (s *UnitTestSuite) Test_BasicReceiveWithTimeoutWorkflow_Success() {
},
time.Minute,
)
env.ExecuteWorkflow(BasicReceiveWithTimeoutWorkflow)
env.ExecuteWorkflow(BasicReceiveWithTimeoutWorkflow__WithPayload)

s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
Expand All @@ -40,10 +40,32 @@ func (s *UnitTestSuite) Test_BasicReceiveWithTimeoutWorkflow_Success() {
env.AssertExpectations(s.T())
}

func (s *UnitTestSuite) Test_BasicReceiveWithTimeoutWorkflow_TimedOut() {
func (s *UnitTestSuite) Test_BasicReceiveWithTimeoutWorkflow__NoPayload__Success() {
env := s.NewTestWorkflowEnvironment()

env.ExecuteWorkflow(BasicReceiveWithTimeoutWorkflow)
env.RegisterDelayedCallback(
func() {
env.SignalWorkflow("signal-receive-with-timeout", nil)
},
time.Minute,
)
env.ExecuteWorkflow(BasicReceiveWithTimeoutWorkflow__NoPayload)

s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())

var result BasicReceiveWithTimeoutWorkflowResult
s.NoError(env.GetWorkflowResult(&result))
s.False(result.HasTimedOut)
s.Equal("", result.Message)

env.AssertExpectations(s.T())
}

func (s *UnitTestSuite) Test_BasicReceiveWithTimeoutWorkflow__WithPayload__TimedOut() {
env := s.NewTestWorkflowEnvironment()

env.ExecuteWorkflow(BasicReceiveWithTimeoutWorkflow__WithPayload)

s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
Expand Down
34 changes: 32 additions & 2 deletions channel/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,51 @@ type BasicReceiveWithTimeoutWorkflowResult struct {
Message string
}

func BasicReceiveWithTimeoutWorkflow(ctx workflow.Context) (BasicReceiveWithTimeoutWorkflowResult, error) {
func BasicReceiveWithTimeoutWorkflow__WithPayload(ctx workflow.Context) (BasicReceiveWithTimeoutWorkflowResult, error) {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, ao)

sigCh := workflow.GetSignalChannel(ctx, "signal-receive-with-timeout")
childCtx, cancel := workflow.WithCancel(ctx)

sigCh := workflow.GetSignalChannel(childCtx, "signal-receive-with-timeout")
var message string
hasTimedOut := ReceiveWithTimeout(ctx, sigCh, &message, time.Minute*30)

if hasTimedOut {
cancel()
}

var result BasicReceiveWithTimeoutWorkflowResult = BasicReceiveWithTimeoutWorkflowResult{
HasTimedOut: hasTimedOut,
Message: message,
}

return result, nil
}

func BasicReceiveWithTimeoutWorkflow__NoPayload(ctx workflow.Context) (BasicReceiveWithTimeoutWorkflowResult, error) {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, ao)

childCtx, cancel := workflow.WithCancel(ctx)

sigCh := workflow.GetSignalChannel(childCtx, "signal-receive-with-timeout")
hasTimedOut := ReceiveWithTimeout(ctx, sigCh, nil, time.Minute*30)

if hasTimedOut {
cancel()
}

var result BasicReceiveWithTimeoutWorkflowResult = BasicReceiveWithTimeoutWorkflowResult{
HasTimedOut: hasTimedOut,
Message: "",
}

return result, nil
}

0 comments on commit e532120

Please sign in to comment.