Skip to content

Commit

Permalink
handle error properly, better guard rails for passing env
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew committed Nov 4, 2024
1 parent 5639859 commit d03b73a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
5 changes: 3 additions & 2 deletions internal/internal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,9 +1419,10 @@ func (uc *updateCallback) Reject(err error) {

func (uc *updateCallback) Complete(success interface{}, err error) {
// cache update result so we can dedup duplicate update IDs
if uc.env.impl.updateMap != nil {
uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = success
if uc.env == nil {
panic("env is needed in updateCallback to cache update results for deduping purposes")
}
uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = updateResult{success, err}
uc.complete(success, err)
}

Expand Down
13 changes: 8 additions & 5 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ type (
taskQueues map[string]struct{}
}

updateResult struct {
success interface{}
err error
}

// testWorkflowEnvironmentShared is the shared data between parent workflow and child workflow test environments
testWorkflowEnvironmentShared struct {
locker sync.Mutex
Expand Down Expand Up @@ -208,7 +213,7 @@ type (
signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error
queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error)
updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks)
updateMap map[string]interface{}
updateMap map[string]updateResult
startedHandler func(r WorkflowExecution, e error)

isWorkflowCompleted bool
Expand Down Expand Up @@ -2182,6 +2187,7 @@ func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler(
handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks),
) {
env.updateHandler = handler
env.updateMap = make(map[string]updateResult)
}

func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler(
Expand Down Expand Up @@ -2724,16 +2730,13 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u
if err != nil {
panic(err)
}
if env.updateMap == nil {
env.updateMap = make(map[string]interface{})
}

// check for duplicate update ID
if _, ok := env.updateMap[id]; ok {
// return cached result
env.postCallback(func() {
uc.Accept()
uc.Complete(env.updateMap[id], nil)
uc.Complete(env.updateMap[id].success, env.updateMap[id].err)
}, false)
} else {
env.currentUpdateId = id
Expand Down
12 changes: 12 additions & 0 deletions internal/workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func TestWorkflowIDUpdateWorkflowByID(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
}, "input")
require.NoError(t, err)
}, time.Second)
Expand Down Expand Up @@ -322,6 +323,7 @@ func TestChildWorkflowUpdate(t *testing.T) {
require.Fail(t, "update failed", err)
}
},
env: env,
}, nil)
assert.NoError(t, err)
}, time.Second*5)
Expand Down Expand Up @@ -375,6 +377,7 @@ func TestWorkflowUpdateOrder(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand Down Expand Up @@ -415,6 +418,7 @@ func TestWorkflowNotRegisteredRejected(t *testing.T) {
require.Fail(t, "update should not be accepted")
},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -439,6 +443,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -452,6 +457,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {
require.Fail(t, "update should not be rejected")
},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -462,6 +468,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand Down Expand Up @@ -559,6 +566,7 @@ func TestAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -569,6 +577,7 @@ func TestAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, time.Minute)

Expand Down Expand Up @@ -617,6 +626,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 0)

Expand All @@ -627,6 +637,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, time.Minute)

Expand All @@ -637,6 +648,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
},
accept: func() {},
complete: func(interface{}, error) {},
env: env,
})
}, 2*time.Minute)

Expand Down

0 comments on commit d03b73a

Please sign in to comment.