Skip to content

Commit

Permalink
VReplication: Add stream DDL processing stats (#15769)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Apr 25, 2024
1 parent 96b1419 commit 716fc12
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 27 deletions.
34 changes: 28 additions & 6 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,25 @@ func TestVReplicationDDLHandling(t *testing.T) {
checkColQueryTarget := fmt.Sprintf("select count(column_name) from information_schema.columns where table_schema='vt_%s' and table_name='%s' and column_name='%s'",
targetKs, table, newColumn)

// expectedAction is the specific action, e.g. ignore, that should have a count of 1. All other
// actions should have a count of 0. id is the stream ID to check.
checkOnDDLStats := func(expectedAction binlogdatapb.OnDDLAction, id int) {
jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON values look like this: {"onddl_test.3.IGNORE": 1}
for _, action := range binlogdatapb.OnDDLAction_name {
count := gjson.Get(jsVal, fmt.Sprintf(`%s\.%d\.%s`, workflow, id, action)).Int()
expectedCount := int64(0)
if action == expectedAction.String() {
expectedCount = 1
}
require.Equal(t, expectedCount, count, "expected %s stat counter of %d but got %d, full value: %s", action, expectedCount, count, jsVal)
}
}

// Test IGNORE behavior
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=IGNORE")
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_IGNORE.String())
// Wait until we get through the copy phase...
catchup(t, targetTab, workflow, "MoveTables")
// Add new col on source
Expand All @@ -170,8 +187,10 @@ func TestVReplicationDDLHandling(t *testing.T) {
waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]")
// Confirm new col does exist on source
waitForQueryResult(t, vtgateConn, sourceKs, checkColQuerySource, "[[INT64(1)]]")
// Also test Cancel --keep_routing_rules
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep_routing_rules")
// Confirm that we updated the stats on the target tablet as expected.
checkOnDDLStats(binlogdatapb.OnDDLAction_IGNORE, 1)
// Also test Cancel --keep-routing-rules
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep-routing-rules")
// Confirm that the routing rules were NOT cleared
rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
require.NoError(t, err)
Expand All @@ -188,7 +207,7 @@ func TestVReplicationDDLHandling(t *testing.T) {
require.NoError(t, err, "error executing %q: %v", dropColDDL, err)

// Test STOP behavior (new col now exists nowhere)
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=STOP")
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_STOP.String())
// Wait until we get through the copy phase...
catchup(t, targetTab, workflow, "MoveTables")
// Add new col on the source
Expand All @@ -198,10 +217,12 @@ func TestVReplicationDDLHandling(t *testing.T) {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String(), fmt.Sprintf("Message==Stopped at DDL %s", addColDDL))
// Confirm that the target does not have new col
waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]")
// Confirm that we updated the stats on the target tablet as expected.
checkOnDDLStats(binlogdatapb.OnDDLAction_STOP, 2)
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table)

// Test EXEC behavior (new col now exists on source)
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=EXEC")
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_EXEC.String())
// Wait until we get through the copy phase...
catchup(t, targetTab, workflow, "MoveTables")
// Confirm target has new col from copy phase
Expand All @@ -213,7 +234,8 @@ func TestVReplicationDDLHandling(t *testing.T) {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
// Confirm new col was dropped on target
waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]")
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table)
// Confirm that we updated the stats on the target tablet as expected.
checkOnDDLStats(binlogdatapb.OnDDLAction_EXEC, 3)
}

// TestVreplicationCopyThrottling tests the logic that is used
Expand Down
13 changes: 8 additions & 5 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type Stats struct {
PartialQueryCacheSize *stats.CountersWithMultiLabels

ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component

DDLEventActions *stats.CountersWithSingleLabel
}

// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
Expand Down Expand Up @@ -171,20 +173,21 @@ func NewStats() *Stats {
bps.ReplicationLagSeconds.Store(math.MaxInt64)
bps.PhaseTimings = stats.NewTimings("", "", "Phase")
bps.QueryTimings = stats.NewTimings("", "", "Phase")
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase")
bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement")
bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement")
bps.CopyRowCount = stats.NewCounter("", "")
bps.CopyLoopCount = stats.NewCounter("", "")
bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement")
bps.VReplicationLags = stats.NewTimings("", "", "")
bps.VReplicationLagRates = stats.NewRates("", bps.VReplicationLags, 15*60/5, 5*time.Second)
bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table", "")
bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table")
bps.TableCopyTimings = stats.NewTimings("", "", "Table")
bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})
bps.DDLEventActions = stats.NewCountersWithSingleLabel("", "", "action")
return bps
}

Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ type controller struct {
externalCluster string // For Mount+Migrate

// Information used in vdiff stats/metrics.
Errors *stats.CountersWithMultiLabels
TableDiffRowCounts *stats.CountersWithMultiLabels
Errors *stats.CountersWithSingleLabel
TableDiffRowCounts *stats.CountersWithSingleLabel
TableDiffPhaseTimings *stats.Timings
}

Expand All @@ -100,8 +100,8 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac
tmc: vde.tmClientFactory(),
sources: make(map[string]*migrationSource),
options: options,
Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}),
TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}),
Errors: stats.NewCountersWithSingleLabel("", "", "Error"),
TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"),
TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"),
}
ctx, ct.cancel = context.WithCancel(ctx)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type vdiffStats struct {
func (vds *vdiffStats) register() {
globalStats.Count = stats.NewGauge("", "")
globalStats.ErrorCount = stats.NewCounter("", "")
globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "")
globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table")
globalStats.RowsDiffedCount = stats.NewCounter("", "")

stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vdiff/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func TestVDiffStats(t *testing.T) {
testStats := &vdiffStats{
ErrorCount: stats.NewCounter("", ""),
RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table", ""),
RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table"),
RowsDiffedCount: stats.NewCounter("", ""),
}
id := int64(1)
Expand All @@ -41,8 +41,8 @@ func TestVDiffStats(t *testing.T) {
workflow: "testwf",
workflowType: binlogdatapb.VReplicationWorkflowType_MoveTables,
uuid: uuid.New().String(),
Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}),
TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}),
Errors: stats.NewCountersWithSingleLabel("", "", "Error"),
TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"),
TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"),
},
}
Expand All @@ -65,7 +65,7 @@ func TestVDiffStats(t *testing.T) {
testStats.ErrorCount.Set(11)
require.Equal(t, int64(11), testStats.ErrorCount.Get())

testStats.controllers[id].Errors.Add([]string{"test error"}, int64(12))
testStats.controllers[id].Errors.Add("test error", int64(12))
require.Equal(t, int64(12), testStats.controllers[id].Errors.Counts()["test error"])

testStats.RestartedTableDiffs.Add("t1", int64(5))
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D
if _, err := dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
td.wd.ct.TableDiffRowCounts.Add([]string{td.table.Name}, dr.ProcessedRows)
td.wd.ct.TableDiffRowCounts.Add(td.table.Name, dr.ProcessedRows)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (wd *workflowDiffer) diff(ctx context.Context) (err error) {
defer func() {
if err != nil {
globalStats.ErrorCount.Add(1)
wd.ct.Errors.Add([]string{err.Error()}, 1)
wd.ct.Errors.Add(err.Error(), 1)
}
}()
dbClient := wd.ct.dbClientFactory()
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,22 @@ func (st *vrStats) register() {
}
return result
})

stats.NewCountersFuncWithMultiLabels(
"VReplicationDDLActions",
"vreplication DDL processing actions per stream",
[]string{"workflow", "action"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
for key, val := range ct.blpStats.DDLEventActions.Counts() {
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val
}
}
return result
})
}

func (st *vrStats) numControllers() int64 {
Expand Down
18 changes: 13 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ import (

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/proto/binlogdata"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -93,7 +92,7 @@ func TestStatusHtml(t *testing.T) {
testStats.controllers = map[int32]*controller{
1: {
id: 1,
source: &binlogdata.BinlogSource{
source: &binlogdatapb.BinlogSource{
Keyspace: "ks",
Shard: "0",
},
Expand All @@ -103,7 +102,7 @@ func TestStatusHtml(t *testing.T) {
},
2: {
id: 2,
source: &binlogdata.BinlogSource{
source: &binlogdatapb.BinlogSource{
Keyspace: "ks",
Shard: "1",
},
Expand Down Expand Up @@ -140,7 +139,7 @@ func TestVReplicationStats(t *testing.T) {
testStats.controllers = map[int32]*controller{
1: {
id: 1,
source: &binlogdata.BinlogSource{
source: &binlogdatapb.BinlogSource{
Keyspace: "ks",
Shard: "0",
},
Expand Down Expand Up @@ -195,6 +194,15 @@ func TestVReplicationStats(t *testing.T) {
require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"])
require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"])

blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_IGNORE.String(), 4)
blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC.String(), 3)
blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), 2)
blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_STOP.String(), 1)
require.Equal(t, int64(4), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_IGNORE.String()])
require.Equal(t, int64(3), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_EXEC.String()])
require.Equal(t, int64(2), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_EXEC_IGNORE.String()])
require.Equal(t, int64(1), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_STOP.String()])

var tm int64 = 1234567890
blpStats.RecordHeartbeat(tm)
require.Equal(t, tm, blpStats.Heartbeat())
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
log.Errorf("internal error: vplayer is in a transaction on event: %v", event)
return fmt.Errorf("internal error: vplayer is in a transaction on event: %v", event)
}
vp.vr.stats.DDLEventActions.Add(vp.vr.source.OnDdl.String(), 1) // Record the DDL handling
switch vp.vr.source.OnDdl {
case binlogdatapb.OnDDLAction_IGNORE:
// We still have to update the position.
Expand Down

0 comments on commit 716fc12

Please sign in to comment.