From 5d2778ee9f6c7e9ff652dc59c3f652c79732ef56 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 28 Oct 2024 16:36:16 -0700 Subject: [PATCH] PR feedback --- internal/internal_event_handlers.go | 4 ++++ internal/internal_flags.go | 5 +++++ internal/internal_worker_base.go | 2 ++ internal/internal_workflow.go | 2 +- internal/internal_workflow_testsuite.go | 4 ++++ internal/internal_workflow_testsuite_test.go | 10 ++-------- test/replaytests/workflows.go | 8 ++++---- 7 files changed, 22 insertions(+), 13 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 82a9785be..9f6860fc3 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -978,6 +978,10 @@ func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool { return wc.sdkFlags.tryUse(flag, !wc.isReplay) } +func (wc *workflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { + return wc.sdkFlags.getFlag(flag) +} + func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) { wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f) } diff --git a/internal/internal_flags.go b/internal/internal_flags.go index de2e3f5b2..89693d839 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -110,6 +110,11 @@ func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool { } } +// getFlag returns true if the flag is currently set. +func (sf *sdkFlags) getFlag(flag sdkFlag) bool { + return sf.currentFlags[flag] || sf.newFlags[flag] +} + // set marks a flag as in current use regardless of replay status. func (sf *sdkFlags) set(flags ...sdkFlag) { if !sf.capabilities.GetSdkMetadata() { diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index dc3865e6c..f7160da8e 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -145,6 +145,8 @@ type ( DrainUnhandledUpdates() bool // TryUse returns true if this flag may currently be used. TryUse(flag sdkFlag) bool + // GetFlag returns if the flag is currently used. + GetFlag(flag sdkFlag) bool } // WorkflowDefinitionFactory factory for creating WorkflowDefinition instances. diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 9bfa0279e..245661027 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1409,7 +1409,7 @@ func (s *selectorImpl) Select(ctx Context) { } // readyBranch is not executed when AddDefault is specified, // setting the value here prevents the signal from being dropped - dropSignalFlag := getWorkflowEnvironment(ctx).TryUse(SDKFlagBlockedSelectorSignalReceive) + dropSignalFlag := getWorkflowEnvironment(ctx).GetFlag(SDKFlagBlockedSelectorSignalReceive) if dropSignalFlag { c.recValue = &v } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 3b9742f73..a92fbb371 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -584,6 +584,10 @@ func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool { return true } +func (env *testWorkflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { + return true +} + func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) { env.bufferedUpdateRequests[name] = append(env.bufferedUpdateRequests[name], f) } diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 494e95720..452d51555 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4249,27 +4249,21 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { var v string selector.AddReceive(ch1, func(c ReceiveChannel, more bool) { c.Receive(ctx, &v) - fmt.Println("received signal from ch1") }) selector.AddDefault(func() { ch2.Receive(ctx, &v) - fmt.Println("received signal from ch2") }) selector.Select(ctx) - fmt.Println("ch1.Len()", ch1.Len(), "s", v) - // testWorkflowEnvironmentImpl.TryUse always returns true for flags - // test for fixed behavior s.Require().True(ch1.Len() == 1 && v == "s2") + s.Require().True(selector.HasPending()) return nil } - // send a signal 5 seconds after workflow started + // send a signal after workflow has started env := s.NewTestWorkflowEnvironment() env.RegisterDelayedCallback(func() { - fmt.Println("sending signal to 1") env.SignalWorkflow("test-signal", "s1") - fmt.Println("sending signal to 2") env.SignalWorkflow("test-signal-2", "s2") }, 5*time.Second) env.ExecuteWorkflow(workflowFn) diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index cd6cf5c33..97089844a 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -639,14 +639,14 @@ func SelectorBlockingDefaultWorkflow(ctx workflow.Context) error { ch2.Receive(ctx, &s) }) selector.Select(ctx) - if ch1.Len() == 0 && s == "two" { - logger.Info("Signal in ch1 lost") - return nil - } else { + if selector.HasPending() { var result string activity := workflow.ExecuteActivity(ctx, SelectorBlockingDefaultActivity, "Signal not lost") activity.Get(ctx, &result) logger.Info("Result", result) + } else { + logger.Info("Signal in ch1 lost") + return nil } return nil }