Skip to content

Commit

Permalink
VTAdmin: display workflow type in workflows list (#11685)
Browse files Browse the repository at this point in the history
* Add workflow type and subtype to workflow output. Display workflow type on Workflows page

Signed-off-by: Rohit Nayak <[email protected]>

* Run prettier

Signed-off-by: Rohit Nayak <[email protected]>

* Remove type/subtype from stream proto since we only use it at the global Workflow level

Signed-off-by: Rohit Nayak <[email protected]>

* Use RowNamedValues for getting workflow streams

Signed-off-by: Rohit Nayak <[email protected]>

* Rollback local example hacks added for testing

Signed-off-by: Rohit Nayak <[email protected]>

* Update vtadmin protos

Signed-off-by: Rohit Nayak <[email protected]>

* Update protos

Signed-off-by: Rohit Nayak <[email protected]>

* Address review comments

Signed-off-by: Rohit Nayak <[email protected]>

* Fix lint issue

Signed-off-by: Rohit Nayak <[email protected]>

* Fix lint issue

Signed-off-by: Rohit Nayak <[email protected]>

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Nov 17, 2022
1 parent 9c0ecb5 commit 77506ae
Show file tree
Hide file tree
Showing 9 changed files with 1,393 additions and 1,207 deletions.
2,381 changes: 1,201 additions & 1,180 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

86 changes: 86 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 21 additions & 16 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
time_updated,
transaction_timestamp,
message,
tags
tags,
workflow_type,
workflow_sub_type
FROM
_vt.vreplication
%s`,
Expand Down Expand Up @@ -322,7 +324,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
// - sourceShardsByWorkflow[workflow.Name] != nil
// - targetShardsByWorkflow[workflow.Name] != nil
// - workflow.ShardStatuses != nil
scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) error {
scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row sqltypes.RowNamedValues, tablet *topo.TabletInfo) error {
span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow")
defer span.Finish()

Expand All @@ -332,42 +334,44 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
span.Annotate("workflow", workflow.Name)
span.Annotate("tablet_alias", tablet.AliasString())

id, err := evalengine.ToInt64(row[0])
id, err := evalengine.ToInt64(row["id"])
if err != nil {
return err
}

var bls binlogdatapb.BinlogSource
rowBytes, err := row[2].ToBytes()
rowBytes, err := row["source"].ToBytes()
if err != nil {
return err
}
if err := prototext.Unmarshal(rowBytes, &bls); err != nil {
return err
}

pos := row[3].ToString()
stopPos := row[4].ToString()
state := row[6].ToString()
dbName := row[7].ToString()
pos := row["pos"].ToString()
stopPos := row["stop_pos"].ToString()
state := row["state"].ToString()
dbName := row["db_name"].ToString()

timeUpdatedSeconds, err := evalengine.ToInt64(row[8])
timeUpdatedSeconds, err := evalengine.ToInt64(row["time_updated"])
if err != nil {
return err
}

transactionTimeSeconds, err := evalengine.ToInt64(row[9])
transactionTimeSeconds, err := evalengine.ToInt64(row["transaction_timestamp"])
if err != nil {
return err
}

message := row[10].ToString()
message := row["message"].ToString()

tags := row[11].ToString()
tags := row["tags"].ToString()
var tagArray []string
if tags != "" {
tagArray = strings.Split(tags, ",")
}
workflowType, _ := row["workflow_type"].ToInt64()
workflowSubType, _ := row["workflow_sub_type"].ToInt64()
stream := &vtctldatapb.Workflow_Stream{
Id: id,
Shard: tablet.Shard,
Expand All @@ -386,7 +390,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
Message: message,
Tags: tagArray,
}

workflow.WorkflowType = binlogdatapb.VReplicationWorkflowType_name[int32(workflowType)]
workflow.WorkflowSubType = binlogdatapb.VReplicationWorkflowSubType_name[int32(workflowSubType)]
stream.CopyStates, err = s.getWorkflowCopyStates(ctx, tablet, id)
if err != nil {
return err
Expand Down Expand Up @@ -481,8 +486,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
// to a workflow we're already aggregating, or if it's a workflow we
// haven't seen yet for that shard primary. We use the workflow name to
// dedupe for this.
for _, row := range qr.Rows {
workflowName := row[1].ToString()
for _, row := range qr.Named().Rows {
workflowName := row["workflow"].ToString()
workflow, ok := workflowsMap[workflowName]
if !ok {
workflow = &vtctldatapb.Workflow{
Expand All @@ -496,7 +501,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
}

scanWorkflowWg.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) {
go func(ctx context.Context, workflow *vtctldatapb.Workflow, row sqltypes.RowNamedValues, tablet *topo.TabletInfo) {
defer scanWorkflowWg.Done()
if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
scanWorkflowErrors.RecordError(err)
Expand Down
2 changes: 2 additions & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ message Workflow {
ReplicationLocation target = 3;
int64 max_v_replication_lag = 4;
map<string, ShardStream> shard_streams = 5;
string workflow_type = 6;
string workflow_sub_type = 7;

message ReplicationLocation {
string keyspace = 1;
Expand Down
2 changes: 1 addition & 1 deletion test/local_example.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ done;

./202_move_tables.sh
sleep 3 # required for now

exit
./203_switch_reads.sh

./204_switch_writes.sh
Expand Down
26 changes: 16 additions & 10 deletions web/vtadmin/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions web/vtadmin/src/components/routes/Workflows.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export const Workflows = () => {
target: workflow.workflow?.target?.keyspace,
targetShards: workflow.workflow?.target?.shards,
timeUpdated: getTimeUpdated(workflow),
workflowType: workflow.workflow?.workflow_type,
workflowSubType: workflow.workflow?.workflow_sub_type,
}));
const filtered = filterNouns(filter, mapped);
return orderBy(filtered, ['name', 'clusterName', 'source', 'target']);
Expand All @@ -70,6 +72,14 @@ export const Workflows = () => {
<tr key={idx}>
<DataCell>
<div className="font-bold">{href ? <Link to={href}>{row.name}</Link> : row.name}</div>
{row.workflowType && (
<div className="text-secondary text-success-200">
{row.workflowType}
{row.workflowSubType && row.workflowSubType !== 'None' && (
<span className="text-sm">{' (' + row.workflowSubType + ')'}</span>
)}
</div>
)}
<div className="text-sm text-secondary">{row.clusterName}</div>
</DataCell>
<DataCell>
Expand Down
12 changes: 12 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 77506ae

Please sign in to comment.