-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix concurrency on stream execute engine primitives #14586
fix concurrency on stream execute engine primitives #14586
Conversation
… apis Signed-off-by: Harshit Gangal <[email protected]>
Review ChecklistHello reviewers! 👋 Please follow this checklist when reviewing this Pull Request. General
Tests
Documentation
New flags
If a workflow is added or modified:
Backward compatibility
|
@@ -88,6 +88,7 @@ func (p *Projection) TryStreamExecute(ctx context.Context, vcursor VCursor, bind | |||
env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) | |||
var once sync.Once | |||
var fields []*querypb.Field | |||
var mu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@harshit-gangal Is this one necessary? There's already locking in the implementation here which should afaik handle any concurrency issues.
@vmg Afaik the evalengine.NewExpressionEnv
should be concurrent usage safe, or is it not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it definitely isn't, it contains the VM that is used to execute the expressions. You cannot evaluate two expressions at once in the same VM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, so we gotta lock here then as well for now unless we'd want to move creating the env inside the callback then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The next plan is to create env per shard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That refactor would not be part of the backport and will be made as a separate PR.
go/vt/vtgate/engine/limit.go
Outdated
@@ -108,6 +110,8 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars | |||
return nil | |||
} | |||
|
|||
mu.Lock() | |||
defer mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like need this for the count, but wonder, should we do this with an atomic counter instead? The usage might be complicated though, so the lock might be a lot easier for now and then to optimize later.
Signed-off-by: Harshit Gangal <[email protected]>
…rojectio and distinct streaming async test Signed-off-by: Harshit Gangal <[email protected]>
go/vt/vtgate/engine/projection.go
Outdated
return vcursor.StreamExecutePrimitive(ctx, p.Input, bindVars, wantfields, func(qr *sqltypes.Result) error { | ||
var err error | ||
if wantfields { | ||
if wantfields && qr.Fields != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@harshit-gangal This change breaks a bunch of tests it looks like. Why was this changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this change was made due to the testing in projection_test.go
, but I suspect it's a problem with that test? Since it runs concurrent stream results, but even if that happens, it's guaranteed that there would always be a field result first, so I don't think the test models a real scenario then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also think that now that we lock here to also protect the evalengine expression env, we don't need the more elaborate locking around wantfields
anymore.
This ensures that we project fields properly and removes racy behavior from the limit logic as well by moving up the lock. Signed-off-by: Dirkjan Bussink <[email protected]>
Signed-off-by: Dirkjan Bussink <[email protected]>
Signed-off-by: Dirkjan Bussink <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Co-authored-by: Dirkjan Bussink <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Co-authored-by: Dirkjan Bussink <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Co-authored-by: Dirkjan Bussink <[email protected]>
…14586) (#14591) Signed-off-by: Harshit Gangal <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Dirkjan Bussink <[email protected]> Co-authored-by: Harshit Gangal <[email protected]>
…14586) (#14592) Signed-off-by: Harshit Gangal <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Dirkjan Bussink <[email protected]> Co-authored-by: Harshit Gangal <[email protected]>
…14586) (#14590) Signed-off-by: Harshit Gangal <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Co-authored-by: Harshit Gangal <[email protected]> Co-authored-by: Dirkjan Bussink <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Co-authored-by: Dirkjan Bussink <[email protected]>
Description
This PR adds the required lock on the engine primitive with Streaming support where a local state is maintained and needs to be synchronized between parallel calls from underlying callbacks.
The primitive were either returning the wrong result or were causing VTGate panics.
Related Issue(s)
Checklist