Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support cross shard DELETE with LIMIT/ORDER BY #14959

Merged
merged 8 commits into from
Jan 24, 2024
62 changes: 62 additions & 0 deletions go/test/endtoend/vtgate/queries/dml/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/utils"
)
Expand Down Expand Up @@ -78,3 +79,64 @@ func TestMultiTableDelete(t *testing.T) {
mcmp.AssertMatches(`select oid, ename from oevent_tbl order by oid`,
`[[INT64(1) VARCHAR("a")] [INT64(2) VARCHAR("b")] [INT64(3) VARCHAR("a")] [INT64(4) VARCHAR("c")]]`)
}

// TestDeleteWithLimit executed delete queries with limit
func TestDeleteWithLimit(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 19, "vtgate")

harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
mcmp, closer := start(t)
defer closer()

// initial rows
mcmp.Exec("insert into s_tbl(id, num) values (1,10), (2,10), (3,10), (4,20), (5,5), (6,15), (7,17), (8,80)")
mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)")

// check rows
mcmp.AssertMatches(`select id, num from s_tbl order by id`,
`[[INT64(1) INT64(10)] [INT64(2) INT64(10)] [INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(5) INT64(5)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`)
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)

// delete with limit
qr := mcmp.Exec(`delete from s_tbl order by num, id limit 3`)
require.EqualValues(t, 3, qr.RowsAffected)

qr = mcmp.Exec(`delete from order_tbl where region_id = 1 limit 1`)
require.EqualValues(t, 1, qr.RowsAffected)

// check rows
mcmp.AssertMatches(`select id, num from s_tbl order by id`,
`[[INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`)
// 2 rows matches but limit is 1, so any one of the row can remain in table.
mcmp.AssertMatchesAnyNoCompare(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)

// delete with limit
qr = mcmp.Exec(`delete from s_tbl where num < 20 limit 2`)
require.EqualValues(t, 2, qr.RowsAffected)

qr = mcmp.Exec(`delete from order_tbl limit 5`)
require.EqualValues(t, 3, qr.RowsAffected)

// check rows
// 3 rows matches `num < 20` but limit is 2 so any one of them can remain in the table.
mcmp.AssertMatchesAnyNoCompare(`select id, num from s_tbl order by id`,
`[[INT64(4) INT64(20)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`,
`[[INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(8) INT64(80)]]`,
`[[INT64(4) INT64(20)] [INT64(6) INT64(15)] [INT64(8) INT64(80)]]`)
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[]`)

// remove all rows
mcmp.Exec(`delete from s_tbl`)
mcmp.Exec(`delete from order_tbl limit 5`)

// try with limit again on empty table.
qr = mcmp.Exec(`delete from s_tbl where num < 20 limit 2`)
require.EqualValues(t, 0, qr.RowsAffected)

qr = mcmp.Exec(`delete from order_tbl limit 5`)
require.EqualValues(t, 0, qr.RowsAffected)

}
8 changes: 6 additions & 2 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 0 additions & 94 deletions go/vt/vtgate/engine/delete_multi.go

This file was deleted.

70 changes: 0 additions & 70 deletions go/vt/vtgate/engine/delete_multi_test.go

This file was deleted.

121 changes: 121 additions & 0 deletions go/vt/vtgate/engine/delete_with_input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright 2023 The Vitess Authors.
frouioui marked this conversation as resolved.
Show resolved Hide resolved

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
)

var _ Primitive = (*DeleteWithInput)(nil)

const DmVals = "dm_vals"

// DeleteWithInput represents the instructions to perform a delete operation based on the input result.
type DeleteWithInput struct {
Delete Primitive
Input Primitive

OutputCols []int

txNeeded
}

func (del *DeleteWithInput) RouteType() string {
return "DeleteWithInput"

Check warning on line 44 in go/vt/vtgate/engine/delete_with_input.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/engine/delete_with_input.go#L43-L44

Added lines #L43 - L44 were not covered by tests
}

func (del *DeleteWithInput) GetKeyspaceName() string {
return del.Input.GetKeyspaceName()

Check warning on line 48 in go/vt/vtgate/engine/delete_with_input.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/engine/delete_with_input.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}

func (del *DeleteWithInput) GetTableName() string {
return del.Input.GetTableName()

Check warning on line 52 in go/vt/vtgate/engine/delete_with_input.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/engine/delete_with_input.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

func (del *DeleteWithInput) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{del.Input, del.Delete}, nil

Check warning on line 56 in go/vt/vtgate/engine/delete_with_input.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/engine/delete_with_input.go#L55-L56

Added lines #L55 - L56 were not covered by tests
}

// TryExecute performs a non-streaming exec.
func (del *DeleteWithInput) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
inputRes, err := vcursor.ExecutePrimitive(ctx, del.Input, bindVars, false)
if err != nil {
return nil, err
}

Check warning on line 64 in go/vt/vtgate/engine/delete_with_input.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/engine/delete_with_input.go#L63-L64

Added lines #L63 - L64 were not covered by tests

var bv *querypb.BindVariable
if len(del.OutputCols) == 1 {
bv = getBVSingle(inputRes, del.OutputCols[0])
} else {
bv = getBVMulti(inputRes, del.OutputCols)
}

bindVars[DmVals] = bv
return vcursor.ExecutePrimitive(ctx, del.Delete, bindVars, false)
}

func getBVSingle(res *sqltypes.Result, offset int) *querypb.BindVariable {
bv := &querypb.BindVariable{Type: querypb.Type_TUPLE}
for _, row := range res.Rows {
bv.Values = append(bv.Values, sqltypes.ValueToProto(row[offset]))
}
return bv
}

func getBVMulti(res *sqltypes.Result, offsets []int) *querypb.BindVariable {
bv := &querypb.BindVariable{Type: querypb.Type_TUPLE}
outputVals := make([]sqltypes.Value, 0, len(offsets))
for _, row := range res.Rows {
for _, offset := range offsets {
outputVals = append(outputVals, row[offset])
}
bv.Values = append(bv.Values, sqltypes.TupleToProto(outputVals))
outputVals = outputVals[:0]
}
return bv
}

// TryStreamExecute performs a streaming exec.
func (del *DeleteWithInput) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
res, err := del.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return err
}

Check warning on line 103 in go/vt/vtgate/engine/delete_with_input.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/engine/delete_with_input.go#L102-L103

Added lines #L102 - L103 were not covered by tests
return callback(res)
}

// GetFields fetches the field info.
func (del *DeleteWithInput) GetFields(context.Context, VCursor, map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.VT13001("unreachable code for MULTI DELETE")

Check warning on line 109 in go/vt/vtgate/engine/delete_with_input.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/engine/delete_with_input.go#L108-L109

Added lines #L108 - L109 were not covered by tests
}

func (del *DeleteWithInput) description() PrimitiveDescription {
other := map[string]any{
"Offset": del.OutputCols,
}
return PrimitiveDescription{
OperatorType: "DeleteWithInput",
TargetTabletType: topodatapb.TabletType_PRIMARY,
Other: other,
}

Check warning on line 120 in go/vt/vtgate/engine/delete_with_input.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/engine/delete_with_input.go#L112-L120

Added lines #L112 - L120 were not covered by tests
}
Loading
Loading