Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew committed Oct 28, 2024
1 parent ca909b6 commit 5d2778e
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 13 deletions.
4 changes: 4 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,10 @@ func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
return wc.sdkFlags.tryUse(flag, !wc.isReplay)
}

func (wc *workflowEnvironmentImpl) GetFlag(flag sdkFlag) bool {
return wc.sdkFlags.getFlag(flag)
}

func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) {
wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/internal_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool {
}
}

// getFlag returns true if the flag is currently set.
func (sf *sdkFlags) getFlag(flag sdkFlag) bool {
return sf.currentFlags[flag] || sf.newFlags[flag]
}

// set marks a flag as in current use regardless of replay status.
func (sf *sdkFlags) set(flags ...sdkFlag) {
if !sf.capabilities.GetSdkMetadata() {
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ type (
DrainUnhandledUpdates() bool
// TryUse returns true if this flag may currently be used.
TryUse(flag sdkFlag) bool
// GetFlag returns if the flag is currently used.
GetFlag(flag sdkFlag) bool
}

// WorkflowDefinitionFactory factory for creating WorkflowDefinition instances.
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ func (s *selectorImpl) Select(ctx Context) {
}
// readyBranch is not executed when AddDefault is specified,
// setting the value here prevents the signal from being dropped
dropSignalFlag := getWorkflowEnvironment(ctx).TryUse(SDKFlagBlockedSelectorSignalReceive)
dropSignalFlag := getWorkflowEnvironment(ctx).GetFlag(SDKFlagBlockedSelectorSignalReceive)
if dropSignalFlag {
c.recValue = &v
}
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,10 @@ func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
return true
}

func (env *testWorkflowEnvironmentImpl) GetFlag(flag sdkFlag) bool {
return true
}

func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) {
env.bufferedUpdateRequests[name] = append(env.bufferedUpdateRequests[name], f)
}
Expand Down
10 changes: 2 additions & 8 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4249,27 +4249,21 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() {
var v string
selector.AddReceive(ch1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
fmt.Println("received signal from ch1")
})
selector.AddDefault(func() {
ch2.Receive(ctx, &v)
fmt.Println("received signal from ch2")
})
selector.Select(ctx)
fmt.Println("ch1.Len()", ch1.Len(), "s", v)
// testWorkflowEnvironmentImpl.TryUse always returns true for flags
// test for fixed behavior
s.Require().True(ch1.Len() == 1 && v == "s2")
s.Require().True(selector.HasPending())

return nil
}

// send a signal 5 seconds after workflow started
// send a signal after workflow has started
env := s.NewTestWorkflowEnvironment()
env.RegisterDelayedCallback(func() {
fmt.Println("sending signal to 1")
env.SignalWorkflow("test-signal", "s1")
fmt.Println("sending signal to 2")
env.SignalWorkflow("test-signal-2", "s2")
}, 5*time.Second)
env.ExecuteWorkflow(workflowFn)
Expand Down
8 changes: 4 additions & 4 deletions test/replaytests/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,14 +639,14 @@ func SelectorBlockingDefaultWorkflow(ctx workflow.Context) error {
ch2.Receive(ctx, &s)
})
selector.Select(ctx)
if ch1.Len() == 0 && s == "two" {
logger.Info("Signal in ch1 lost")
return nil
} else {
if selector.HasPending() {
var result string
activity := workflow.ExecuteActivity(ctx, SelectorBlockingDefaultActivity, "Signal not lost")
activity.Get(ctx, &result)
logger.Info("Result", result)
} else {
logger.Info("Signal in ch1 lost")
return nil
}
return nil
}
Expand Down

0 comments on commit 5d2778e

Please sign in to comment.