Skip to content

Commit

Permalink
partial backports for workflow_type, Fix typecasting issue for workfl…
Browse files Browse the repository at this point in the history
…ow types vitessio#12217 , VTAdmin: display workflow type in workflows list  vitessio#11685

Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
DeathBorn committed Apr 23, 2024
1 parent dc0d06a commit 3395d8b
Show file tree
Hide file tree
Showing 19 changed files with 104 additions and 42 deletions.
7 changes: 7 additions & 0 deletions go/sqltypes/named_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func (r RowNamedValues) ToInt64(fieldName string) (int64, error) {
return 0, ErrNoSuchField
}

func (r RowNamedValues) ToInt32(fieldName string) (int32, error) {
if v, ok := r[fieldName]; ok {
return v.ToInt32()
}
return 0, ErrNoSuchField
}

// AsInt64 returns the named field as int64, or default value if nonexistent/error
func (r RowNamedValues) AsInt64(fieldName string, def int64) int64 {
if v, err := r.ToInt64(fieldName); err == nil {
Expand Down
16 changes: 16 additions & 0 deletions go/sqltypes/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ func (v Value) Raw() []byte {
return v.val
}

// RawStr returns the internal representation of the value as a string instead
// of a byte slice. This is equivalent to calling `string(v.Raw())` but does
// not allocate.
func (v Value) RawStr() string {
return hack.String(v.val)
}

// ToBytes returns the value as MySQL would return it as []byte.
// In contrast, Raw returns the internal representation of the Value, which may not
// match MySQL's representation for newer types.
Expand Down Expand Up @@ -246,6 +253,15 @@ func (v Value) ToInt64() (int64, error) {
return strconv.ParseInt(v.ToString(), 10, 64)
}

func (v Value) ToInt32() (int32, error) {
if !v.IsIntegral() {
return 0, ErrIncompatibleTypeCast
}

i, err := strconv.ParseInt(v.RawStr(), 10, 32)
return int32(i), err
}

// ToFloat64 returns the value as MySQL would return it as a float64.
func (v Value) ToFloat64() (float64, error) {
if !IsNumber(v.typ) {
Expand Down
15 changes: 7 additions & 8 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ type VRSettings struct {
MaxTPS int64
MaxReplicationLag int64
State string
WorkflowType int32
WorkflowType binlogdatapb.VReplicationWorkflowType
WorkflowName string
}

Expand Down Expand Up @@ -611,19 +611,18 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse stop_pos column: %v", err)
}
workflowTypeTmp, err := vrRow.ToInt64("workflow_type")
workflowType, err := vrRow.ToInt32("workflow_type")
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse workflow_type column: %v", err)
}
workflowType := int32(workflowTypeTmp)

return VRSettings{
StartPos: startPos,
StopPos: stopPos,
MaxTPS: maxTPS,
MaxReplicationLag: maxReplicationLag,
State: vrRow.AsString("state", ""),
WorkflowType: workflowType,
WorkflowType: binlogdatapb.VReplicationWorkflowType(workflowType),
WorkflowName: vrRow.AsString("workflow", ""),
}, nil
}
Expand All @@ -634,19 +633,19 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
workflowType binlogdatapb.VReplicationWorkflowType) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %v)",
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d)",
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag,
timeUpdated, BlpRunning, encodeString(dbName), int64(workflowType))
timeUpdated, BlpRunning, encodeString(dbName), workflowType)
}

// CreateVReplicationState returns a statement to create a stopped vreplication.
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position, state string, dbName string,
workflowType binlogdatapb.VReplicationWorkflowType) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %v)",
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d)",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled,
throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state, encodeString(dbName), int64(workflowType))
throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state, encodeString(dbName), workflowType)
}

// GenerateUpdatePos returns a statement to update a value in the
Expand Down
32 changes: 17 additions & 15 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
db_name,
time_updated,
transaction_timestamp,
message
message,
workflow_type
FROM
_vt.vreplication
%s`,
Expand Down Expand Up @@ -317,7 +318,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
// - sourceShardsByWorkflow[workflow.Name] != nil
// - targetShardsByWorkflow[workflow.Name] != nil
// - workflow.ShardStatuses != nil
scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) error {
scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row sqltypes.RowNamedValues, tablet *topo.TabletInfo) error {
span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow")
defer span.Finish()

Expand All @@ -327,33 +328,33 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
span.Annotate("workflow", workflow.Name)
span.Annotate("tablet_alias", tablet.AliasString())

id, err := evalengine.ToInt64(row[0])
id, err := evalengine.ToInt64(row["id"])
if err != nil {
return err
}

var bls binlogdatapb.BinlogSource
if err := prototext.Unmarshal(row[2].ToBytes(), &bls); err != nil {
if err := prototext.Unmarshal(row["source"].ToBytes(), &bls); err != nil {
return err
}

pos := row[3].ToString()
stopPos := row[4].ToString()
state := row[6].ToString()
dbName := row[7].ToString()
pos := row["pos"].ToString()
stopPos := row["stop_pos"].ToString()
state := row["state"].ToString()
dbName := row["db_name"].ToString()

timeUpdatedSeconds, err := evalengine.ToInt64(row[8])
timeUpdatedSeconds, err := evalengine.ToInt64(row["time_updated"])
if err != nil {
return err
}

transactionTimeSeconds, err := evalengine.ToInt64(row[9])
transactionTimeSeconds, err := evalengine.ToInt64(row["transaction_timestamp"])
if err != nil {
return err
}

message := row[10].ToString()

message := row["message"].ToString()
workflowType, _ := row["workflow_type"].ToInt32()
stream := &vtctldatapb.Workflow_Stream{
Id: id,
Shard: tablet.Shard,
Expand All @@ -371,6 +372,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
},
Message: message,
}
workflow.WorkflowType = binlogdatapb.VReplicationWorkflowType_name[workflowType]

stream.CopyStates, err = s.getWorkflowCopyStates(ctx, tablet, id)
if err != nil {
Expand Down Expand Up @@ -466,8 +468,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
// to a workflow we're already aggregating, or if it's a workflow we
// haven't seen yet for that shard primary. We use the workflow name to
// dedupe for this.
for _, row := range qr.Rows {
workflowName := row[1].ToString()
for _, row := range qr.Named().Rows {
workflowName := row["workflow"].ToString()
workflow, ok := workflowsMap[workflowName]
if !ok {
workflow = &vtctldatapb.Workflow{
Expand All @@ -481,7 +483,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
}

scanWorkflowWg.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) {
go func(ctx context.Context, workflow *vtctldatapb.Workflow, row sqltypes.RowNamedValues, tablet *topo.TabletInfo) {
defer scanWorkflowWg.Done()
if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
scanWorkflowErrors.RecordError(err)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/stream_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (sm *StreamMigrator) readTabletStreams(ctx context.Context, ti *topo.Tablet
ti.Keyspace, ti.Shard, id)
}

workflowType, err := row["workflow_type"].ToInt64()
workflowType, err := row["workflow_type"].ToInt32()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -566,7 +566,7 @@ func (sm *StreamMigrator) createTargetStreams(ctx context.Context, tmpl []*VRepl
}

ig.AddRow(vrs.Workflow, vrs.BinlogSource, mysql.EncodePosition(vrs.Position), "", "",
int64(vrs.WorkflowType))
vrs.WorkflowType)
}

_, err := sm.ts.VReplicationExec(ctx, target.GetPrimary().GetAlias(), ig.String())
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag
}

func getVReplicationWorkflowType(row sqltypes.RowNamedValues) binlogdatapb.VReplicationWorkflowType {
i, _ := row["workflow_type"].ToInt64()
i, _ := row["workflow_type"].ToInt32()
return binlogdatapb.VReplicationWorkflowType(i)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (v *VRepl) analyze(ctx context.Context, conn *dbconnpool.DBConnection) erro
func (v *VRepl) generateInsertStatement(ctx context.Context) (string, error) {
ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, v.dbName)
ig.AddRow(v.workflow, v.bls, v.pos, "", "in_order:REPLICA,MASTER",
int64(binlogdatapb.VReplicationWorkflowType_OnlineDDL))
binlogdatapb.VReplicationWorkflowType_OnlineDDL)

return ig.String(), nil
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,9 +640,10 @@ func (vre *Engine) transitionJournal(je *journalEvent) {
bls := proto.Clone(vre.controllers[refid].source).(*binlogdatapb.BinlogSource)
bls.Keyspace, bls.Shard = sgtid.Keyspace, sgtid.Shard

workflowType, _ := strconv.ParseInt(params["workflow_type"], 10, 64)
workflowType, _ := strconv.ParseInt(params["workflow_type"], 10, 32)
ig := NewInsertGenerator(binlogplayer.BlpRunning, vre.dbName)
ig.AddRow(params["workflow"], bls, sgtid.Gtid, params["cell"], params["tablet_types"], workflowType)
ig.AddRow(params["workflow"], bls, sgtid.Gtid, params["cell"], params["tablet_types"],
binlogdatapb.VReplicationWorkflowType(workflowType))
qr, err := withDDL.Exec(vre.ctx, ig.String(), dbClient.ExecuteFetch)
if err != nil {
log.Errorf("transitionJournal: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/insert_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func NewInsertGenerator(state, dbname string) *InsertGenerator {

// AddRow adds a row to the insert statement.
func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string,
workflowType int64) {
fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %v)",
workflowType binlogdatapb.VReplicationWorkflowType) {
fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %d)",
ig.prefix,
encodeString(workflow),
encodeString(bls.String()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import (
func TestInsertGenerator(t *testing.T) {
ig := NewInsertGenerator(binlogplayer.BlpStopped, "a")
ig.now = 111
ig.AddRow("b", &binlogdatapb.BinlogSource{Keyspace: "c"}, "d", "e", "f", int64(binlogdatapb.VReplicationWorkflowType_Materialize))
ig.AddRow("b", &binlogdatapb.BinlogSource{Keyspace: "c"}, "d", "e", "f", binlogdatapb.VReplicationWorkflowType_Materialize)
want := `insert into _vt.vreplication(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name) values ` +
`('b', 'keyspace:\"c\"', 'd', 9223372036854775807, 9223372036854775807, 'e', 'f', 111, 0, 'Stopped', 'a', 0)`
assert.Equal(t, ig.String(), want)

ig.AddRow("g", &binlogdatapb.BinlogSource{Keyspace: "h"}, "i", "j", "k", int64(binlogdatapb.VReplicationWorkflowType_Reshard))
ig.AddRow("g", &binlogdatapb.BinlogSource{Keyspace: "h"}, "i", "j", "k", binlogdatapb.VReplicationWorkflowType_Reshard)
want += `, ('g', 'keyspace:\"h\"', 'i', 9223372036854775807, 9223372036854775807, 'j', 'k', 111, 0, 'Stopped', 'a', 4)`
assert.Equal(t, ig.String(), want)
}
5 changes: 3 additions & 2 deletions go/vt/worker/split_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,8 @@ func (scw *SplitCloneWorker) startCloningData(ctx context.Context, state StatusW
}

// copy phase:
// - copy the data from source tablets to destination masters (with replication on)
// - copy the data from source tablets to destination masters (with replication on)
//
// Assumes that the schema has already been created on each destination tablet
// (probably from vtctl's CopySchemaShard)
func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) error {
Expand Down Expand Up @@ -1289,7 +1290,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error {
bls.Tables = scw.tables
}
// TODO(mberlin): Fill in scw.maxReplicationLag once the adapative throttler is enabled by default.
qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), dbName))
qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), dbName, binlogdatapb.VReplicationWorkflowType_Reshard))
if err != nil {
handleError(vterrors.Wrap(err, "vreplication queries failed"))
cancel()
Expand Down
11 changes: 10 additions & 1 deletion go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,8 +1101,17 @@ func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.S

bls.Filter.Rules = append(bls.Filter.Rules, rule)
}
var workflowType binlogdatapb.VReplicationWorkflowType
switch mz.ms.MaterializationIntent {
case vtctldatapb.MaterializationIntent_CUSTOM:
workflowType = binlogdatapb.VReplicationWorkflowType_Materialize
case vtctldatapb.MaterializationIntent_MOVETABLES:
workflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
case vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX:
workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex
}
ig.AddRow(mz.ms.Workflow, bls, "", mz.ms.Cell, mz.ms.TabletTypes,
int64(mz.ms.MaterializationIntent))
workflowType)
}
return ig.String(), nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,13 @@ func (rs *resharder) createStreams(ctx context.Context) error {
StopAfterCopy: rs.stopAfterCopy,
}
ig.AddRow(rs.workflow, bls, "", rs.cell, rs.tabletTypes,
int64(binlogdatapb.VReplicationWorkflowType_Reshard))
binlogdatapb.VReplicationWorkflowType_Reshard)
}

for _, rstream := range rs.refStreams {
//todo: fix based on original stream
ig.AddRow(rstream.workflow, rstream.bls, "", rstream.cell, rstream.tabletTypes,
int64(binlogdatapb.VReplicationWorkflowType_Reshard))
binlogdatapb.VReplicationWorkflowType_Reshard)
}
query := ig.String()
if _, err := rs.wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, query); err != nil {
Expand Down
8 changes: 5 additions & 3 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -395,7 +396,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
var err error
var id, timeUpdated, transactionTimestamp int64
var state, dbName, pos, stopPos, message string
var workflowType int64
var workflowType int32
var bls binlogdatapb.BinlogSource
var mpos mysql.Position

Expand Down Expand Up @@ -428,10 +429,11 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
return nil, "", err
}
message = row[9].ToString()
workflowType, err = evalengine.ToInt64(row[10])
workflowTypeTmp, err := strconv.ParseInt(row[10].RawStr(), 10, 32)
if err != nil {
return nil, "", err
}
workflowType = int32(workflowTypeTmp)
status := &ReplicationStatus{
Shard: master.Shard,
Tablet: master.AliasString(),
Expand All @@ -444,7 +446,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
TransactionTimestamp: transactionTimestamp,
TimeUpdated: timeUpdated,
Message: message,
WorkflowType: binlogdatapb.VReplicationWorkflowType_name[int32(workflowType)],
WorkflowType: binlogdatapb.VReplicationWorkflowType_name[workflowType],
}
status.CopyState, err = wr.getCopyState(ctx, master, id)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion misc/git/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ if [ -z "$GOPATH" ]; then
fi

for hook in $GIT_DIR/../misc/git/hooks/*; do
$hook
echo $hook
done
1 change: 1 addition & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ message Workflow {
ReplicationLocation target = 3;
int64 max_v_replication_lag = 4;
map<string, ShardStream> shard_streams = 5;
string workflow_type = 6;

message ReplicationLocation {
string keyspace = 1;
Expand Down
9 changes: 9 additions & 0 deletions web/vtadmin/src/components/routes/Workflows.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export const Workflows = () => {
target: workflow.workflow?.target?.keyspace,
targetShards: workflow.workflow?.target?.shards,
timeUpdated: getTimeUpdated(workflow),
workflowType: workflow.workflow?.workflow_type,
}));
const filtered = filterNouns(filter, mapped);
return orderBy(filtered, ['name', 'clusterName', 'source', 'target']);
Expand All @@ -67,6 +68,14 @@ export const Workflows = () => {
<tr key={idx}>
<DataCell>
<div className="font-weight-bold">{href ? <Link to={href}>{row.name}</Link> : row.name}</div>
{row.workflowType && (
<div className="text-secondary text-success-200">
{row.workflowType}
{row.workflowSubType && row.workflowSubType !== 'None' && (
<span className="text-sm">{' (' + row.workflowSubType + ')'}</span>
)}
</div>
)}
<div className="font-size-small text-color-secondary">{row.clusterName}</div>
</DataCell>
<DataCell>
Expand Down
Loading

0 comments on commit 3395d8b

Please sign in to comment.