diff --git a/go/sqltypes/named_result.go b/go/sqltypes/named_result.go
index 6d1621a8f1a..3386570f8c3 100644
--- a/go/sqltypes/named_result.go
+++ b/go/sqltypes/named_result.go
@@ -54,6 +54,13 @@ func (r RowNamedValues) ToInt64(fieldName string) (int64, error) {
 	return 0, ErrNoSuchField
 }
 
+func (r RowNamedValues) ToInt32(fieldName string) (int32, error) {
+	if v, ok := r[fieldName]; ok {
+		return v.ToInt32()
+	}
+	return 0, ErrNoSuchField
+}
+
 // AsInt64 returns the named field as int64, or default value if nonexistent/error
 func (r RowNamedValues) AsInt64(fieldName string, def int64) int64 {
 	if v, err := r.ToInt64(fieldName); err == nil {
diff --git a/go/sqltypes/value.go b/go/sqltypes/value.go
index e017f57f6db..85337ed7bc3 100644
--- a/go/sqltypes/value.go
+++ b/go/sqltypes/value.go
@@ -207,6 +207,13 @@ func (v Value) Raw() []byte {
 	return v.val
 }
 
+// RawStr returns the internal representation of the value as a string instead
+// of a byte slice. This is equivalent to calling `string(v.Raw())` but does
+// not allocate.
+func (v Value) RawStr() string {
+	return hack.String(v.val)
+}
+
 // ToBytes returns the value as MySQL would return it as []byte.
 // In contrast, Raw returns the internal representation of the Value, which may not
 // match MySQL's representation for newer types.
@@ -246,6 +253,15 @@ func (v Value) ToInt64() (int64, error) {
 	return strconv.ParseInt(v.ToString(), 10, 64)
 }
 
+func (v Value) ToInt32() (int32, error) {
+	if !v.IsIntegral() {
+		return 0, ErrIncompatibleTypeCast
+	}
+
+	i, err := strconv.ParseInt(v.RawStr(), 10, 32)
+	return int32(i), err
+}
+
 // ToFloat64 returns the value as MySQL would return it as a float64.
 func (v Value) ToFloat64() (float64, error) {
 	if !IsNumber(v.typ) {
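The two sqltypes helpers above are the basis for the rest of this change. Below is a minimal sketch of how they behave, using the existing NewInt32/NewVarChar constructors; it is illustration only and not part of the patch:

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/sqltypes"
)

func main() {
	// Value.ToInt32 only accepts integral values.
	v := sqltypes.NewInt32(3)
	n, err := v.ToInt32()
	fmt.Println(n, err) // 3 <nil>

	// Non-integral types are rejected with ErrIncompatibleTypeCast.
	_, err = sqltypes.NewVarChar("Reshard").ToInt32()
	fmt.Println(err)

	// RawStr exposes the raw bytes as a string without copying them.
	fmt.Println(v.RawStr()) // "3"

	// RowNamedValues.ToInt32 looks the field up by column name.
	row := sqltypes.RowNamedValues{"workflow_type": sqltypes.NewInt32(1)}
	wt, err := row.ToInt32("workflow_type")
	fmt.Println(wt, err) // 1 <nil>

	_, err = row.ToInt32("no_such_column")
	fmt.Println(err) // ErrNoSuchField
}
```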
diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go
index afe09258584..25103d7d0b8 100644
--- a/go/vt/binlog/binlogplayer/binlog_player.go
+++ b/go/vt/binlog/binlogplayer/binlog_player.go
@@ -577,7 +577,7 @@ type VRSettings struct {
 	MaxTPS            int64
 	MaxReplicationLag int64
 	State             string
-	WorkflowType      int32
+	WorkflowType      binlogdatapb.VReplicationWorkflowType
 	WorkflowName      string
 }
 
@@ -611,11 +611,10 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
 	if err != nil {
 		return VRSettings{}, fmt.Errorf("failed to parse stop_pos column: %v", err)
 	}
-	workflowTypeTmp, err := vrRow.ToInt64("workflow_type")
+	workflowType, err := vrRow.ToInt32("workflow_type")
 	if err != nil {
 		return VRSettings{}, fmt.Errorf("failed to parse workflow_type column: %v", err)
 	}
-	workflowType := int32(workflowTypeTmp)
 
 	return VRSettings{
 		StartPos:          startPos,
@@ -623,7 +622,7 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
 		MaxTPS:            maxTPS,
 		MaxReplicationLag: maxReplicationLag,
 		State:             vrRow.AsString("state", ""),
-		WorkflowType:      workflowType,
+		WorkflowType:      binlogdatapb.VReplicationWorkflowType(workflowType),
 		WorkflowName:      vrRow.AsString("workflow", ""),
 	}, nil
 }
@@ -634,9 +633,9 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
 	workflowType binlogdatapb.VReplicationWorkflowType) string {
 	return fmt.Sprintf("insert into _vt.vreplication "+
 		"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type) "+
-		"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %v)",
+		"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d)",
 		encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag,
-		timeUpdated, BlpRunning, encodeString(dbName), int64(workflowType))
+		timeUpdated, BlpRunning, encodeString(dbName), workflowType)
 }
 
 // CreateVReplicationState returns a statement to create a stopped vreplication.
@@ -644,9 +643,9 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource,
 	workflowType binlogdatapb.VReplicationWorkflowType) string {
 	return fmt.Sprintf("insert into _vt.vreplication "+
 		"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type) "+
-		"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %v)",
+		"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d)",
 		encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled,
-		throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state, encodeString(dbName), int64(workflowType))
+		throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state, encodeString(dbName), workflowType)
 }
 
 // GenerateUpdatePos returns a statement to update a value in the
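A side effect of passing the proto enum instead of an int64 is the format-verb change from %v to %d in the two statement builders above: %v would render the enum through its String() method (its name), which the numeric workflow_type column cannot store, while %d prints the underlying int32. A small sketch, assuming the numeric values visible in the insert_generator test further down (Materialize = 0, Reshard = 4):

```go
package main

import (
	"fmt"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func main() {
	wt := binlogdatapb.VReplicationWorkflowType_Reshard

	fmt.Printf("%v\n", wt) // prints the enum name: Reshard
	fmt.Printf("%d\n", wt) // prints the underlying number: 4
}
```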
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 2f94cbcaca3..57c5c87b073 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -289,7 +289,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
 			db_name,
 			time_updated,
 			transaction_timestamp,
-			message
+			message,
+			workflow_type
 		FROM
 			_vt.vreplication
 		%s`,
@@ -317,7 +318,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
 	// - sourceShardsByWorkflow[workflow.Name] != nil
 	// - targetShardsByWorkflow[workflow.Name] != nil
 	// - workflow.ShardStatuses != nil
-	scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) error {
+	scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row sqltypes.RowNamedValues, tablet *topo.TabletInfo) error {
 		span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow")
 		defer span.Finish()
 
@@ -327,33 +328,33 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
 		span.Annotate("workflow", workflow.Name)
 		span.Annotate("tablet_alias", tablet.AliasString())
 
-		id, err := evalengine.ToInt64(row[0])
+		id, err := evalengine.ToInt64(row["id"])
 		if err != nil {
 			return err
 		}
 
 		var bls binlogdatapb.BinlogSource
-		if err := prototext.Unmarshal(row[2].ToBytes(), &bls); err != nil {
+		if err := prototext.Unmarshal(row["source"].ToBytes(), &bls); err != nil {
 			return err
 		}
 
-		pos := row[3].ToString()
-		stopPos := row[4].ToString()
-		state := row[6].ToString()
-		dbName := row[7].ToString()
+		pos := row["pos"].ToString()
+		stopPos := row["stop_pos"].ToString()
+		state := row["state"].ToString()
+		dbName := row["db_name"].ToString()
 
-		timeUpdatedSeconds, err := evalengine.ToInt64(row[8])
+		timeUpdatedSeconds, err := evalengine.ToInt64(row["time_updated"])
 		if err != nil {
 			return err
 		}
 
-		transactionTimeSeconds, err := evalengine.ToInt64(row[9])
+		transactionTimeSeconds, err := evalengine.ToInt64(row["transaction_timestamp"])
 		if err != nil {
 			return err
 		}
 
-		message := row[10].ToString()
-
+		message := row["message"].ToString()
+		workflowType, _ := row["workflow_type"].ToInt32()
 		stream := &vtctldatapb.Workflow_Stream{
 			Id:    id,
 			Shard: tablet.Shard,
@@ -371,6 +372,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
 			},
 			Message: message,
 		}
+		workflow.WorkflowType = binlogdatapb.VReplicationWorkflowType_name[workflowType]
 
 		stream.CopyStates, err = s.getWorkflowCopyStates(ctx, tablet, id)
 		if err != nil {
@@ -466,8 +468,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
 		// to a workflow we're already aggregating, or if it's a workflow we
 		// haven't seen yet for that shard primary. We use the workflow name to
 		// dedupe for this.
-		for _, row := range qr.Rows {
-			workflowName := row[1].ToString()
+		for _, row := range qr.Named().Rows {
+			workflowName := row["workflow"].ToString()
 			workflow, ok := workflowsMap[workflowName]
 			if !ok {
 				workflow = &vtctldatapb.Workflow{
@@ -481,7 +483,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
 			}
 
 			scanWorkflowWg.Add(1)
-			go func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) {
+			go func(ctx context.Context, workflow *vtctldatapb.Workflow, row sqltypes.RowNamedValues, tablet *topo.TabletInfo) {
 				defer scanWorkflowWg.Done()
 				if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
 					scanWorkflowErrors.RecordError(err)
diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go
index 635898a0e37..662c693f619 100644
--- a/go/vt/vtctl/workflow/stream_migrator.go
+++ b/go/vt/vtctl/workflow/stream_migrator.go
@@ -226,7 +226,7 @@ func (sm *StreamMigrator) readTabletStreams(ctx context.Context, ti *topo.Tablet
 				ti.Keyspace, ti.Shard, id)
 		}
 
-		workflowType, err := row["workflow_type"].ToInt64()
+		workflowType, err := row["workflow_type"].ToInt32()
 		if err != nil {
 			return nil, err
 		}
@@ -566,7 +566,7 @@ func (sm *StreamMigrator) createTargetStreams(ctx context.Context, tmpl []*VRepl
 		}
 
 		ig.AddRow(vrs.Workflow, vrs.BinlogSource, mysql.EncodePosition(vrs.Position), "", "",
-			int64(vrs.WorkflowType))
+			vrs.WorkflowType)
 	}
 
 	_, err := sm.ts.VReplicationExec(ctx, target.GetPrimary().GetAlias(), ig.String())
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 35082e5a2a5..bb9efb50f23 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -280,7 +280,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag
 }
 
 func getVReplicationWorkflowType(row sqltypes.RowNamedValues) binlogdatapb.VReplicationWorkflowType {
-	i, _ := row["workflow_type"].ToInt64()
+	i, _ := row["workflow_type"].ToInt32()
 	return binlogdatapb.VReplicationWorkflowType(i)
 }
 
diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go
index c1915c48363..55a4ae48621 100644
--- a/go/vt/vttablet/onlineddl/vrepl.go
+++ b/go/vt/vttablet/onlineddl/vrepl.go
@@ -526,7 +526,7 @@ func (v *VRepl) analyze(ctx context.Context, conn *dbconnpool.DBConnection) erro
 func (v *VRepl) generateInsertStatement(ctx context.Context) (string, error) {
 	ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, v.dbName)
 	ig.AddRow(v.workflow, v.bls, v.pos, "", "in_order:REPLICA,MASTER",
-		int64(binlogdatapb.VReplicationWorkflowType_OnlineDDL))
+		binlogdatapb.VReplicationWorkflowType_OnlineDDL)
 	return ig.String(), nil
 }
 
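GetWorkflows now iterates qr.Named().Rows, so columns are addressed by name rather than by position and adding workflow_type to the SELECT cannot shift any other index. A rough sketch of the pattern using the sqltypes test helpers; the column set and values here are hypothetical:

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/sqltypes"
)

func main() {
	fields := sqltypes.MakeTestFields(
		"id|workflow|workflow_type",
		"int64|varchar|int64",
	)
	qr := sqltypes.MakeTestResult(fields, "1|wf1|4")

	for _, row := range qr.Named().Rows {
		// Each row is a RowNamedValues map keyed by column name.
		name := row["workflow"].ToString()
		workflowType, _ := row["workflow_type"].ToInt32()
		fmt.Println(name, workflowType) // wf1 4
	}
}
```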
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go
index 542a6f8eed7..e3df11f8e6d 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/engine.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go
@@ -640,9 +640,10 @@ func (vre *Engine) transitionJournal(je *journalEvent) {
 
 		bls := proto.Clone(vre.controllers[refid].source).(*binlogdatapb.BinlogSource)
 		bls.Keyspace, bls.Shard = sgtid.Keyspace, sgtid.Shard
-		workflowType, _ := strconv.ParseInt(params["workflow_type"], 10, 64)
+		workflowType, _ := strconv.ParseInt(params["workflow_type"], 10, 32)
 		ig := NewInsertGenerator(binlogplayer.BlpRunning, vre.dbName)
-		ig.AddRow(params["workflow"], bls, sgtid.Gtid, params["cell"], params["tablet_types"], workflowType)
+		ig.AddRow(params["workflow"], bls, sgtid.Gtid, params["cell"], params["tablet_types"],
+			binlogdatapb.VReplicationWorkflowType(workflowType))
 		qr, err := withDDL.Exec(vre.ctx, ig.String(), dbClient.ExecuteFetch)
 		if err != nil {
 			log.Errorf("transitionJournal: %v", err)
diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go
index dd5fdaa518d..cbb5217499e 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go
@@ -49,8 +49,8 @@ func NewInsertGenerator(state, dbname string) *InsertGenerator {
 
 // AddRow adds a row to the insert statement.
 func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string,
-	workflowType int64) {
-	fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %v)",
+	workflowType binlogdatapb.VReplicationWorkflowType) {
+	fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %d)",
 		ig.prefix,
 		encodeString(workflow),
 		encodeString(bls.String()),
diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator_test.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator_test.go
index b13e6c3798b..d551edfd83d 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/insert_generator_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator_test.go
@@ -28,12 +28,12 @@ import (
 func TestInsertGenerator(t *testing.T) {
 	ig := NewInsertGenerator(binlogplayer.BlpStopped, "a")
 	ig.now = 111
-	ig.AddRow("b", &binlogdatapb.BinlogSource{Keyspace: "c"}, "d", "e", "f", int64(binlogdatapb.VReplicationWorkflowType_Materialize))
+	ig.AddRow("b", &binlogdatapb.BinlogSource{Keyspace: "c"}, "d", "e", "f", binlogdatapb.VReplicationWorkflowType_Materialize)
 	want := `insert into _vt.vreplication(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name) values ` +
 		`('b', 'keyspace:\"c\"', 'd', 9223372036854775807, 9223372036854775807, 'e', 'f', 111, 0, 'Stopped', 'a', 0)`
 	assert.Equal(t, ig.String(), want)
 
-	ig.AddRow("g", &binlogdatapb.BinlogSource{Keyspace: "h"}, "i", "j", "k", int64(binlogdatapb.VReplicationWorkflowType_Reshard))
+	ig.AddRow("g", &binlogdatapb.BinlogSource{Keyspace: "h"}, "i", "j", "k", binlogdatapb.VReplicationWorkflowType_Reshard)
 	want += `, ('g', 'keyspace:\"h\"', 'i', 9223372036854775807, 9223372036854775807, 'j', 'k', 111, 0, 'Stopped', 'a', 4)`
 	assert.Equal(t, ig.String(), want)
 }
diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go
index b96046ac992..1c81e516a11 100644
--- a/go/vt/worker/split_clone.go
+++ b/go/vt/worker/split_clone.go
@@ -1141,7 +1141,8 @@ func (scw *SplitCloneWorker) startCloningData(ctx context.Context, state StatusW
 }
 
 // copy phase:
-// - copy the data from source tablets to destination masters (with replication on)
+// - copy the data from source tablets to destination masters (with replication on)
+//
 // Assumes that the schema has already been created on each destination tablet
 // (probably from vtctl's CopySchemaShard)
 func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) error {
@@ -1289,7 +1290,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error {
 			bls.Tables = scw.tables
 		}
 		// TODO(mberlin): Fill in scw.maxReplicationLag once the adapative throttler is enabled by default.
-		qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), dbName))
+		qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), dbName, binlogdatapb.VReplicationWorkflowType_Reshard))
 		if err != nil {
 			handleError(vterrors.Wrap(err, "vreplication queries failed"))
 			cancel()
diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go
index 0c9ee6fd8be..ca09a1f53cc 100644
--- a/go/vt/wrangler/materializer.go
+++ b/go/vt/wrangler/materializer.go
@@ -1101,8 +1101,17 @@ func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.S
 			bls.Filter.Rules = append(bls.Filter.Rules, rule)
 		}
 
+		var workflowType binlogdatapb.VReplicationWorkflowType
+		switch mz.ms.MaterializationIntent {
+		case vtctldatapb.MaterializationIntent_CUSTOM:
+			workflowType = binlogdatapb.VReplicationWorkflowType_Materialize
+		case vtctldatapb.MaterializationIntent_MOVETABLES:
+			workflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
+		case vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX:
+			workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex
+		}
 		ig.AddRow(mz.ms.Workflow, bls, "", mz.ms.Cell, mz.ms.TabletTypes,
-			int64(mz.ms.MaterializationIntent))
+			workflowType)
 	}
 	return ig.String(), nil
 }
diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go
index e77cdac6d0b..0c752738f6a 100644
--- a/go/vt/wrangler/resharder.go
+++ b/go/vt/wrangler/resharder.go
@@ -325,13 +325,13 @@ func (rs *resharder) createStreams(ctx context.Context) error {
 			StopAfterCopy: rs.stopAfterCopy,
 		}
 		ig.AddRow(rs.workflow, bls, "", rs.cell, rs.tabletTypes,
-			int64(binlogdatapb.VReplicationWorkflowType_Reshard))
+			binlogdatapb.VReplicationWorkflowType_Reshard)
 	}
 
 	for _, rstream := range rs.refStreams {
 		//todo: fix based on original stream
 		ig.AddRow(rstream.workflow, rstream.bls, "", rstream.cell, rstream.tabletTypes,
-			int64(binlogdatapb.VReplicationWorkflowType_Reshard))
+			binlogdatapb.VReplicationWorkflowType_Reshard)
 	}
 	query := ig.String()
 	if _, err := rs.wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, query); err != nil {
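The switch added in materializer.go replaces the old direct cast of MaterializationIntent into the workflow_type column with an explicit mapping, so the stored value no longer relies on the two enums happening to share numbering. The same idea as a standalone helper; the function name and the default branch are assumptions for illustration only:

```go
package main

import (
	"fmt"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

// workflowTypeForIntent mirrors the switch in materializer.go: every
// MaterializationIntent is mapped explicitly to a VReplicationWorkflowType.
func workflowTypeForIntent(intent vtctldatapb.MaterializationIntent) binlogdatapb.VReplicationWorkflowType {
	switch intent {
	case vtctldatapb.MaterializationIntent_MOVETABLES:
		return binlogdatapb.VReplicationWorkflowType_MoveTables
	case vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX:
		return binlogdatapb.VReplicationWorkflowType_CreateLookupIndex
	default:
		// Assumption: anything unrecognized falls back to Materialize,
		// which is what the patch assigns to the CUSTOM intent.
		return binlogdatapb.VReplicationWorkflowType_Materialize
	}
}

func main() {
	fmt.Println(workflowTypeForIntent(vtctldatapb.MaterializationIntent_MOVETABLES))
}
```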
diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go
index 5681b74e2b3..ff26850c077 100644
--- a/go/vt/wrangler/vexec.go
+++ b/go/vt/wrangler/vexec.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -395,7 +396,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
 	var err error
 	var id, timeUpdated, transactionTimestamp int64
 	var state, dbName, pos, stopPos, message string
-	var workflowType int64
+	var workflowType int32
 	var bls binlogdatapb.BinlogSource
 	var mpos mysql.Position
 
@@ -428,10 +429,11 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
 		return nil, "", err
 	}
 	message = row[9].ToString()
-	workflowType, err = evalengine.ToInt64(row[10])
+	workflowTypeTmp, err := strconv.ParseInt(row[10].RawStr(), 10, 32)
 	if err != nil {
 		return nil, "", err
 	}
+	workflowType = int32(workflowTypeTmp)
 	status := &ReplicationStatus{
 		Shard:                master.Shard,
 		Tablet:               master.AliasString(),
@@ -444,7 +446,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
 		TransactionTimestamp: transactionTimestamp,
 		TimeUpdated:          timeUpdated,
 		Message:              message,
-		WorkflowType:         binlogdatapb.VReplicationWorkflowType_name[int32(workflowType)],
+		WorkflowType:         binlogdatapb.VReplicationWorkflowType_name[workflowType],
 	}
 	status.CopyState, err = wr.getCopyState(ctx, master, id)
 	if err != nil {
diff --git a/misc/git/pre-commit b/misc/git/pre-commit
index 84480d8c43e..ee83c539c15 100755
--- a/misc/git/pre-commit
+++ b/misc/git/pre-commit
@@ -16,5 +16,5 @@ if [ -z "$GOPATH" ]; then
 fi
 
 for hook in $GIT_DIR/../misc/git/hooks/*; do
-  $hook
+  echo $hook
 done
diff --git a/proto/vtctldata.proto b/proto/vtctldata.proto
index 8bf7a553c9a..38fda870088 100644
--- a/proto/vtctldata.proto
+++ b/proto/vtctldata.proto
@@ -105,6 +105,7 @@ message Workflow {
   ReplicationLocation target = 3;
   int64 max_v_replication_lag = 4;
   map<string, ShardStream> shard_streams = 5;
+  string workflow_type = 6;
 
   message ReplicationLocation {
     string keyspace = 1;
diff --git a/web/vtadmin/src/components/routes/Workflows.tsx b/web/vtadmin/src/components/routes/Workflows.tsx
index a76af067262..cff5b747ec3 100644
--- a/web/vtadmin/src/components/routes/Workflows.tsx
+++ b/web/vtadmin/src/components/routes/Workflows.tsx
@@ -51,6 +51,7 @@ export const Workflows = () => {
                 target: workflow.workflow?.target?.keyspace,
                 targetShards: workflow.workflow?.target?.shards,
                 timeUpdated: getTimeUpdated(workflow),
+                workflowType: workflow.workflow?.workflow_type,
             }));
         const filtered = filterNouns(filter, mapped);
         return orderBy(filtered, ['name', 'clusterName', 'source', 'target']);
@@ -67,6 +68,14 @@ export const Workflows = () => {
                         {href ? {row.name} : row.name}
+                        {row.workflowType && (
+                            {row.workflowType}
+                            {row.workflowSubType && row.workflowSubType !== 'None' && (
+                                {' (' + row.workflowSubType + ')'}
+                            )}
+                        )}
                         {row.clusterName}
diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts
index 909ae45967d..66272137b0b 100644
--- a/web/vtadmin/src/proto/vtadmin.d.ts
+++ b/web/vtadmin/src/proto/vtadmin.d.ts
@@ -25416,6 +25416,9 @@ export namespace vtctldata {
 
             /** Workflow shard_streams */
             shard_streams?: ({ [k: string]: vtctldata.Workflow.IShardStream }|null);
+
+            /** Workflow workflow_type */
+            workflow_type?: (string|null);
         }
 
         /** Represents a Workflow. */
@@ -25442,6 +25445,9 @@ export namespace vtctldata {
 
             /** Workflow shard_streams. */
             public shard_streams: { [k: string]: vtctldata.Workflow.IShardStream };
+
+            /** Workflow workflow_type. */
+            public workflow_type: string;
 
             /**
              * Creates a new Workflow instance using the specified properties.
              * @param [properties] Properties to set
diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js
index be3d886dfec..cb7b9ca8d22 100644
--- a/web/vtadmin/src/proto/vtadmin.js
+++ b/web/vtadmin/src/proto/vtadmin.js
@@ -60575,6 +60575,7 @@ $root.vtctldata = (function() {
          * @property {vtctldata.Workflow.IReplicationLocation|null} [target] Workflow target
          * @property {number|Long|null} [max_v_replication_lag] Workflow max_v_replication_lag
          * @property {Object.<string,vtctldata.Workflow.IShardStream>|null} [shard_streams] Workflow shard_streams
+         * @property {string|null} [workflow_type] Workflow workflow_type
          */
 
         /**
@@ -60633,6 +60634,14 @@ $root.vtctldata = (function() {
          */
         Workflow.prototype.shard_streams = $util.emptyObject;
 
+        /**
+         * Workflow workflow_type.
+         * @member {string} workflow_type
+         * @memberof vtctldata.Workflow
+         * @instance
+         */
+        Workflow.prototype.workflow_type = "";
+
         /**
          * Creates a new Workflow instance using the specified properties.
          * @function create