Skip to content

Commit

Permalink
Get Materialize creation working w/o VReplicationExec
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Nov 2, 2023
1 parent 12e1a60 commit 1045dd5
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 14 deletions.
6 changes: 4 additions & 2 deletions go/textutil/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"unicode"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/binlogdata"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -94,7 +94,7 @@ func ValueIsSimulatedNull(val any) bool {
return cval == SimulatedNullString
case []string:
return len(cval) == 1 && cval[0] == sqltypes.NULL.String()
case binlogdata.OnDDLAction:
case binlogdatapb.OnDDLAction:
return int32(cval) == int32(SimulatedNullInt)
case int:
return cval == SimulatedNullInt
Expand All @@ -104,6 +104,8 @@ func ValueIsSimulatedNull(val any) bool {
return int64(cval) == int64(SimulatedNullInt)
case []topodatapb.TabletType:
return len(cval) == 1 && cval[0] == topodatapb.TabletType(SimulatedNullInt)
case binlogdatapb.VReplicationWorkflowState:
return int32(cval) == int32(SimulatedNullInt)
default:
return false
}
Expand Down
13 changes: 13 additions & 0 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ type materializer struct {
workflowType binlogdatapb.VReplicationWorkflowType
}

func (mz *materializer) getWorkflowType() binlogdatapb.VReplicationWorkflowType {
var workflowType binlogdatapb.VReplicationWorkflowType
switch mz.ms.MaterializationIntent {
case vtctldatapb.MaterializationIntent_CUSTOM:
workflowType = binlogdatapb.VReplicationWorkflowType_Materialize
case vtctldatapb.MaterializationIntent_MOVETABLES:
workflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
case vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX:
workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex
}
return workflowType
}

func (mz *materializer) getWorkflowSubType() (binlogdatapb.VReplicationWorkflowSubType, error) {
switch {
case mz.isPartial && mz.ms.AtomicCopy:
Expand Down
20 changes: 19 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,25 @@ func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSet
ms: ms,
}

err := mz.createMaterializerStreams()
tsp := tabletmanagerdatapb.TabletSelectionPreference_INORDER
tt, inOrder, err := discovery.ParseTabletTypesAndOrder(ms.TabletTypes)
if err != nil {
return err
}
if inOrder {
tsp = tabletmanagerdatapb.TabletSelectionPreference_INORDER
}

err = mz.createWorkflowStreams(&tabletmanagerdatapb.CreateVReplicationWorkflowRequest{
Workflow: ms.Workflow,
Cells: strings.Split(ms.Cell, ","),
TabletTypes: tt,
TabletSelectionPreference: tsp,
WorkflowType: mz.getWorkflowType(),
DeferSecondaryKeys: ms.DeferSecondaryKeys,
AutoStart: true,
StopAfterCopy: ms.StopAfterCopy,
})
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/topoproto"
Expand Down Expand Up @@ -300,6 +301,7 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
}
if !textutil.ValueIsSimulatedNull(req.State) {
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
log.Errorf("DEBUG: State was updated to %s using request value: %v", state, req.State)
}
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
Expand Down
38 changes: 27 additions & 11 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,10 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
keyspace, shard)
selectRes := sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|cell|tablet_types",
"int64|varchar|varchar|varchar",
"id|source|cell|tablet_types|state|message",
"int64|varchar|varchar|varchar|varchar|varbinary",
),
fmt.Sprintf("%d|%s|%s|%s", vreplID, blsStr, cells[0], tabletTypes[0]),
fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID, blsStr, cells[0], tabletTypes[0]),
)
idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a",
sqltypes.Int64BindVariable(int64(vreplID)))
Expand All @@ -504,62 +504,80 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
name: "update cells",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone2"},
// TabletTypes is an empty value, so the current value should be cleared
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`,
keyspace, shard, "zone2", vreplID),
},
{
name: "update cells, NULL tablet_types",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone3"},
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, // So keep the current value of replica
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, "zone3", tabletTypes[0], vreplID),
},
{
name: "update tablet_types",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
TabletSelectionPreference: tabletmanagerdatapb.TabletSelectionPreference_INORDER,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`,
keyspace, shard, "in_order:rdonly,replica", vreplID),
},
{
name: "update tablet_types, NULL cells",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: textutil.SimulatedNullStringSlice, // So keep the current value of zone1
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, cells[0], "rdonly", vreplID),
},
{
name: "update on_ddl",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
OnDdl: binlogdatapb.OnDDLAction_EXEC,
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID),
},
{
name: "update cell,tablet_types,on_ddl",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone1", "zone2", "zone3"},
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_PRIMARY},
OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE,
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
},
{
name: "update state",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState_Stopped,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
}

for _, tt := range tests {
Expand All @@ -575,8 +593,6 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
require.NotNil(t, tt.request, "No request provided")
require.NotEqual(t, "", tt.query, "No expected query provided")

tt.request.State = binlogdatapb.VReplicationWorkflowState_Stopped

// These are the same for each RPC call.
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
Expand Down

0 comments on commit 1045dd5

Please sign in to comment.