From 6eb27ecb15b2a56d407196c6bb417f5440290d2d Mon Sep 17 00:00:00 2001 From: Steven Littiebrant Date: Fri, 8 Oct 2021 22:21:21 -0700 Subject: [PATCH] "Let's take a stab in the dark and see if it works to resolve shutdown races" and it seems to work. Well, that was surprisingly simple. I'm not confident about it though. I don't actually know when these things run. .... but maybe it's preferable to the locks for odin? --- internal/context.go | 6 ------ internal/context_test.go | 31 ++++++++++++++++++++++++++++ internal/internal_workflow.go | 39 ++++++++++++++++++++--------------- 3 files changed, 53 insertions(+), 23 deletions(-) diff --git a/internal/context.go b/internal/context.go index 93370cdc2..684d0f444 100644 --- a/internal/context.go +++ b/internal/context.go @@ -22,7 +22,6 @@ package internal import ( "fmt" - "sync" "time" "github.com/opentracing/opentracing-go" @@ -288,7 +287,6 @@ type cancelCtx struct { done Channel // closed by the first cancel call. - mu sync.Mutex canceled bool children map[canceler]bool // set to nil by the first cancel call @@ -310,16 +308,12 @@ func (c *cancelCtx) String() string { // cancel closes c.done, cancels each of c's children, and, if // removeFromParent is true, removes c from its parent's children. 
func (c *cancelCtx) cancel(removeFromParent bool, err error) { - c.mu.Lock() if c.canceled { - c.mu.Unlock() // calling cancel from multiple go routines isn't safe // avoid a data race by only allowing the first call return } c.canceled = true - c.mu.Unlock() - if err == nil { panic("context: internal error: missing cancel error") } diff --git a/internal/context_test.go b/internal/context_test.go index 6f7940dee..b195dbf34 100644 --- a/internal/context_test.go +++ b/internal/context_test.go @@ -57,3 +57,34 @@ func TestContext_RaceRegression(t *testing.T) { env.ExecuteWorkflow(wf) assert.NoError(t, env.GetWorkflowError()) } + +func TestContext_RaceRegression_2(t *testing.T) { + /* + It's apparently also possible to race on adding children while propagating the cancel to children. + */ + s := WorkflowTestSuite{} + s.SetLogger(zaptest.NewLogger(t)) + env := s.NewTestWorkflowEnvironment() + wf := func(ctx Context) error { + ctx, cancel := WithCancel(ctx) + racyCancel := func(ctx Context) { + defer cancel() // defer is necessary as Sleep will never return due to Goexit + defer func() { + _, ccancel := WithCancel(ctx) + cancel() + ccancel() + }() + _ = Sleep(ctx, time.Hour) + } + // start a handful to increase odds of a race being detected + for i := 0; i < 10; i++ { + Go(ctx, racyCancel) + } + + _ = Sleep(ctx, time.Minute) // die early + return nil + } + env.RegisterWorkflow(wf) + env.ExecuteWorkflow(wf) + assert.NoError(t, env.GetWorkflowError()) +} \ No newline at end of file diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 213ddf834..cd4e7978a 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -144,14 +144,15 @@ type ( unblockFunc func(status string, stackDepth int) (keepBlocked bool) coroutineState struct { - name string - dispatcher *dispatcherImpl // dispatcher this context belongs to - aboutToBlock chan bool // used to notify dispatcher that coroutine that owns this context is about to block - unblock 
chan unblockFunc // used to notify coroutine that it should continue executing. - keptBlocked bool // true indicates that coroutine didn't make any progress since the last yield unblocking - closed bool // indicates that owning coroutine has finished execution - blocked atomic.Bool - panicError *workflowPanicError // non nil if coroutine had unhandled panic + name string + dispatcher *dispatcherImpl // dispatcher this context belongs to + aboutToBlock chan bool // used to notify dispatcher that coroutine that owns this context is about to block + unblock chan unblockFunc // used to notify coroutine that it should continue executing. + keptBlocked bool // true indicates that coroutine didn't make any progress since the last yield unblocking + closed bool // indicates that owning coroutine has finished execution + completedShutdown chan struct{} // closed after .closed is set to true, use to wait for shutdown + blocked atomic.Bool + panicError *workflowPanicError // non nil if coroutine had unhandled panic } dispatcherImpl struct { @@ -596,7 +597,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) { hasResult = false v, ok, m := c.receiveAsyncImpl(callback) - if !ok && !m { //channel closed and empty + if !ok && !m { // channel closed and empty return m } @@ -606,7 +607,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) { state.unblocked() return m } - continue //corrupt signal. Drop and reset process + continue // corrupt signal. Drop and reset process } for { if hasResult { @@ -615,7 +616,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) { state.unblocked() return more } - break //Corrupt signal. Drop and reset process. + break // Corrupt signal. Drop and reset process. 
} state.yield(fmt.Sprintf("blocked on %s.Receive", c.name)) } @@ -631,7 +632,7 @@ func (c *channelImpl) ReceiveAsync(valuePtr interface{}) (ok bool) { func (c *channelImpl) ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) { for { v, ok, more := c.receiveAsyncImpl(nil) - if !ok && !more { //channel closed and empty + if !ok && !more { // channel closed and empty return ok, more } @@ -774,7 +775,7 @@ func (c *channelImpl) Close() { // Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize func (c *channelImpl) assignValue(from interface{}, to interface{}) error { err := decodeAndAssignValue(c.dataConverter, from, to) - //add to metrics + // add to metrics if err != nil { c.env.GetLogger().Error(fmt.Sprintf("Corrupt signal received on channel %s. Error deserializing", c.name), zap.Error(err)) c.env.GetMetricsScope().Counter(metrics.CorruptedSignalsCounter).Inc(1) @@ -840,6 +841,7 @@ func (s *coroutineState) call() { func (s *coroutineState) close() { s.closed = true + close(s.completedShutdown) s.aboutToBlock <- true } @@ -849,6 +851,8 @@ func (s *coroutineState) exit() { runtime.Goexit() return true } + // wait for it + <-s.completedShutdown } } @@ -887,10 +891,11 @@ func (d *dispatcherImpl) newNamedCoroutine(ctx Context, name string, f func(ctx func (d *dispatcherImpl) newState(name string) *coroutineState { c := &coroutineState{ - name: name, - dispatcher: d, - aboutToBlock: make(chan bool, 1), - unblock: make(chan unblockFunc), + name: name, + dispatcher: d, + aboutToBlock: make(chan bool, 1), + unblock: make(chan unblockFunc), + completedShutdown: make(chan struct{}), } d.sequence++ d.coroutines = append(d.coroutines, c)