Skip to content

Commit

Permalink
potential solution to multiple async updates
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew committed Nov 4, 2024
1 parent fd52f49 commit 16b9ce3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
12 changes: 12 additions & 0 deletions internal/internal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,18 @@ type updateCallback struct {
reject func(error)
complete func(interface{}, error)
env *TestWorkflowEnvironment
updateID string
}

// env and updateID are needed to cache update results for deduping purposes
func newUpdateCallback(env *TestWorkflowEnvironment, updateID string, accept func(), reject func(error), complete func(interface{}, error)) *updateCallback {
return &updateCallback{
accept: accept,
reject: reject,
complete: complete,
env: env,
updateID: updateID,
}
}

func (uc *updateCallback) Accept() {
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2750,6 +2750,8 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u
uc.Complete(env.updateMap[id].success, env.updateMap[id].err)
}, false)
} else {
// TODO: This doesn't account for multiple async updates
// would a UC -> ID map work? Would I have to use pointers?
env.currentUpdateId = id
env.postCallback(func() {
// Do not send any headers on test invocations
Expand All @@ -2768,6 +2770,7 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id
if err != nil {
panic(err)
}
// TODO: handle dedup
workflowHandle.env.postCallback(func() {
workflowHandle.env.updateHandler(name, id, data, nil, uc)
}, true)
Expand Down
21 changes: 13 additions & 8 deletions internal/workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,17 @@ func TestWorkflowIDUpdateWorkflowByID(t *testing.T) {
var suite WorkflowTestSuite
// Test UpdateWorkflowByID works with custom ID
env := suite.NewTestWorkflowEnvironment()
updateID := "id"
env.RegisterDelayedCallback(func() {
err := env.UpdateWorkflowByID("my-workflow-id", "update", "id", &updateCallback{
reject: func(err error) {
err := env.UpdateWorkflowByID("my-workflow-id", "update", updateID, newUpdateCallback(
env,
updateID,
func() {},
func(err error) {
require.Fail(t, "update should not be rejected")
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
}, "input")
func(interface{}, error) {},
), "input")
require.NoError(t, err)
}, time.Second)

Expand Down Expand Up @@ -500,7 +502,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {

func TestWorkflowDuplicateIDDedup(t *testing.T) {
var suite WorkflowTestSuite
// Test dev server rejects UpdateWorkflow with same ID
// Test dev server dedups UpdateWorkflow with same ID
env := suite.NewTestWorkflowEnvironment()
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow("update", "id", &updateCallback{
Expand All @@ -517,7 +519,8 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) {
require.Equal(t, 0, intResult)
}
},
env: env,
env: env,
updateID: "id",
}, 0)
}, 0)

Expand Down Expand Up @@ -553,6 +556,7 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) {
return Sleep(ctx, time.Hour)
})
require.NoError(t, env.GetWorkflowError())
require.True(t, false)
}

func TestAllHandlersFinished(t *testing.T) {
Expand Down Expand Up @@ -802,6 +806,7 @@ func TestWorkflowUpdateLogger(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand Down

0 comments on commit 16b9ce3

Please sign in to comment.