Skip to content

Commit

Permalink
Add HasVReplicationWorkflows and ReadVReplicationWorkflows RPCs
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 3, 2024
1 parent c7637e7 commit 4e0a542
Show file tree
Hide file tree
Showing 22 changed files with 3,838 additions and 939 deletions.
1,023 changes: 663 additions & 360 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

929 changes: 925 additions & 4 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Large diffs are not rendered by default.

609 changes: 317 additions & 292 deletions go/vt/proto/tabletmanagerservice/tabletmanagerservice.pb.go

Large diffs are not rendered by default.

72 changes: 72 additions & 0 deletions go/vt/proto/tabletmanagerservice/tabletmanagerservice_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,14 @@ func (itmc *internalTabletManagerClient) DeleteVReplicationWorkflow(context.Cont
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) HasVReplicationWorkflows(context.Context, *topodatapb.Tablet, *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) ReadVReplicationWorkflows(context.Context, *topodatapb.Tablet, *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) ReadVReplicationWorkflow(context.Context, *topodatapb.Tablet, *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
Expand Down
121 changes: 58 additions & 63 deletions go/vt/vtctl/workflow/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@ package workflow

import (
"context"
"errors"
"fmt"
"sync"
"time"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/schema"
Expand All @@ -37,7 +33,9 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type resharder struct {
Expand Down Expand Up @@ -132,13 +130,12 @@ func (s *Server) buildResharder(ctx context.Context, keyspace, workflow string,
func (rs *resharder) validateTargets(ctx context.Context) error {
err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
targetPrimary := rs.targetPrimaries[target.ShardName()]
query := fmt.Sprintf("select 1 from _vt.vreplication where db_name=%s", encodeString(targetPrimary.DbName()))
p3qr, err := rs.s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query)
res, err := rs.s.tmc.HasVReplicationWorkflows(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.HasVReplicationWorkflowsRequest{DbName: targetPrimary.DbName()})
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query)
return vterrors.Wrapf(err, "HasVReplicationWorkflows(%v, %s)", targetPrimary.Tablet, targetPrimary.DbName())
}
if len(p3qr.Rows) != 0 {
return errors.New("some streams already exist in the target shards, please clean them up and retry the command")
if res.Has {
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "some streams already exist in the target shards, please clean them up and retry the command")
}
return nil
})
Expand All @@ -150,66 +147,61 @@ func (rs *resharder) readRefStreams(ctx context.Context) error {
err := rs.forAll(rs.sourceShards, func(source *topo.ShardInfo) error {
sourcePrimary := rs.sourcePrimaries[source.ShardName()]

query := fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name=%s and message != 'FROZEN'", encodeString(sourcePrimary.DbName()))
p3qr, err := rs.s.tmc.VReplicationExec(ctx, sourcePrimary.Tablet, query)
req := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{
DbName: sourcePrimary.DbName(),
ExcludeFrozen: true,
}
res, err := rs.s.tmc.ReadVReplicationWorkflows(ctx, sourcePrimary.Tablet, req)
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", sourcePrimary.Tablet, query)
return vterrors.Wrapf(err, "ReadVReplicationWorkflows(%v, %+v)", sourcePrimary.Tablet, req)
}
qr := sqltypes.Proto3ToResult(p3qr)

mu.Lock()
defer mu.Unlock()
for _, workflow := range res.Workflows {
mu.Lock()
defer mu.Unlock()

mustCreate := false
var ref map[string]bool
if rs.refStreams == nil {
rs.refStreams = make(map[string]*refStream)
mustCreate = true
} else {
// Copy the ref streams for comparison.
ref = make(map[string]bool, len(rs.refStreams))
for k := range rs.refStreams {
ref[k] = true
mustCreate := false
var ref map[string]bool
if rs.refStreams == nil {
rs.refStreams = make(map[string]*refStream)
mustCreate = true
} else {
// Copy the ref streams for comparison.
ref = make(map[string]bool, len(rs.refStreams))
for k := range rs.refStreams {
ref[k] = true
}
}
}
for _, row := range qr.Rows {

workflow := row[0].ToString()
if workflow == "" {
if workflow.Workflow == "" {
return fmt.Errorf("VReplication streams must have named workflows for migration: shard: %s:%s", source.Keyspace(), source.ShardName())
}
var bls binlogdatapb.BinlogSource
rowBytes, err := row[1].ToBytes()
if err != nil {
return err
}
if err := prototext.Unmarshal(rowBytes, &bls); err != nil {
return vterrors.Wrapf(err, "prototext.Unmarshal: %v", row)
}
isReference, err := rs.blsIsReference(&bls)
if err != nil {
return vterrors.Wrap(err, "blsIsReference")
}
if !isReference {
continue
}
refKey := fmt.Sprintf("%s:%s:%s", workflow, bls.Keyspace, bls.Shard)
if mustCreate {
rs.refStreams[refKey] = &refStream{
workflow: workflow,
bls: &bls,
cell: row[2].ToString(),
tabletTypes: row[3].ToString(),
for _, stream := range res.Workflows[0].Streams {
bls := stream.Bls
isReference, err := rs.blsIsReference(bls)
if err != nil {
return vterrors.Wrap(err, "blsIsReference")
}
} else {
if !ref[refKey] {
return fmt.Errorf("streams are mismatched across source shards for workflow: %s", workflow)
if !isReference {
continue
}
refKey := fmt.Sprintf("%s:%s:%s", workflow, bls.Keyspace, bls.Shard)
if mustCreate {
rs.refStreams[refKey] = &refStream{
workflow: workflow.Workflow,
bls: bls,
cell: workflow.Cells,
tabletTypes: buildTabletTypesString(workflow.TabletTypes, workflow.TabletSelectionPreference),
}
} else {
if !ref[refKey] {
return fmt.Errorf("streams are mismatched across source shards for workflow: %s", workflow)
}
delete(ref, refKey)
}
delete(ref, refKey)
}
}
if len(ref) != 0 {
return fmt.Errorf("streams are mismatched across source shards: %v", ref)
if len(ref) != 0 {
return fmt.Errorf("streams are mismatched across source shards: %v", ref)
}
}
return nil
})
Expand Down Expand Up @@ -332,10 +324,13 @@ func (rs *resharder) startStreams(ctx context.Context) error {
// that we've created on the new shards as we're migrating them.
// We use the comment directive to indicate that this is intentional
// and OK.
query := fmt.Sprintf("update /*vt+ %s */ _vt.vreplication set state='Running' where db_name=%s",
vreplication.AllowUnsafeWriteCommentDirective, encodeString(targetPrimary.DbName()))
if _, err := rs.s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query)
req := &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: rs.workflow,
State: binlogdatapb.VReplicationWorkflowState_Running,
}
if _, err := rs.s.tmc.UpdateVReplicationWorkflow(ctx, targetPrimary.Tablet, req); err != nil {
return vterrors.Wrapf(err, "UpdateVReplicationWorkflow(%v, 'state='%s')",
targetPrimary.Tablet, binlogdatapb.VReplicationWorkflowState_Running.String())
}
return nil
})
Expand Down
13 changes: 6 additions & 7 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -1698,7 +1697,11 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea
} else {
log.Warningf("Streams will not be started since --auto-start is set to false")
}
return nil, nil

return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{
Keyspace: req.Keyspace,
Workflow: req.Workflow,
})
}

// VDiffCreate is part of the vtctlservicepb.VtctldServer interface.
Expand All @@ -1718,11 +1721,7 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe
span.Annotate("auto_retry", req.AutoRetry)
span.Annotate("max_diff_duration", req.MaxDiffDuration)

tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes)
if req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = discovery.InOrderHint + tabletTypesStr
}

tabletTypesStr := buildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference)
options := &tabletmanagerdatapb.VDiffOptions{
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
TabletTypes: tabletTypesStr,
Expand Down
Loading

0 comments on commit 4e0a542

Please sign in to comment.