diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 4c72781df29..c06489006f8 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -157,8 +157,25 @@ func TestVReplicationDDLHandling(t *testing.T) { checkColQueryTarget := fmt.Sprintf("select count(column_name) from information_schema.columns where table_schema='vt_%s' and table_name='%s' and column_name='%s'", targetKs, table, newColumn) + // expectedAction is the specific action, e.g. ignore, that should have a count of 1. All other + // actions should have a count of 0. id is the stream ID to check. + checkOnDDLStats := func(expectedAction binlogdatapb.OnDDLAction, id int) { + jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON values look like this: {"onddl_test.3.IGNORE": 1} + for _, action := range binlogdatapb.OnDDLAction_name { + count := gjson.Get(jsVal, fmt.Sprintf(`%s\.%d\.%s`, workflow, id, action)).Int() + expectedCount := int64(0) + if action == expectedAction.String() { + expectedCount = 1 + } + require.Equal(t, expectedCount, count, "expected %s stat counter of %d but got %d, full value: %s", action, expectedCount, count, jsVal) + } + } + // Test IGNORE behavior - moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=IGNORE") + moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_IGNORE.String()) // Wait until we get through the copy phase... catchup(t, targetTab, workflow, "MoveTables") // Add new col on source @@ -170,8 +187,10 @@ func TestVReplicationDDLHandling(t *testing.T) { waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") // Confirm new col does exist on source waitForQueryResult(t, vtgateConn, sourceKs, checkColQuerySource, "[[INT64(1)]]") - // Also test Cancel --keep_routing_rules - moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep_routing_rules") + // Confirm that we updated the stats on the target tablet as expected. + checkOnDDLStats(binlogdatapb.OnDDLAction_IGNORE, 1) + // Also test Cancel --keep-routing-rules + moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep-routing-rules") // Confirm that the routing rules were NOT cleared rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") require.NoError(t, err) @@ -188,7 +207,7 @@ func TestVReplicationDDLHandling(t *testing.T) { require.NoError(t, err, "error executing %q: %v", dropColDDL, err) // Test STOP behavior (new col now exists nowhere) - moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=STOP") + moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_STOP.String()) // Wait until we get through the copy phase... catchup(t, targetTab, workflow, "MoveTables") // Add new col on the source @@ -198,10 +217,12 @@ func TestVReplicationDDLHandling(t *testing.T) { waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String(), fmt.Sprintf("Message==Stopped at DDL %s", addColDDL)) // Confirm that the target does not have new col waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") + // Confirm that we updated the stats on the target tablet as expected. + checkOnDDLStats(binlogdatapb.OnDDLAction_STOP, 2) moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) // Test EXEC behavior (new col now exists on source) - moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=EXEC") + moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_EXEC.String()) // Wait until we get through the copy phase... catchup(t, targetTab, workflow, "MoveTables") // Confirm target has new col from copy phase @@ -213,7 +234,8 @@ func TestVReplicationDDLHandling(t *testing.T) { waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) // Confirm new col was dropped on target waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") - moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) + // Confirm that we updated the stats on the target tablet as expected. + checkOnDDLStats(binlogdatapb.OnDDLAction_EXEC, 3) } // TestVreplicationCopyThrottling tests the logic that is used diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index ea2c9c63a51..77f76c2997e 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -115,6 +115,8 @@ type Stats struct { PartialQueryCacheSize *stats.CountersWithMultiLabels ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component + + DDLEventActions *stats.CountersWithSingleLabel } // RecordHeartbeat updates the time the last heartbeat from vstreamer was seen @@ -171,20 +173,21 @@ func NewStats() *Stats { bps.ReplicationLagSeconds.Store(math.MaxInt64) bps.PhaseTimings = stats.NewTimings("", "", "Phase") bps.QueryTimings = stats.NewTimings("", "", "Phase") - bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "") - bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") - bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") + bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase") + bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement") + bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement") bps.CopyRowCount = stats.NewCounter("", "") bps.CopyLoopCount = stats.NewCounter("", "") bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"}) - bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") + bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement") bps.VReplicationLags = stats.NewTimings("", "", "") bps.VReplicationLagRates = stats.NewRates("", bps.VReplicationLags, 15*60/5, 5*time.Second) - bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table", "") + bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table") bps.TableCopyTimings = stats.NewTimings("", "", "Table") bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"}) + bps.DDLEventActions = stats.NewCountersWithSingleLabel("", "", "action") return bps } diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 0265e8a0a35..20c1501989e 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -78,8 +78,8 @@ type controller struct { externalCluster string // For Mount+Migrate // Information used in vdiff stats/metrics. - Errors *stats.CountersWithMultiLabels - TableDiffRowCounts *stats.CountersWithMultiLabels + Errors *stats.CountersWithSingleLabel + TableDiffRowCounts *stats.CountersWithSingleLabel TableDiffPhaseTimings *stats.Timings } @@ -100,8 +100,8 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac tmc: vde.tmClientFactory(), sources: make(map[string]*migrationSource), options: options, - Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), - TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), + Errors: stats.NewCountersWithSingleLabel("", "", "Error"), + TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"), TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), } ctx, ct.cancel = context.WithCancel(ctx) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index b68e1f86556..04cda6ac0c1 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -47,7 +47,7 @@ type vdiffStats struct { func (vds *vdiffStats) register() { globalStats.Count = stats.NewGauge("", "") globalStats.ErrorCount = stats.NewCounter("", "") - globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "") + globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table") globalStats.RowsDiffedCount = stats.NewCounter("", "") stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats_test.go b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go index b4f02d7b192..21b2caa9992 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go @@ -31,7 +31,7 @@ import ( func TestVDiffStats(t *testing.T) { testStats := &vdiffStats{ ErrorCount: stats.NewCounter("", ""), - RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table", ""), + RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table"), RowsDiffedCount: stats.NewCounter("", ""), } id := int64(1) @@ -41,8 +41,8 @@ func TestVDiffStats(t *testing.T) { workflow: "testwf", workflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, uuid: uuid.New().String(), - Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), - TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), + Errors: stats.NewCountersWithSingleLabel("", "", "Error"), + TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"), TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), }, } @@ -65,7 +65,7 @@ func TestVDiffStats(t *testing.T) { testStats.ErrorCount.Set(11) require.Equal(t, int64(11), testStats.ErrorCount.Get()) - testStats.controllers[id].Errors.Add([]string{"test error"}, int64(12)) + testStats.controllers[id].Errors.Add("test error", int64(12)) require.Equal(t, int64(12), testStats.controllers[id].Errors.Counts()["test error"]) testStats.RestartedTableDiffs.Add("t1", int64(5)) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 1b64662e551..9a98dd3f10b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -763,7 +763,7 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } - td.wd.ct.TableDiffRowCounts.Add([]string{td.table.Name}, dr.ProcessedRows) + td.wd.ct.TableDiffRowCounts.Add(td.table.Name, dr.ProcessedRows) return nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 56b8d663a3c..8c00b61b784 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -240,7 +240,7 @@ func (wd *workflowDiffer) diff(ctx context.Context) (err error) { defer func() { if err != nil { globalStats.ErrorCount.Add(1) - wd.ct.Errors.Add([]string{err.Error()}, 1) + wd.ct.Errors.Add(err.Error(), 1) } }() dbClient := wd.ct.dbClientFactory() diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index 5b5b6ede24c..11f458d9541 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -527,6 +527,22 @@ func (st *vrStats) register() { } return result }) + + stats.NewCountersFuncWithMultiLabels( + "VReplicationDDLActions", + "vreplication DDL processing actions per stream", + []string{"workflow", "action"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64, len(st.controllers)) + for _, ct := range st.controllers { + for key, val := range ct.blpStats.DDLEventActions.Counts() { + result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val + } + } + return result + }) } func (st *vrStats) numControllers() int64 { diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index d94802adb7b..12b79008d0b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -27,10 +27,9 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/proto/binlogdata" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -93,7 +92,7 @@ func TestStatusHtml(t *testing.T) { testStats.controllers = map[int32]*controller{ 1: { id: 1, - source: &binlogdata.BinlogSource{ + source: &binlogdatapb.BinlogSource{ Keyspace: "ks", Shard: "0", }, @@ -103,7 +102,7 @@ func TestStatusHtml(t *testing.T) { }, 2: { id: 2, - source: &binlogdata.BinlogSource{ + source: &binlogdatapb.BinlogSource{ Keyspace: "ks", Shard: "1", }, @@ -140,7 +139,7 @@ func TestVReplicationStats(t *testing.T) { testStats.controllers = map[int32]*controller{ 1: { id: 1, - source: &binlogdata.BinlogSource{ + source: &binlogdatapb.BinlogSource{ Keyspace: "ks", Shard: "0", }, @@ -195,6 +194,15 @@ func TestVReplicationStats(t *testing.T) { require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"]) require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"]) + blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_IGNORE.String(), 4) + blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC.String(), 3) + blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), 2) + blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_STOP.String(), 1) + require.Equal(t, int64(4), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_IGNORE.String()]) + require.Equal(t, int64(3), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_EXEC.String()]) + require.Equal(t, int64(2), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_EXEC_IGNORE.String()]) + require.Equal(t, int64(1), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_STOP.String()]) + var tm int64 = 1234567890 blpStats.RecordHeartbeat(tm) require.Equal(t, tm, blpStats.Heartbeat()) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index f2cb0a96e71..d7b60a104c4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -657,6 +657,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m log.Errorf("internal error: vplayer is in a transaction on event: %v", event) return fmt.Errorf("internal error: vplayer is in a transaction on event: %v", event) } + vp.vr.stats.DDLEventActions.Add(vp.vr.source.OnDdl.String(), 1) // Record the DDL handling switch vp.vr.source.OnDdl { case binlogdatapb.OnDDLAction_IGNORE: // We still have to update the position.