Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedup duplicate update IDs for test environment #1695

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/internal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,7 @@ type updateCallback struct {
accept func()
reject func(error)
complete func(interface{}, error)
env *TestWorkflowEnvironment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm so this class is not for users, this whole file is for testing since it is post-fixed with *_test.go. The logic can't be in the updateCallback as we take an impl. from user. The logic needs to be in (env *testWorkflowEnvironmentImpl) updateWorkflow. We could potentially add a wrapper around the user interface. Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is user in this context referring to the user of the test suite?

Copy link
Contributor Author

@yuandrew yuandrew Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this whole file is for testing since it is post-fixed with *_test.go. The logic can't be in the updateCallback as we take an impl. from user

And would it be accurate to rephrase this as "we need to implement this within the test suite, not the individual test logic. updateCallback is a test specific implementation"?

Copy link
Contributor Author

@yuandrew yuandrew Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially add a wrapper around the user interface.

What interface are you referring to here? updateWorkflow?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is user in this context referring to the user of the test suite?

Yeah

And would it be accurate to rephrase this as "we need to implement this within the test suite, not the individual test logic. updateCallback is a test specific implementation"?

yeah, exactly

What interface are you referring to here?

UpdateCallbacks

}

func (uc *updateCallback) Accept() {
Expand All @@ -1417,6 +1418,11 @@ func (uc *updateCallback) Reject(err error) {
}

func (uc *updateCallback) Complete(success interface{}, err error) {
// cache update result so we can dedup duplicate update IDs
if uc.env == nil {
panic("env is needed in updateCallback to cache update results for deduping purposes")
}
uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = updateResult{success, err}
uc.complete(success, err)
}

Expand Down
29 changes: 25 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ type (
taskQueues map[string]struct{}
}

updateResult struct {
success interface{}
err error
}

// testWorkflowEnvironmentShared is the shared data between parent workflow and child workflow test environments
testWorkflowEnvironmentShared struct {
locker sync.Mutex
Expand Down Expand Up @@ -208,6 +213,7 @@ type (
signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error
queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error)
updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks)
updateMap map[string]updateResult
startedHandler func(r WorkflowExecution, e error)

isWorkflowCompleted bool
Expand All @@ -229,6 +235,8 @@ type (

workflowFunctionExecuting bool
bufferedUpdateRequests map[string][]func()

currentUpdateId string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there are multiple concurrent updates sent to the workflow? It looks like those would be lost.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, good point

}

testSessionEnvironmentImpl struct {
Expand Down Expand Up @@ -2179,6 +2187,7 @@ func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler(
handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks),
) {
env.updateHandler = handler
env.updateMap = make(map[string]updateResult)
}

func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler(
Expand Down Expand Up @@ -2721,10 +2730,22 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u
if err != nil {
panic(err)
}
env.postCallback(func() {
// Do not send any headers on test invocations
env.updateHandler(name, id, data, nil, uc)
}, true)

// check for duplicate update ID
if _, ok := env.updateMap[id]; ok {
// return cached result
env.postCallback(func() {
uc.Accept()
uc.Complete(env.updateMap[id].success, env.updateMap[id].err)
}, false)
} else {
env.currentUpdateId = id
env.postCallback(func() {
// Do not send any headers on test invocations
env.updateHandler(name, id, data, nil, uc)
}, true)
}

}

func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id string, uc UpdateCallbacks, args ...interface{}) error {
Expand Down
69 changes: 69 additions & 0 deletions internal/workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func TestWorkflowIDUpdateWorkflowByID(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
}, "input")
require.NoError(t, err)
}, time.Second)
Expand Down Expand Up @@ -322,6 +323,7 @@ func TestChildWorkflowUpdate(t *testing.T) {
require.Fail(t, "update failed", err)
}
},
env: env,
}, nil)
assert.NoError(t, err)
}, time.Second*5)
Expand Down Expand Up @@ -375,6 +377,7 @@ func TestWorkflowUpdateOrder(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand Down Expand Up @@ -415,6 +418,7 @@ func TestWorkflowNotRegisteredRejected(t *testing.T) {
require.Fail(t, "update should not be accepted")
},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -439,6 +443,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -452,6 +457,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {
require.Fail(t, "update should not be rejected")
},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -462,6 +468,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand Down Expand Up @@ -491,6 +498,63 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {
require.Equal(t, "unknown update bad update. KnownUpdates=[update]", updateRejectionErr.Error())
}

func TestWorkflowDuplicateIDDedup(t *testing.T) {
var suite WorkflowTestSuite
// Test dev server rejects UpdateWorkflow with same ID
env := suite.NewTestWorkflowEnvironment()
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow("update", "id", &updateCallback{
reject: func(err error) {
require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err))
},
accept: func() {
},
complete: func(result interface{}, err error) {
intResult, ok := result.(int)
if !ok {
require.Fail(t, "result should be int")
} else {
require.Equal(t, 0, intResult)
}
},
env: env,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current solution requires env to be passed to updateCallback, not ideal, but struggling to come up with an alternate solution

}, 0)
}, 0)

env.RegisterDelayedCallback(func() {
env.UpdateWorkflow("update", "id", &updateCallback{
reject: func(err error) {
require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err))
},
accept: func() {
},
complete: func(result interface{}, err error) {
intResult, ok := result.(int)
if !ok {
require.Fail(t, "result should be int")
} else {
// if dedup, this be okay, even if we pass in 1 as arg, since it's deduping,
// the result should match the first update's result, 0
require.Equal(t, 0, intResult)
}
},
env: env,
}, 1)

}, 1*time.Millisecond)

env.ExecuteWorkflow(func(ctx Context) error {
err := SetUpdateHandler(ctx, "update", func(ctx Context, i int) (int, error) {
return i, nil
}, UpdateHandlerOptions{})
if err != nil {
return err
}
return Sleep(ctx, time.Hour)
})
require.NoError(t, env.GetWorkflowError())
}

func TestAllHandlersFinished(t *testing.T) {
var suite WorkflowTestSuite
env := suite.NewTestWorkflowEnvironment()
Expand All @@ -502,6 +566,7 @@ func TestAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -512,6 +577,7 @@ func TestAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, time.Minute)

Expand Down Expand Up @@ -560,6 +626,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -570,6 +637,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, time.Minute)

Expand All @@ -580,6 +648,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 2*time.Minute)

Expand Down
Loading