Skip to content

Commit

Permalink
Add VReplication DDL processing stats
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Apr 21, 2024
1 parent 14b36d0 commit cb40ac3
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type Stats struct {
PartialQueryCacheSize *stats.CountersWithMultiLabels

ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component

DDLEventActions *stats.CountersWithMultiLabels
}

// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
Expand Down Expand Up @@ -185,6 +187,7 @@ func NewStats() *Stats {
bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})
bps.DDLEventActions = stats.NewCountersWithMultiLabels("", "", []string{"action"})
return bps
}

Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,22 @@ func (st *vrStats) register() {
}
return result
})

stats.NewCountersFuncWithMultiLabels(
"VReplicationDDLActions",
"vreplication DDL processing actions per stream",
[]string{"workflow", "action"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
for key, val := range ct.blpStats.DDLEventActions.Counts() {
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val
}
}
return result
})
}

func (st *vrStats) numControllers() int64 {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err != nil {
return err
}
vp.vr.stats.DDLEventActions.Add([]string{"ignore"}, 1)
if posReached {
return io.EOF
}
Expand All @@ -680,6 +681,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err := vp.commit(); err != nil {
return err
}
vp.vr.stats.DDLEventActions.Add([]string{"stop"}, 1)
return io.EOF
case binlogdatapb.OnDDLAction_EXEC:
// It's impossible to save the position transactionally with the statement.
Expand All @@ -694,6 +696,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err != nil {
return err
}
vp.vr.stats.DDLEventActions.Add([]string{"exec"}, 1)
if posReached {
return io.EOF
}
Expand All @@ -706,6 +709,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err != nil {
return err
}
vp.vr.stats.DDLEventActions.Add([]string{"exec_ignore"}, 1)
if posReached {
return io.EOF
}
Expand Down

0 comments on commit cb40ac3

Please sign in to comment.