Skip to content

Commit

Permalink
VTAdmin: Support for Materialize Create
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Oct 11, 2024
1 parent f0062e6 commit a3941a4
Show file tree
Hide file tree
Showing 15 changed files with 2,473 additions and 1,199 deletions.
46 changes: 46 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"time"
Expand All @@ -27,6 +28,7 @@ import (

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/vtctldclient"
Expand Down Expand Up @@ -143,6 +145,50 @@ func ParseTabletTypes(cmd *cobra.Command) error {
return nil
}

func ParseTableMaterializeSettings(tableSettings string, parser *sqlparser.Parser) ([]*vtctldatapb.TableMaterializeSettings, error) {
tableMaterializeSettings := make([]*vtctldatapb.TableMaterializeSettings, 0)
err := json.Unmarshal([]byte(tableSettings), &tableMaterializeSettings)
if err != nil {
return tableMaterializeSettings, fmt.Errorf("table-settings is not valid JSON")
}
if len(tableMaterializeSettings) == 0 {
return tableMaterializeSettings, fmt.Errorf("empty table-settings")
}

// Validate the provided queries.
seenSourceTables := make(map[string]bool)
for _, tms := range tableMaterializeSettings {
if tms.TargetTable == "" || tms.SourceExpression == "" {
return tableMaterializeSettings, fmt.Errorf("missing target_table or source_expression")
}
// Validate that the query is valid.
stmt, err := parser.Parse(tms.SourceExpression)
if err != nil {
return tableMaterializeSettings, fmt.Errorf("invalid source_expression: %q", tms.SourceExpression)
}
// Validate that each source-expression uses a different table.
// If any of them query the same table the materialize workflow
// will fail.
err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case sqlparser.TableName:
if node.Name.NotEmpty() {
if seenSourceTables[node.Name.String()] {
return false, fmt.Errorf("multiple source_expression queries use the same table: %q", node.Name.String())
}
seenSourceTables[node.Name.String()] = true
}
}
return true, nil
}, stmt)
if err != nil {
return tableMaterializeSettings, err
}
}

return tableMaterializeSettings, nil
}

func validateOnDDL(cmd *cobra.Command) error {
if _, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(CreateOptions.OnDDL)]; !ok {
return fmt.Errorf("invalid on-ddl value: %s", CreateOptions.OnDDL)
Expand Down
44 changes: 3 additions & 41 deletions go/cmd/vtctldclient/command/vreplication/materialize/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,47 +162,9 @@ func (ts *tableSettings) String() string {
}

func (ts *tableSettings) Set(v string) error {
ts.val = make([]*vtctldatapb.TableMaterializeSettings, 0)
err := json.Unmarshal([]byte(v), &ts.val)
if err != nil {
return fmt.Errorf("table-settings is not valid JSON")
}
if len(ts.val) == 0 {
return fmt.Errorf("empty table-settings")
}

// Validate the provided queries.
seenSourceTables := make(map[string]bool)
for _, tms := range ts.val {
if tms.TargetTable == "" || tms.SourceExpression == "" {
return fmt.Errorf("missing target_table or source_expression")
}
// Validate that the query is valid.
stmt, err := ts.parser.Parse(tms.SourceExpression)
if err != nil {
return fmt.Errorf("invalid source_expression: %q", tms.SourceExpression)
}
// Validate that each source-expression uses a different table.
// If any of them query the same table the materialize workflow
// will fail.
err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case sqlparser.TableName:
if node.Name.NotEmpty() {
if seenSourceTables[node.Name.String()] {
return false, fmt.Errorf("multiple source_expression queries use the same table: %q", node.Name.String())
}
seenSourceTables[node.Name.String()] = true
}
}
return true, nil
}, stmt)
if err != nil {
return err
}
}

return nil
var err error
ts.val, err = common.ParseTableMaterializeSettings(v, ts.parser)
return err
}

func (ts *tableSettings) Type() string {
Expand Down
Loading

0 comments on commit a3941a4

Please sign in to comment.