Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTAdmin: display workflow type in workflows list #11685

Merged
Merged
2,381 changes: 1,201 additions & 1,180 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

86 changes: 86 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 21 additions & 16 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
time_updated,
transaction_timestamp,
message,
tags
tags,
workflow_type,
workflow_sub_type
FROM
_vt.vreplication
%s`,
Expand Down Expand Up @@ -322,7 +324,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
// - sourceShardsByWorkflow[workflow.Name] != nil
// - targetShardsByWorkflow[workflow.Name] != nil
// - workflow.ShardStatuses != nil
scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) error {
scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row sqltypes.RowNamedValues, tablet *topo.TabletInfo) error {
span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow")
defer span.Finish()

Expand All @@ -332,42 +334,44 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
span.Annotate("workflow", workflow.Name)
span.Annotate("tablet_alias", tablet.AliasString())

id, err := evalengine.ToInt64(row[0])
id, err := evalengine.ToInt64(row["id"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note that with named values the field names are case sensitive and we're not guaranteeing case with AS alias clauses in the SELECT right now. Should be fine since these are vitess tables and the columns do seem to be all lower case in our defined schema.

if err != nil {
return err
}

var bls binlogdatapb.BinlogSource
rowBytes, err := row[2].ToBytes()
rowBytes, err := row["source"].ToBytes()
if err != nil {
return err
}
if err := prototext.Unmarshal(rowBytes, &bls); err != nil {
return err
}

pos := row[3].ToString()
stopPos := row[4].ToString()
state := row[6].ToString()
dbName := row[7].ToString()
pos := row["pos"].ToString()
stopPos := row["stop_pos"].ToString()
state := row["state"].ToString()
dbName := row["db_name"].ToString()

timeUpdatedSeconds, err := evalengine.ToInt64(row[8])
timeUpdatedSeconds, err := evalengine.ToInt64(row["time_updated"])
if err != nil {
return err
}

transactionTimeSeconds, err := evalengine.ToInt64(row[9])
transactionTimeSeconds, err := evalengine.ToInt64(row["transaction_timestamp"])
if err != nil {
return err
}

message := row[10].ToString()
message := row["message"].ToString()

tags := row[11].ToString()
tags := row["tags"].ToString()
var tagArray []string
if tags != "" {
tagArray = strings.Split(tags, ",")
}
workflowType, _ := row["workflow_type"].ToInt64()
workflowSubType, _ := row["workflow_sub_type"].ToInt64()
stream := &vtctldatapb.Workflow_Stream{
Id: id,
Shard: tablet.Shard,
Expand All @@ -386,7 +390,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
Message: message,
Tags: tagArray,
}

workflow.WorkflowType = binlogdatapb.VReplicationWorkflowType_name[int32(workflowType)]
workflow.WorkflowSubType = binlogdatapb.VReplicationWorkflowSubType_name[int32(workflowSubType)]
stream.CopyStates, err = s.getWorkflowCopyStates(ctx, tablet, id)
if err != nil {
return err
Expand Down Expand Up @@ -481,8 +486,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
// to a workflow we're already aggregating, or if it's a workflow we
// haven't seen yet for that shard primary. We use the workflow name to
// dedupe for this.
for _, row := range qr.Rows {
workflowName := row[1].ToString()
for _, row := range qr.Named().Rows {
workflowName := row["workflow"].ToString()
workflow, ok := workflowsMap[workflowName]
if !ok {
workflow = &vtctldatapb.Workflow{
Expand All @@ -496,7 +501,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
}

scanWorkflowWg.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) {
go func(ctx context.Context, workflow *vtctldatapb.Workflow, row sqltypes.RowNamedValues, tablet *topo.TabletInfo) {
defer scanWorkflowWg.Done()
if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
scanWorkflowErrors.RecordError(err)
Expand Down
2 changes: 2 additions & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ message Workflow {
ReplicationLocation target = 3;
int64 max_v_replication_lag = 4;
map<string, ShardStream> shard_streams = 5;
string workflow_type = 6;
string workflow_sub_type = 7;

message ReplicationLocation {
string keyspace = 1;
Expand Down
2 changes: 1 addition & 1 deletion test/local_example.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ done;

./202_move_tables.sh
sleep 3 # required for now

exit
./203_switch_reads.sh

./204_switch_writes.sh
Expand Down
26 changes: 16 additions & 10 deletions web/vtadmin/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions web/vtadmin/src/components/routes/Workflows.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export const Workflows = () => {
target: workflow.workflow?.target?.keyspace,
targetShards: workflow.workflow?.target?.shards,
timeUpdated: getTimeUpdated(workflow),
workflowType: workflow.workflow?.workflow_type,
workflowSubType: workflow.workflow?.workflow_sub_type,
}));
const filtered = filterNouns(filter, mapped);
return orderBy(filtered, ['name', 'clusterName', 'source', 'target']);
Expand All @@ -70,6 +72,14 @@ export const Workflows = () => {
<tr key={idx}>
<DataCell>
<div className="font-bold">{href ? <Link to={href}>{row.name}</Link> : row.name}</div>
{row.workflowType && (
<div className="text-secondary text-success-200">
{row.workflowType}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one we'll still want to handle the null case from line 57 😄

Suggested change
{row.workflowType}
{row.workflowType || 'N/A'}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one we'll still want to handle the null case from line 57 smile

I thought it would never reach this point (line 77) if row.workflowType is null, because of the check on line 75?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I missed that! You're correct. This is great!

{row.workflowSubType && row.workflowSubType !== 'None' && (
<span className="text-sm">{' (' + row.workflowSubType + ')'}</span>
)}
</div>
)}
<div className="text-sm text-secondary">{row.clusterName}</div>
</DataCell>
<DataCell>
Expand Down
12 changes: 12 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading