Generalize non-streaming result stats aggregation #16480

Closed
wants to merge 21 commits
21 commits
c7f792a  Add Result#MergeStats, #Stats (rafer, Jul 23, 2024)
fbab8a3  Handle result stats in non-streaming Concatenate (rafer, Jul 23, 2024)
e525e7c  Handle result stats in non-streaming Distinct (rafer, Jul 24, 2024)
0684937  Use MergeStats in DmlWithInput (rafer, Jul 24, 2024)
251d690  Handle result stats in non-streaming FkCascade (rafer, Jul 24, 2024)
c020851  Handle result stats in non-streaming FkVerify (rafer, Jul 24, 2024)
2005635  Handle result stats in non-streaming HashJoin (rafer, Jul 24, 2024)
8d7145e  Handle result stats in non-streaming InsertSelect (rafer, Jul 24, 2024)
3746357  Handle result stats in non-streaming Join (rafer, Jul 24, 2024)
1577883  Handle result stats in non-streaming Lock (rafer, Jul 24, 2024)
2ad27e8  Call MergeStats in Truncate and AppendResult (rafer, Jul 25, 2024)
8b3aa96  Handle result stats in non-streaming ScalarAggregate (rafer, Jul 25, 2024)
43ffe74  Handle result stats in non-streaming SemiJoin (rafer, Jul 25, 2024)
d83a352  Use MergeStats in Sequential (rafer, Jul 25, 2024)
3ae5dbc  Handle result stats in non-streaming SimpleProjection (rafer, Jul 25, 2024)
09a4773  Merge result stats from both queries in UncorrelatedSubquery (rafer, Jul 25, 2024)
0c401a5  Use MergeStats in non-streaming Upsert (rafer, Jul 25, 2024)
f2e5c97  Don't change ResultAppend early exit behavior (rafer, Jul 25, 2024)
8e9bccb  Extract Result#StatsEmpty (rafer, Jul 25, 2024)
5ef68ca  Check results instead of err when both are present (rafer, Jul 25, 2024)
fcda667  Revert "Handle result stats in non-streaming FkCascade" (rafer, Jul 25, 2024)
22 changes: 19 additions & 3 deletions go/sqltypes/result.go
@@ -153,7 +153,6 @@ func (result *Result) Truncate(l int) *Result {

out := &Result{
InsertID: result.InsertID,
RowsAffected: result.RowsAffected,
Info: result.Info,
SessionStateChanges: result.SessionStateChanges,
}
@@ -166,6 +165,7 @@ func (result *Result) Truncate(l int) *Result {
out.Rows = append(out.Rows, r[:l])
}
}
out.MergeStats(result)
return out
}

@@ -324,17 +324,33 @@ func (result *Result) StripMetadata(incl querypb.ExecuteOptions_IncludedFields)
// to another result.Note currently it doesn't handle cases like
// if two results have different fields.We will enhance this function.
func (result *Result) AppendResult(src *Result) {
if src.RowsAffected == 0 && len(src.Rows) == 0 && len(src.Fields) == 0 {
if src.StatsEmpty() && len(src.Rows) == 0 && len(src.Fields) == 0 {
return
}
if result.Fields == nil {
result.Fields = src.Fields
}
result.RowsAffected += src.RowsAffected
if src.InsertID != 0 {
result.InsertID = src.InsertID
}
result.Rows = append(result.Rows, src.Rows...)
result.MergeStats(src)
}

// Stats returns a copy of result with only the stats fields
func (result *Result) Stats() *Result {
return &Result{
RowsAffected: result.RowsAffected,
}
}

func (result *Result) StatsEmpty() bool {
return result.RowsAffected == 0
}

// MergeStats updates the receiver's stats by merging in the stats from src.
func (result *Result) MergeStats(src *Result) {
result.RowsAffected += src.RowsAffected
}

// Named returns a NamedResult based on this struct
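
For orientation only (not part of the diff): a minimal sketch of how the new stats helpers compose, using made-up values.

package main

import (
	"fmt"

	"vitess.io/vitess/go/sqltypes"
)

func main() {
	// Hypothetical per-shard results; only the stats matter here.
	total := &sqltypes.Result{}
	for _, qr := range []*sqltypes.Result{{RowsAffected: 2}, {RowsAffected: 3}} {
		total.MergeStats(qr) // accumulate RowsAffected into total
	}
	fmt.Println(total.RowsAffected) // 5
	fmt.Println(total.StatsEmpty()) // false
	fmt.Println(total.Stats())      // stats-only copy of total
}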
26 changes: 26 additions & 0 deletions go/sqltypes/result_test.go
@@ -348,6 +348,32 @@ func TestAppendResult(t *testing.T) {
}
}

func TestStats(t *testing.T) {
result := &Result{
RowsAffected: 1,
Fields: []*querypb.Field{{
Type: Int64,
}, {
Type: VarChar,
}},
InsertID: 1,
Rows: [][]Value{
{TestValue(Int64, "1"), MakeTrusted(VarChar, nil)},
{TestValue(Int64, "2"), MakeTrusted(VarChar, nil)},
},
}
want := &Result{
RowsAffected: 1,
}
assert.Equal(t, want, result.Stats())
}

func TestMergeStats(t *testing.T) {
result := &Result{RowsAffected: 1}
result.MergeStats(&Result{RowsAffected: 2})
assert.Equal(t, uint64(3), result.RowsAffected)
}

func TestReplaceKeyspace(t *testing.T) {
result := &Result{
Fields: []*querypb.Field{{
12 changes: 6 additions & 6 deletions go/vt/vtgate/engine/concatenate.go
@@ -101,19 +101,19 @@ func (c *Concatenate) TryExecute(ctx context.Context, vcursor VCursor, bindVars
return nil, err
}

var rows [][]sqltypes.Value
out := &sqltypes.Result{
Fields: fields,
}
err = c.coerceAndVisitResults(res, fieldTypes, func(result *sqltypes.Result) error {
rows = append(rows, result.Rows...)
out.Rows = append(out.Rows, result.Rows...)
out.MergeStats(result)
return nil
}, evalengine.ParseSQLMode(vcursor.SQLMode()))
if err != nil {
return nil, err
}

return &sqltypes.Result{
Fields: fields,
Rows: rows,
}, nil
return out, nil
}

func (c *Concatenate) coerceValuesTo(row sqltypes.Row, fieldTypes []evalengine.Type, sqlmode evalengine.SQLMode) error {
16 changes: 16 additions & 0 deletions go/vt/vtgate/engine/concatenate_test.go
@@ -46,6 +46,12 @@ func r(names, types string, rows ...string) *sqltypes.Result {
return sqltypes.MakeTestResult(fields, rows...)
}

func rWithStats(rowsAffected uint64, names, types string, rows ...string) *sqltypes.Result {
result := r(names, types, rows...)
result.RowsAffected = rowsAffected
return result
}

func TestConcatenate_NoErrors(t *testing.T) {
type testCase struct {
testName string
@@ -108,6 +114,13 @@ func TestConcatenate_NoErrors(t *testing.T) {
r("id|col1|col2", "int64|varchar|varbinary", "1|a1|b1", "2|a2|b2"),
},
expectedResult: r("myid|mycol1|mycol2", "int64|varchar|varbinary", "1|a1|b1", "2|a2|b2"),
}, {
testName: "merged stats",
inputs: []*sqltypes.Result{
rWithStats(1, "id|col1|col2", "int64|varbinary|varbinary", "1|a1|b1"),
rWithStats(2, "id|col1|col2", "int64|varbinary|varbinary", "2|a2|b2"),
},
expectedResult: rWithStats(3, "id|col1|col2", "int64|varbinary|varbinary", "1|a1|b1", "2|a2|b2"),
}}

for _, tc := range testCases {
@@ -130,6 +143,9 @@ func TestConcatenate_NoErrors(t *testing.T) {
require.NoError(t, err)
utils.MustMatch(t, tc.expectedResult.Fields, qr.Fields, "fields")
utils.MustMatch(t, tc.expectedResult.Rows, qr.Rows)

// Only testing stats match in non-streaming mode
utils.MustMatch(t, tc.expectedResult.Stats(), qr.Stats(), "stats")
} else {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedError)
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/distinct.go
@@ -111,6 +111,7 @@ func (d *Distinct) TryExecute(ctx context.Context, vcursor VCursor, bindVars map
Fields: input.Fields,
InsertID: input.InsertID,
}
result.MergeStats(input)

pt := newProbeTable(d.CheckCols, vcursor.Environment().CollationEnv())

8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/distinct_test.go
@@ -75,6 +75,11 @@ func TestDistinct(t *testing.T) {
collations: []collations.ID{collations.CollationUtf8mb4ID, collations.Unknown},
inputs: r("myid|id", "varchar|int64", "monkey|1", "horse|1", "Horse|1", "Monkey|1", "horses|1", "MONKEY|2"),
expectedResult: r("myid|id", "varchar|int64", "monkey|1", "horse|1", "horses|1", "MONKEY|2"),
}, {
testName: "merged stats",
collations: []collations.ID{collations.CollationUtf8mb4ID, collations.Unknown},
inputs: rWithStats(10, "myid", "int64", "0", "1", "1", "null", "null"),
expectedResult: rWithStats(10, "myid", "int64", "0", "1", "null"),
}}

for _, tc := range testCases {
@@ -107,6 +112,9 @@ func TestDistinct(t *testing.T) {
got := fmt.Sprintf("%v", qr.Rows)
expected := fmt.Sprintf("%v", tc.expectedResult.Rows)
utils.MustMatch(t, expected, got, "result not what correct")

// Only testing stats match in non-streaming mode
utils.MustMatch(t, tc.expectedResult.Stats(), qr.Stats(), "result stats did not match")
} else {
require.EqualError(t, err, tc.expectedError)
}
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/dml_with_input.go
@@ -83,7 +83,7 @@ func (dml *DMLWithInput) TryExecute(ctx context.Context, vcursor VCursor, bindVa
if res == nil {
res = qr
} else {
res.RowsAffected += qr.RowsAffected
res.MergeStats(qr)
}
}
return res, nil
@@ -146,7 +146,7 @@ func executeNonLiteralUpdate(ctx context.Context, vcursor VCursor, bindVars map[
if res == nil {
res = qr
} else {
res.RowsAffected += qr.RowsAffected
res.MergeStats(qr)
}
}
return res, nil
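
Illustrative sketch only (not in the PR): the accumulation pattern above keeps the first result whole and folds later results in with MergeStats, so only the stats aggregate; rows and fields of later results are not appended.

package main

import (
	"fmt"

	"vitess.io/vitess/go/sqltypes"
)

// accumulate mirrors the DMLWithInput loop: the first result wins for
// rows and fields, subsequent results only contribute their stats.
func accumulate(results []*sqltypes.Result) *sqltypes.Result {
	var res *sqltypes.Result
	for _, qr := range results {
		if res == nil {
			res = qr
			continue
		}
		res.MergeStats(qr)
	}
	return res
}

func main() {
	out := accumulate([]*sqltypes.Result{{RowsAffected: 1}, {RowsAffected: 4}})
	fmt.Println(out.RowsAffected) // 5
}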
9 changes: 8 additions & 1 deletion go/vt/vtgate/engine/fk_verify.go
@@ -69,6 +69,7 @@ func (f *FkVerify) GetFields(ctx context.Context, vcursor VCursor, bindVars map[

// TryExecute implements the Primitive interface
func (f *FkVerify) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
stats := &sqltypes.Result{}
for _, v := range f.Verify {
qr, err := vcursor.ExecutePrimitive(ctx, v.Exec, bindVars, wantfields)
if err != nil {
@@ -77,8 +78,14 @@ func (f *FkVerify) TryExecute(ctx context.Context, vcursor VCursor, bindVars map
if len(qr.Rows) > 0 {
return nil, getError(v.Typ)
}
stats.MergeStats(qr)
}
return vcursor.ExecutePrimitive(ctx, f.Exec, bindVars, wantfields)

result, err := vcursor.ExecutePrimitive(ctx, f.Exec, bindVars, wantfields)
if result != nil {
result.MergeStats(stats)
}
return result, err
}

// TryStreamExecute implements the Primitive interface
11 changes: 9 additions & 2 deletions go/vt/vtgate/engine/fk_verify_test.go
@@ -20,6 +20,8 @@ import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
@@ -58,16 +60,21 @@ func TestFKVerifyUpdate(t *testing.T) {

t.Run("foreign key verification success", func(t *testing.T) {
fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"))
fakeRes.RowsAffected = 1
vc := newDMLTestVCursor("0")
vc.results = []*sqltypes.Result{fakeRes}
_, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true)
vc.results = []*sqltypes.Result{
fakeRes,
{RowsAffected: 2},
}
result, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ks.0: select 1 from child c left join parent p on p.cola = 1 and p.colb = 'a' where p.cola is null and p.colb is null {} false false`,
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ks.0: update child set cola = 1, colb = 'a' where foo = 48 {} true true`,
})
assert.Equal(t, uint64(3), result.RowsAffected)

vc.Rewind()
err = fkc.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true, func(result *sqltypes.Result) error { return nil })
2 changes: 2 additions & 0 deletions go/vt/vtgate/engine/hash_join.go
@@ -115,6 +115,8 @@ func (hj *HashJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars ma
result := &sqltypes.Result{
Fields: joinFields(lresult.Fields, rresult.Fields, hj.Cols),
}
result.MergeStats(lresult)
result.MergeStats(rresult)

for _, currentRHSRow := range rresult.Rows {
matches, err := pt.get(currentRHSRow)
13 changes: 11 additions & 2 deletions go/vt/vtgate/engine/hash_join_test.go
@@ -32,7 +32,7 @@ func TestHashJoinVariations(t *testing.T) {
// This test tries the different variations of hash-joins:
// comparing values of same type and different types, and both left and right outer joins
lhs := func() Primitive {
return &fakePrimitive{
p := &fakePrimitive{
results: []*sqltypes.Result{
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
@@ -46,9 +46,12 @@
),
},
}
p.results[0].RowsAffected = 1
return p
}

rhs := func() Primitive {
return &fakePrimitive{
p := &fakePrimitive{
results: []*sqltypes.Result{
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
@@ -62,6 +65,8 @@
),
},
}
p.results[0].RowsAffected = 2
return p
}

rows := func(r ...string) []string { return r }
@@ -131,6 +136,7 @@
}

expected := sqltypes.MakeTestResult(fields, tc.expected...)
expected.RowsAffected = 3

typ, err := evalengine.CoerceTypes(typeForOffset(tc.lhs), typeForOffset(tc.rhs), collations.MySQL8())
require.NoError(t, err)
@@ -157,6 +163,9 @@
jn.Right = last()
r, err := wrapStreamExecute(jn, &noopVCursor{}, map[string]*querypb.BindVariable{}, true)
require.NoError(t, err)

// Result stats handling not implemented for streaming
expected.RowsAffected = 0
expectResultAnyOrder(t, r, expected)
})
}
20 changes: 15 additions & 5 deletions go/vt/vtgate/engine/insert_select.go
@@ -306,15 +306,20 @@ func (ins *InsertSelect) buildVindexRowsValues(rows []sqltypes.Row) ([][]sqltype
}

func (ins *InsertSelect) execInsertSharded(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
result, err := ins.execSelect(ctx, vcursor, bindVars)
selectResult, err := ins.execSelect(ctx, vcursor, bindVars)
if err != nil {
return nil, err
}
if len(result.rows) == 0 {
return &sqltypes.Result{}, nil
if len(selectResult.rows) == 0 {
return selectResult.stats, nil
}

result, err := ins.insertIntoShardedTable(ctx, vcursor, bindVars, selectResult)
if result != nil {
result.MergeStats(selectResult.stats)
}

return ins.insertIntoShardedTable(ctx, vcursor, bindVars, result)
return result, err
}

func (ins *InsertSelect) description() PrimitiveDescription {
@@ -369,6 +374,7 @@ func insertVarOffset(rowNum, colOffset int) string {
type insertRowsResult struct {
rows []sqltypes.Row
insertID uint64
stats *sqltypes.Result
}

func (ins *InsertSelect) execSelect(
@@ -377,9 +383,12 @@ func (ins *InsertSelect) execSelect(
bindVars map[string]*querypb.BindVariable,
) (insertRowsResult, error) {
res, err := vcursor.ExecutePrimitive(ctx, ins.Input, bindVars, false)
if err != nil || len(res.Rows) == 0 {
if err != nil {
return insertRowsResult{}, err
}
if len(res.Rows) == 0 {
return insertRowsResult{stats: res.Stats()}, nil
}

insertID, err := ins.processGenerateFromSelect(ctx, vcursor, ins, res.Rows)
if err != nil {
@@ -389,6 +398,7 @@
return insertRowsResult{
rows: res.Rows,
insertID: uint64(insertID),
stats: res.Stats(),
}, nil
}
