From 8575b170183bce389103d451965dc12070f4e36d Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Mon, 16 Oct 2023 15:02:19 +0530 Subject: [PATCH] Refactor: New operator InsertionSelection to adhere to the operator model (#14286) --- .../planbuilder/operator_transformers.go | 58 ++++++++++++++---- go/vt/vtgate/planbuilder/operators/insert.go | 49 ++++----------- .../planbuilder/operators/insert_selection.go | 61 +++++++++++++++++++ 3 files changed, 119 insertions(+), 49 deletions(-) create mode 100644 go/vt/vtgate/planbuilder/operators/insert_selection.go diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index 8104a20bc6c..5cfd07e8062 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -66,11 +66,53 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op ops.Operator) ( return transformFkCascade(ctx, op) case *operators.FkVerify: return transformFkVerify(ctx, op) + case *operators.InsertSelection: + return transformInsertionSelection(ctx, op) } return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op)) } +func transformInsertionSelection(ctx *plancontext.PlanningContext, op *operators.InsertSelection) (logicalPlan, error) { + rb, isRoute := op.InsertionOp.(*operators.Route) + if !isRoute { + return nil, vterrors.VT13001(fmt.Sprintf("Incorrect type encountered: %T (transformInsertionSelection)", op.InsertionOp)) + } + + stmt, dmlOp, err := operators.ToSQL(ctx, rb.Source) + if err != nil { + return nil, err + } + + if stmtWithComments, ok := stmt.(sqlparser.Commented); ok && rb.Comments != nil { + stmtWithComments.SetComments(rb.Comments.GetComments()) + } + + ins := dmlOp.(*operators.Insert) + eins := &engine.Insert{ + Opcode: mapToInsertOpCode(rb.Routing.OpCode(), true), + Keyspace: rb.Routing.Keyspace(), + TableName: 
ins.VTable.Name.String(), + Ignore: ins.Ignore, + ForceNonStreaming: op.ForceNonStreaming, + Generate: autoIncGenerate(ins.AutoIncrement), + ColVindexes: ins.ColVindexes, + VindexValues: ins.VindexValues, + VindexValueOffset: ins.VindexValueOffset, + } + lp := &insert{eInsert: eins} + + eins.Prefix, eins.Mid, eins.Suffix = generateInsertShardedQuery(ins.AST) + + selectionPlan, err := transformToLogicalPlan(ctx, op.SelectionOp) + if err != nil { + return nil, err + } + lp.source = selectionPlan + + return lp, nil +} + // transformFkCascade transforms a FkCascade operator into a logical plan. func transformFkCascade(ctx *plancontext.PlanningContext, fkc *operators.FkCascade) (logicalPlan, error) { // We convert the parent operator to a logical plan. @@ -460,11 +502,10 @@ func buildRouteLogicalPlan(ctx *plancontext.PlanningContext, op *operators.Route func buildInsertLogicalPlan(ctx *plancontext.PlanningContext, rb *operators.Route, op ops.Operator, stmt *sqlparser.Insert) (logicalPlan, error) { ins := op.(*operators.Insert) eins := &engine.Insert{ - Opcode: mapToInsertOpCode(rb.Routing.OpCode(), ins.Input != nil), + Opcode: mapToInsertOpCode(rb.Routing.OpCode(), false), Keyspace: rb.Routing.Keyspace(), TableName: ins.VTable.Name.String(), Ignore: ins.Ignore, - ForceNonStreaming: ins.ForceNonStreaming, Generate: autoIncGenerate(ins.AutoIncrement), ColVindexes: ins.ColVindexes, VindexValues: ins.VindexValues, @@ -474,20 +515,11 @@ func buildInsertLogicalPlan(ctx *plancontext.PlanningContext, rb *operators.Rout // we would need to generate the query on the fly. The only exception here is // when unsharded query with autoincrement for that there is no input operator. 
- if eins.Opcode != engine.InsertUnsharded || ins.Input != nil { + if eins.Opcode != engine.InsertUnsharded { eins.Prefix, eins.Mid, eins.Suffix = generateInsertShardedQuery(ins.AST) } - if ins.Input == nil { - eins.Query = generateQuery(stmt) - } else { - newSrc, err := transformToLogicalPlan(ctx, ins.Input) - if err != nil { - return nil, err - } - lp.source = newSrc - } - + eins.Query = generateQuery(stmt) return lp, nil } diff --git a/go/vt/vtgate/planbuilder/operators/insert.go b/go/vt/vtgate/planbuilder/operators/insert.go index 57c01da5d4b..e732b7c2b09 100644 --- a/go/vt/vtgate/planbuilder/operators/insert.go +++ b/go/vt/vtgate/planbuilder/operators/insert.go @@ -40,8 +40,6 @@ type Insert struct { AutoIncrement *Generate // Ignore specifies whether to ignore duplicate key errors during insertion. Ignore bool - // ForceNonStreaming when true, select first then insert, this is to avoid locking rows by select for insert. - ForceNonStreaming bool // ColVindexes are the vindexes that will use the VindexValues or VindexValueOffset ColVindexes []*vindexes.ColumnVindex @@ -53,26 +51,11 @@ type Insert struct { // that will appear in the result set of the select query. VindexValueOffset [][]int - // Insert using select query will have select plan as input operator for the insert operation. - Input ops.Operator - + noInputs noColumns noPredicates } -func (i *Insert) Inputs() []ops.Operator { - if i.Input == nil { - return nil - } - return []ops.Operator{i.Input} -} - -func (i *Insert) SetInputs(inputs []ops.Operator) { - if len(inputs) > 0 { - i.Input = inputs[0] - } -} - // Generate represents an auto-increment generator for the insert operation. type Generate struct { // Keyspace represents the keyspace information for the table. 
@@ -102,18 +85,12 @@ func (i *Insert) GetOrdering(*plancontext.PlanningContext) []ops.OrderBy { var _ ops.Operator = (*Insert)(nil) -func (i *Insert) Clone(inputs []ops.Operator) ops.Operator { - var input ops.Operator - if len(inputs) > 0 { - input = inputs[0] - } +func (i *Insert) Clone([]ops.Operator) ops.Operator { return &Insert{ - Input: input, VTable: i.VTable, AST: i.AST, AutoIncrement: i.AutoIncrement, Ignore: i.Ignore, - ForceNonStreaming: i.ForceNonStreaming, ColVindexes: i.ColVindexes, VindexValues: i.VindexValues, VindexValueOffset: i.VindexValueOffset, @@ -207,15 +184,12 @@ func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.I return nil, err } case sqlparser.SelectStatement: - route.Source, err = insertSelectPlan(ctx, insOp, insStmt, rows) - if err != nil { - return nil, err - } + return insertSelectPlan(ctx, insOp, route, insStmt, rows) } return route, nil } -func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlparser.Insert, sel sqlparser.SelectStatement) (*Insert, error) { +func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, routeOp *Route, ins *sqlparser.Insert, sel sqlparser.SelectStatement) (*InsertSelection, error) { if columnMismatch(insOp.AutoIncrement, ins, sel) { return nil, vterrors.VT03006() } @@ -225,23 +199,26 @@ func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlp return nil, err } - // select plan will be taken as input to insert rows into the table. - insOp.Input = selOp + // output of the select plan will be used to insert rows into the table. + insertSelect := &InsertSelection{ + SelectionOp: selOp, + InsertionOp: routeOp, + } - // When the table you are steaming data from and table you are inserting from are same. + // When the table you are streaming data from and table you are inserting from are same. // Then due to locking of the index range on the table we might not be able to insert into the table. 
// Therefore, instead of streaming, this flag will ensure the records are first read and then inserted. insertTbl := insOp.TablesUsed()[0] selTables := TablesUsed(selOp) for _, tbl := range selTables { if insertTbl == tbl { - insOp.ForceNonStreaming = true + insertSelect.ForceNonStreaming = true break } } if len(insOp.ColVindexes) == 0 { - return insOp, nil + return insertSelect, nil } colVindexes := insOp.ColVindexes @@ -262,7 +239,7 @@ func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlp } } insOp.VindexValueOffset = vv - return insOp, nil + return insertSelect, nil } func columnMismatch(gen *Generate, ins *sqlparser.Insert, sel sqlparser.SelectStatement) bool { diff --git a/go/vt/vtgate/planbuilder/operators/insert_selection.go b/go/vt/vtgate/planbuilder/operators/insert_selection.go new file mode 100644 index 00000000000..454052e0b61 --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/insert_selection.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operators + +import ( + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" +) + +// InsertSelection operator represents an INSERT into SELECT FROM query. +// It holds the operators for running the selection and insertion. 
+type InsertSelection struct {
+	// SelectionOp is the operator for the SELECT side; its output is used
+	// as the rows to insert.
+	SelectionOp ops.Operator
+	// InsertionOp is the operator for the INSERT side.
+	InsertionOp ops.Operator
+
+	// ForceNonStreaming, when true, makes the plan run the select first and
+	// only then the insert, so that the select does not lock rows the insert
+	// needs to write.
+	ForceNonStreaming bool
+
+	noColumns
+	noPredicates
+}
+
+// Clone returns a copy of this operator wired to the given inputs.
+// inputs must hold the selection operator at index 0 and the insertion
+// operator at index 1, the same order that Inputs returns.
+func (is *InsertSelection) Clone(inputs []ops.Operator) ops.Operator {
+	return &InsertSelection{
+		SelectionOp: inputs[0],
+		InsertionOp: inputs[1],
+		// Carry the flag over explicitly: a field omitted from the literal
+		// would be false, so the clone would lose the non-streaming
+		// requirement.
+		ForceNonStreaming: is.ForceNonStreaming,
+	}
+}
+
+// Inputs returns the selection operator followed by the insertion operator.
+func (is *InsertSelection) Inputs() []ops.Operator {
+	return []ops.Operator{is.SelectionOp, is.InsertionOp}
+}
+
+// SetInputs replaces both child operators. inputs must hold the selection
+// operator at index 0 and the insertion operator at index 1.
+func (is *InsertSelection) SetInputs(inputs []ops.Operator) {
+	is.SelectionOp = inputs[0]
+	is.InsertionOp = inputs[1]
+}
+
+// ShortDescription returns an empty string; this operator adds no extra
+// description of its own.
+func (is *InsertSelection) ShortDescription() string {
+	return ""
+}
+
+// GetOrdering returns nil: this operator reports no ordering.
+func (is *InsertSelection) GetOrdering(*plancontext.PlanningContext) []ops.OrderBy {
+	return nil
+}
+
+// Compile-time check that *InsertSelection implements ops.Operator.
+var _ ops.Operator = (*InsertSelection)(nil)