diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index ea2c9c63a51..dfc8cd58963 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -115,6 +115,8 @@ type Stats struct { PartialQueryCacheSize *stats.CountersWithMultiLabels ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component + + DDLEventActions *stats.CountersWithMultiLabels } // RecordHeartbeat updates the time the last heartbeat from vstreamer was seen @@ -185,6 +187,7 @@ func NewStats() *Stats { bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"}) + bps.DDLEventActions = stats.NewCountersWithMultiLabels("", "", []string{"action"}) return bps } diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index 5b5b6ede24c..11f458d9541 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -527,6 +527,22 @@ func (st *vrStats) register() { } return result }) + + stats.NewCountersFuncWithMultiLabels( + "VReplicationDDLActions", + "vreplication DDL processing actions per stream", + []string{"workflow", "action"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64, len(st.controllers)) + for _, ct := range st.controllers { + for key, val := range ct.blpStats.DDLEventActions.Counts() { + result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val + } + } + return result + }) } func (st *vrStats) numControllers() int64 { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index f2cb0a96e71..307bb2d6657 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -664,6 +664,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } + vp.vr.stats.DDLEventActions.Add([]string{"ignore"}, 1) if posReached { return io.EOF } @@ -680,6 +681,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.commit(); err != nil { return err } + vp.vr.stats.DDLEventActions.Add([]string{"stop"}, 1) return io.EOF case binlogdatapb.OnDDLAction_EXEC: // It's impossible to save the position transactionally with the statement. @@ -694,6 +696,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } + vp.vr.stats.DDLEventActions.Add([]string{"exec"}, 1) if posReached { return io.EOF } @@ -706,6 +709,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } + vp.vr.stats.DDLEventActions.Add([]string{"exec_ignore"}, 1) if posReached { return io.EOF }