Skip to content

Commit

Permalink
Refactor: New operator InsertionSelection to adhere to the operator m…
Browse files Browse the repository at this point in the history
…odel (vitessio#14286)
  • Loading branch information
GuptaManan100 authored Oct 16, 2023
1 parent a44d799 commit 8575b17
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 49 deletions.
58 changes: 45 additions & 13 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,53 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op ops.Operator) (
return transformFkCascade(ctx, op)
case *operators.FkVerify:
return transformFkVerify(ctx, op)
case *operators.InsertSelection:
return transformInsertionSelection(ctx, op)
}

return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op))
}

func transformInsertionSelection(ctx *plancontext.PlanningContext, op *operators.InsertSelection) (logicalPlan, error) {
rb, isRoute := op.InsertionOp.(*operators.Route)
if !isRoute {
return nil, vterrors.VT13001(fmt.Sprintf("Incorrect type encountered: %T (transformInsertionSelection)", op.InsertionOp))
}

stmt, dmlOp, err := operators.ToSQL(ctx, rb.Source)
if err != nil {
return nil, err
}

if stmtWithComments, ok := stmt.(sqlparser.Commented); ok && rb.Comments != nil {
stmtWithComments.SetComments(rb.Comments.GetComments())
}

ins := dmlOp.(*operators.Insert)
eins := &engine.Insert{
Opcode: mapToInsertOpCode(rb.Routing.OpCode(), true),
Keyspace: rb.Routing.Keyspace(),
TableName: ins.VTable.Name.String(),
Ignore: ins.Ignore,
ForceNonStreaming: op.ForceNonStreaming,
Generate: autoIncGenerate(ins.AutoIncrement),
ColVindexes: ins.ColVindexes,
VindexValues: ins.VindexValues,
VindexValueOffset: ins.VindexValueOffset,
}
lp := &insert{eInsert: eins}

eins.Prefix, eins.Mid, eins.Suffix = generateInsertShardedQuery(ins.AST)

selectionPlan, err := transformToLogicalPlan(ctx, op.SelectionOp)
if err != nil {
return nil, err
}
lp.source = selectionPlan

return lp, nil
}

// transformFkCascade transforms a FkCascade operator into a logical plan.
func transformFkCascade(ctx *plancontext.PlanningContext, fkc *operators.FkCascade) (logicalPlan, error) {
// We convert the parent operator to a logical plan.
Expand Down Expand Up @@ -460,11 +502,10 @@ func buildRouteLogicalPlan(ctx *plancontext.PlanningContext, op *operators.Route
func buildInsertLogicalPlan(ctx *plancontext.PlanningContext, rb *operators.Route, op ops.Operator, stmt *sqlparser.Insert) (logicalPlan, error) {
ins := op.(*operators.Insert)
eins := &engine.Insert{
Opcode: mapToInsertOpCode(rb.Routing.OpCode(), ins.Input != nil),
Opcode: mapToInsertOpCode(rb.Routing.OpCode(), false),
Keyspace: rb.Routing.Keyspace(),
TableName: ins.VTable.Name.String(),
Ignore: ins.Ignore,
ForceNonStreaming: ins.ForceNonStreaming,
Generate: autoIncGenerate(ins.AutoIncrement),
ColVindexes: ins.ColVindexes,
VindexValues: ins.VindexValues,
Expand All @@ -474,20 +515,11 @@ func buildInsertLogicalPlan(ctx *plancontext.PlanningContext, rb *operators.Rout

// we would need to generate the query on the fly. The only exception here is
// when unsharded query with autoincrement for that there is no input operator.
if eins.Opcode != engine.InsertUnsharded || ins.Input != nil {
if eins.Opcode != engine.InsertUnsharded {
eins.Prefix, eins.Mid, eins.Suffix = generateInsertShardedQuery(ins.AST)
}

if ins.Input == nil {
eins.Query = generateQuery(stmt)
} else {
newSrc, err := transformToLogicalPlan(ctx, ins.Input)
if err != nil {
return nil, err
}
lp.source = newSrc
}

eins.Query = generateQuery(stmt)
return lp, nil
}

Expand Down
49 changes: 13 additions & 36 deletions go/vt/vtgate/planbuilder/operators/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ type Insert struct {
AutoIncrement *Generate
// Ignore specifies whether to ignore duplicate key errors during insertion.
Ignore bool
// ForceNonStreaming when true, select first then insert, this is to avoid locking rows by select for insert.
ForceNonStreaming bool

// ColVindexes are the vindexes that will use the VindexValues or VindexValueOffset
ColVindexes []*vindexes.ColumnVindex
Expand All @@ -53,26 +51,11 @@ type Insert struct {
// that will appear in the result set of the select query.
VindexValueOffset [][]int

// Insert using select query will have select plan as input operator for the insert operation.
Input ops.Operator

noInputs
noColumns
noPredicates
}

func (i *Insert) Inputs() []ops.Operator {
if i.Input == nil {
return nil
}
return []ops.Operator{i.Input}
}

func (i *Insert) SetInputs(inputs []ops.Operator) {
if len(inputs) > 0 {
i.Input = inputs[0]
}
}

// Generate represents an auto-increment generator for the insert operation.
type Generate struct {
// Keyspace represents the keyspace information for the table.
Expand Down Expand Up @@ -102,18 +85,12 @@ func (i *Insert) GetOrdering(*plancontext.PlanningContext) []ops.OrderBy {

var _ ops.Operator = (*Insert)(nil)

func (i *Insert) Clone(inputs []ops.Operator) ops.Operator {
var input ops.Operator
if len(inputs) > 0 {
input = inputs[0]
}
func (i *Insert) Clone([]ops.Operator) ops.Operator {
return &Insert{
Input: input,
VTable: i.VTable,
AST: i.AST,
AutoIncrement: i.AutoIncrement,
Ignore: i.Ignore,
ForceNonStreaming: i.ForceNonStreaming,
ColVindexes: i.ColVindexes,
VindexValues: i.VindexValues,
VindexValueOffset: i.VindexValueOffset,
Expand Down Expand Up @@ -207,15 +184,12 @@ func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.I
return nil, err
}
case sqlparser.SelectStatement:
route.Source, err = insertSelectPlan(ctx, insOp, insStmt, rows)
if err != nil {
return nil, err
}
return insertSelectPlan(ctx, insOp, route, insStmt, rows)
}
return route, nil
}

func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlparser.Insert, sel sqlparser.SelectStatement) (*Insert, error) {
func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, routeOp *Route, ins *sqlparser.Insert, sel sqlparser.SelectStatement) (*InsertSelection, error) {
if columnMismatch(insOp.AutoIncrement, ins, sel) {
return nil, vterrors.VT03006()
}
Expand All @@ -225,23 +199,26 @@ func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlp
return nil, err
}

// select plan will be taken as input to insert rows into the table.
insOp.Input = selOp
// output of the select plan will be used to insert rows into the table.
insertSelect := &InsertSelection{
SelectionOp: selOp,
InsertionOp: routeOp,
}

// When the table you are steaming data from and table you are inserting from are same.
// When the table you are streaming data from and table you are inserting from are same.
// Then due to locking of the index range on the table we might not be able to insert into the table.
// Therefore, instead of streaming, this flag will ensure the records are first read and then inserted.
insertTbl := insOp.TablesUsed()[0]
selTables := TablesUsed(selOp)
for _, tbl := range selTables {
if insertTbl == tbl {
insOp.ForceNonStreaming = true
insertSelect.ForceNonStreaming = true
break
}
}

if len(insOp.ColVindexes) == 0 {
return insOp, nil
return insertSelect, nil
}

colVindexes := insOp.ColVindexes
Expand All @@ -262,7 +239,7 @@ func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlp
}
}
insOp.VindexValueOffset = vv
return insOp, nil
return insertSelect, nil
}

func columnMismatch(gen *Generate, ins *sqlparser.Insert, sel sqlparser.SelectStatement) bool {
Expand Down
61 changes: 61 additions & 0 deletions go/vt/vtgate/planbuilder/operators/insert_selection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operators

import (
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

// InsertSelection operator represents an INSERT into SELECT FROM query.
// It holds the operators for running the selection and insertion.
type InsertSelection struct {
SelectionOp ops.Operator
InsertionOp ops.Operator

// ForceNonStreaming when true, select first then insert, this is to avoid locking rows by select for insert.
ForceNonStreaming bool

noColumns
noPredicates
}

func (is *InsertSelection) Clone(inputs []ops.Operator) ops.Operator {
return &InsertSelection{
SelectionOp: inputs[0],
InsertionOp: inputs[1],
}
}

func (is *InsertSelection) Inputs() []ops.Operator {
return []ops.Operator{is.SelectionOp, is.InsertionOp}
}

func (is *InsertSelection) SetInputs(inputs []ops.Operator) {
is.SelectionOp = inputs[0]
is.InsertionOp = inputs[1]
}

func (is *InsertSelection) ShortDescription() string {
return ""
}

func (is *InsertSelection) GetOrdering(*plancontext.PlanningContext) []ops.OrderBy {
return nil
}

var _ ops.Operator = (*InsertSelection)(nil)

0 comments on commit 8575b17

Please sign in to comment.