From cb40ac3a54a237e403107e71fbfa124cef6a4258 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 21 Apr 2024 15:42:55 -0400 Subject: [PATCH 01/12] Add VReplication DDL processing stats Signed-off-by: Matt Lord --- go/vt/binlog/binlogplayer/binlog_player.go | 3 +++ .../vttablet/tabletmanager/vreplication/stats.go | 16 ++++++++++++++++ .../tabletmanager/vreplication/vplayer.go | 4 ++++ 3 files changed, 23 insertions(+) diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index ea2c9c63a51..dfc8cd58963 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -115,6 +115,8 @@ type Stats struct { PartialQueryCacheSize *stats.CountersWithMultiLabels ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component + + DDLEventActions *stats.CountersWithMultiLabels } // RecordHeartbeat updates the time the last heartbeat from vstreamer was seen @@ -185,6 +187,7 @@ func NewStats() *Stats { bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"}) + bps.DDLEventActions = stats.NewCountersWithMultiLabels("", "", []string{"action"}) return bps } diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index 5b5b6ede24c..11f458d9541 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -527,6 +527,22 @@ func (st *vrStats) register() { } return result }) + + stats.NewCountersFuncWithMultiLabels( + "VReplicationDDLActions", + "vreplication DDL processing actions per stream", + []string{"workflow", "action"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64, len(st.controllers)) + for _, ct := range st.controllers { + for key, val := range ct.blpStats.DDLEventActions.Counts() { + result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val + } + } + return result + }) } func (st *vrStats) numControllers() int64 { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index f2cb0a96e71..307bb2d6657 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -664,6 +664,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } + vp.vr.stats.DDLEventActions.Add([]string{"ignore"}, 1) if posReached { return io.EOF } @@ -680,6 +681,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.commit(); err != nil { return err } + vp.vr.stats.DDLEventActions.Add([]string{"stop"}, 1) return io.EOF case binlogdatapb.OnDDLAction_EXEC: // It's impossible to save the position transactionally with the statement. @@ -694,6 +696,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } + vp.vr.stats.DDLEventActions.Add([]string{"exec"}, 1) if posReached { return io.EOF } @@ -706,6 +709,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } + vp.vr.stats.DDLEventActions.Add([]string{"exec_ignore"}, 1) if posReached { return io.EOF } From 1b9ee25ace969ca5028f2db4e12f630446ffa162 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 21 Apr 2024 16:22:44 -0400 Subject: [PATCH 02/12] Add basic unit test Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/stats_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index d94802adb7b..c7f089fb366 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -195,6 +195,15 @@ func TestVReplicationStats(t *testing.T) { require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"]) require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"]) + blpStats.DDLEventActions.Add([]string{"ignore"}, 4) + blpStats.DDLEventActions.Add([]string{"exec"}, 3) + blpStats.DDLEventActions.Add([]string{"exec_ignore"}, 2) + blpStats.DDLEventActions.Add([]string{"stop"}, 1) + require.Equal(t, int64(4), testStats.controllers[1].blpStats.DDLEventActions.Counts()["ignore"]) + require.Equal(t, int64(3), testStats.controllers[1].blpStats.DDLEventActions.Counts()["exec"]) + require.Equal(t, int64(2), testStats.controllers[1].blpStats.DDLEventActions.Counts()["exec_ignore"]) + require.Equal(t, int64(1), testStats.controllers[1].blpStats.DDLEventActions.Counts()["stop"]) + var tm int64 = 1234567890 blpStats.RecordHeartbeat(tm) require.Equal(t, tm, blpStats.Heartbeat()) From 3d8e11203b66a078fbc221924a178c2ae73fb97c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 21 Apr 2024 16:41:41 -0400 Subject: [PATCH 03/12] Add e2e test Signed-off-by: Matt Lord --- .../vreplication/vreplication_test.go | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 4c72781df29..3b702f7dab7 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -170,8 +170,8 @@ func TestVReplicationDDLHandling(t *testing.T) { waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") // Confirm new col does exist on source waitForQueryResult(t, vtgateConn, sourceKs, checkColQuerySource, "[[INT64(1)]]") - // Also test Cancel --keep_routing_rules - moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep_routing_rules") + // Also test Cancel --keep-routing-rules + moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep-routing-rules") // Confirm that the routing rules were NOT cleared rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") require.NoError(t, err) @@ -186,6 +186,13 @@ func TestVReplicationDDLHandling(t *testing.T) { // Drop the column on source to start fresh again _, err = vtgateConn.ExecuteFetch(dropColDDL, 1, false) require.NoError(t, err, "error executing %q: %v", dropColDDL, err) + // Confirm that we updated the stats on the target tablet as expected. + jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON value looks like this: {"cproduct.1.ignore": 1} + streamIgnoreCount := gjson.Get(jsVal, fmt.Sprintf(`%s.1.ignore`, workflow)).Int() + require.Greater(t, streamIgnoreCount, int64(0)) // Test STOP behavior (new col now exists nowhere) moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=STOP") @@ -199,6 +206,13 @@ func TestVReplicationDDLHandling(t *testing.T) { // Confirm that the target does not have new col waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) + // Confirm that we updated the stats on the target tablet as expected. + jsVal, err = getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON value looks like this: {"cproduct.1.stop": 1} + streamStopCount := gjson.Get(jsVal, fmt.Sprintf(`%s.1.stop`, workflow)).Int() + require.Greater(t, streamStopCount, int64(0)) // Test EXEC behavior (new col now exists on source) moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=EXEC") @@ -214,6 +228,13 @@ func TestVReplicationDDLHandling(t *testing.T) { // Confirm new col was dropped on target waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) + // Confirm that we updated the stats on the target tablet as expected. + jsVal, err = getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON value looks like this: {"cproduct.1.exec": 1} + streamExecCount := gjson.Get(jsVal, fmt.Sprintf(`%s.1.exec`, workflow)).Int() + require.Greater(t, streamExecCount, int64(0)) } // TestVreplicationCopyThrottling tests the logic that is used From 522700e15e4b23fe142e2bbccc805b5a100dfe36 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 21 Apr 2024 21:23:16 -0400 Subject: [PATCH 04/12] Update manual e2e test Signed-off-by: Matt Lord --- .../vreplication/vreplication_test.go | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 3b702f7dab7..5ae10d66dbc 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -170,6 +170,19 @@ func TestVReplicationDDLHandling(t *testing.T) { waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") // Confirm new col does exist on source waitForQueryResult(t, vtgateConn, sourceKs, checkColQuerySource, "[[INT64(1)]]") + // Confirm that we updated the stats on the target tablet as expected. + jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON value looks like this: {"onddl_test.1.ignore": 1} + streamIgnoreCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.1\.ignore`, workflow)).Int() + require.Equal(t, int64(1), streamIgnoreCount, "expected ignore stat counter of 1 but got %d, full value: %s", streamIgnoreCount, jsVal) + // The JSON value looks like this: {"onddl_test.1.stop": 0} + streamStopCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.1\.stop`, workflow)).Int() + require.Equal(t, int64(0), streamStopCount, "expected stop stat counter of 0 but got %d, full value: %s", streamStopCount, jsVal) + // The JSON value looks like this: {"onddl_test.1.exec": 0} + streamExecCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.1\.exec`, workflow)).Int() + require.Equal(t, int64(0), streamExecCount, "expected exec stat counter of 0 but got %d, full value: %s", streamExecCount, jsVal) // Also test Cancel --keep-routing-rules moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep-routing-rules") // Confirm that the routing rules were NOT cleared @@ -186,13 +199,6 @@ func TestVReplicationDDLHandling(t *testing.T) { // Drop the column on source to start fresh again _, err = vtgateConn.ExecuteFetch(dropColDDL, 1, false) require.NoError(t, err, "error executing %q: %v", dropColDDL, err) - // Confirm that we updated the stats on the target tablet as expected. - jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) - require.NoError(t, err) - require.NotEqual(t, "{}", jsVal) - // The JSON value looks like this: {"cproduct.1.ignore": 1} - streamIgnoreCount := gjson.Get(jsVal, fmt.Sprintf(`%s.1.ignore`, workflow)).Int() - require.Greater(t, streamIgnoreCount, int64(0)) // Test STOP behavior (new col now exists nowhere) moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=STOP") @@ -205,14 +211,20 @@ func TestVReplicationDDLHandling(t *testing.T) { waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String(), fmt.Sprintf("Message==Stopped at DDL %s", addColDDL)) // Confirm that the target does not have new col waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") - moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) // Confirm that we updated the stats on the target tablet as expected. jsVal, err = getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) require.NoError(t, err) require.NotEqual(t, "{}", jsVal) - // The JSON value looks like this: {"cproduct.1.stop": 1} - streamStopCount := gjson.Get(jsVal, fmt.Sprintf(`%s.1.stop`, workflow)).Int() - require.Greater(t, streamStopCount, int64(0)) + // The JSON value looks like this: {"onddl_test.2.stop": 1} + streamStopCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.2\.stop`, workflow)).Int() + require.Equal(t, int64(1), streamStopCount, "expected stop stat counter of 1 but got %d, full value: %s", streamStopCount, jsVal) + // The JSON value looks like this: {"onddl_test.2.ignore": 0} + streamIgnoreCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.2\.ignore`, workflow)).Int() + require.Equal(t, int64(0), streamIgnoreCount, "expected ignore stat counter of 0 but got %d, full value: %s", streamIgnoreCount, jsVal) + // The JSON value looks like this: {"onddl_test.2.exec": 0} + streamExecCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.2\.exec`, workflow)).Int() + require.Equal(t, int64(0), streamExecCount, "expected exec stat counter of 0 but got %d, full value: %s", streamExecCount, jsVal) + moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) // Test EXEC behavior (new col now exists on source) moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=EXEC") @@ -227,14 +239,20 @@ func TestVReplicationDDLHandling(t *testing.T) { waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) // Confirm new col was dropped on target waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") - moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) // Confirm that we updated the stats on the target tablet as expected. jsVal, err = getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) require.NoError(t, err) require.NotEqual(t, "{}", jsVal) - // The JSON value looks like this: {"cproduct.1.exec": 1} - streamExecCount := gjson.Get(jsVal, fmt.Sprintf(`%s.1.exec`, workflow)).Int() - require.Greater(t, streamExecCount, int64(0)) + // The JSON value looks like this: {"onddl_test.3.exec": 1} + streamExecCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.3\.exec`, workflow)).Int() + require.Equal(t, int64(1), streamExecCount, "expected exec stat counter of 1 but got %d, full value: %s", streamExecCount, jsVal) + // The JSON value looks like this: {"onddl_test.3.ignore": 0} + streamIgnoreCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.3\.ignore`, workflow)).Int() + require.Equal(t, int64(0), streamIgnoreCount, "expected ignore stat counter of 0 but got %d, full value: %s", streamIgnoreCount, jsVal) + moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) + // The JSON value looks like this: {"onddl_test.3.stop": 0} + streamStopCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.2\.stop`, workflow)).Int() + require.Equal(t, int64(0), streamStopCount, "expected stop stat counter of 0 but got %d, full value: %s", streamStopCount, jsVal) } // TestVreplicationCopyThrottling tests the logic that is used From ed8535eb6372950a5113ab497ff8e08e0861da2c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 21 Apr 2024 21:42:59 -0400 Subject: [PATCH 05/12] Minor change after self review Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vreplication_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 5ae10d66dbc..d82d4b71419 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -251,7 +251,7 @@ func TestVReplicationDDLHandling(t *testing.T) { require.Equal(t, int64(0), streamIgnoreCount, "expected ignore stat counter of 0 but got %d, full value: %s", streamIgnoreCount, jsVal) moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) // The JSON value looks like this: {"onddl_test.3.stop": 0} - streamStopCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.2\.stop`, workflow)).Int() + streamStopCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.3\.stop`, workflow)).Int() require.Equal(t, int64(0), streamStopCount, "expected stop stat counter of 0 but got %d, full value: %s", streamStopCount, jsVal) } From cbe3a55977cd82a34a17a990bd0380307b0c818b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 23 Apr 2024 17:41:51 -0400 Subject: [PATCH 06/12] Address review comment Signed-off-by: Matt Lord --- .../vreplication/vreplication_test.go | 57 +++++++------------ 1 file changed, 20 insertions(+), 37 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index d82d4b71419..abb515214c9 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -157,6 +157,23 @@ func TestVReplicationDDLHandling(t *testing.T) { checkColQueryTarget := fmt.Sprintf("select count(column_name) from information_schema.columns where table_schema='vt_%s' and table_name='%s' and column_name='%s'", targetKs, table, newColumn) + // action is the specific action, e.g. ignore, that should have a count of 1. All other + // actions should have a count of 0. + checkOnDDLStats := func(expectedAction string, id int) { + jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON values look like this: {"onddl_test.3.ignore": 1} + for _, action := range []string{"ignore", "exec", "exec_ignore", "stop"} { + count := gjson.Get(jsVal, fmt.Sprintf(`%s\.%d\.%s`, workflow, id, action)).Int() + expectedCount := int64(0) + if action == expectedAction { + expectedCount = 1 + } + require.Equal(t, expectedCount, count, "expected %s stat counter of %d but got %d, full value: %s", action, expectedCount, count, jsVal) + } + } + // Test IGNORE behavior moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=IGNORE") // Wait until we get through the copy phase... @@ -171,18 +188,7 @@ func TestVReplicationDDLHandling(t *testing.T) { // Confirm new col does exist on source waitForQueryResult(t, vtgateConn, sourceKs, checkColQuerySource, "[[INT64(1)]]") // Confirm that we updated the stats on the target tablet as expected. - jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) - require.NoError(t, err) - require.NotEqual(t, "{}", jsVal) - // The JSON value looks like this: {"onddl_test.1.ignore": 1} - streamIgnoreCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.1\.ignore`, workflow)).Int() - require.Equal(t, int64(1), streamIgnoreCount, "expected ignore stat counter of 1 but got %d, full value: %s", streamIgnoreCount, jsVal) - // The JSON value looks like this: {"onddl_test.1.stop": 0} - streamStopCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.1\.stop`, workflow)).Int() - require.Equal(t, int64(0), streamStopCount, "expected stop stat counter of 0 but got %d, full value: %s", streamStopCount, jsVal) - // The JSON value looks like this: {"onddl_test.1.exec": 0} - streamExecCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.1\.exec`, workflow)).Int() - require.Equal(t, int64(0), streamExecCount, "expected exec stat counter of 0 but got %d, full value: %s", streamExecCount, jsVal) + checkOnDDLStats("ignore", 1) // Also test Cancel --keep-routing-rules moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep-routing-rules") // Confirm that the routing rules were NOT cleared @@ -212,18 +218,7 @@ func TestVReplicationDDLHandling(t *testing.T) { // Confirm that the target does not have new col waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") // Confirm that we updated the stats on the target tablet as expected. - jsVal, err = getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) - require.NoError(t, err) - require.NotEqual(t, "{}", jsVal) - // The JSON value looks like this: {"onddl_test.2.stop": 1} - streamStopCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.2\.stop`, workflow)).Int() - require.Equal(t, int64(1), streamStopCount, "expected stop stat counter of 1 but got %d, full value: %s", streamStopCount, jsVal) - // The JSON value looks like this: {"onddl_test.2.ignore": 0} - streamIgnoreCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.2\.ignore`, workflow)).Int() - require.Equal(t, int64(0), streamIgnoreCount, "expected ignore stat counter of 0 but got %d, full value: %s", streamIgnoreCount, jsVal) - // The JSON value looks like this: {"onddl_test.2.exec": 0} - streamExecCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.2\.exec`, workflow)).Int() - require.Equal(t, int64(0), streamExecCount, "expected exec stat counter of 0 but got %d, full value: %s", streamExecCount, jsVal) + checkOnDDLStats("stop", 2) moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) // Test EXEC behavior (new col now exists on source) @@ -240,19 +235,7 @@ func TestVReplicationDDLHandling(t *testing.T) { // Confirm new col was dropped on target waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") // Confirm that we updated the stats on the target tablet as expected. - jsVal, err = getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) - require.NoError(t, err) - require.NotEqual(t, "{}", jsVal) - // The JSON value looks like this: {"onddl_test.3.exec": 1} - streamExecCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.3\.exec`, workflow)).Int() - require.Equal(t, int64(1), streamExecCount, "expected exec stat counter of 1 but got %d, full value: %s", streamExecCount, jsVal) - // The JSON value looks like this: {"onddl_test.3.ignore": 0} - streamIgnoreCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.3\.ignore`, workflow)).Int() - require.Equal(t, int64(0), streamIgnoreCount, "expected ignore stat counter of 0 but got %d, full value: %s", streamIgnoreCount, jsVal) - moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) - // The JSON value looks like this: {"onddl_test.3.stop": 0} - streamStopCount = gjson.Get(jsVal, fmt.Sprintf(`%s\.3\.stop`, workflow)).Int() - require.Equal(t, int64(0), streamStopCount, "expected stop stat counter of 0 but got %d, full value: %s", streamStopCount, jsVal) + checkOnDDLStats("exec", 3) } // TestVreplicationCopyThrottling tests the logic that is used From f0fc18e4b0da422d26cbe1fac41e9d0cefa7facc Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 24 Apr 2024 16:02:42 -0400 Subject: [PATCH 07/12] Minor stat cleanup on final self review Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vreplication_test.go | 4 ++-- go/vt/binlog/binlogplayer/binlog_player.go | 14 +++++++------- go/vt/vttablet/tabletmanager/vdiff/controller.go | 8 ++++---- go/vt/vttablet/tabletmanager/vdiff/stats.go | 2 +- go/vt/vttablet/tabletmanager/vdiff/stats_test.go | 8 ++++---- go/vt/vttablet/tabletmanager/vdiff/table_differ.go | 2 +- .../tabletmanager/vdiff/workflow_differ.go | 2 +- .../tabletmanager/vreplication/stats_test.go | 8 ++++---- .../vttablet/tabletmanager/vreplication/vplayer.go | 8 ++++---- 9 files changed, 28 insertions(+), 28 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index abb515214c9..42330343274 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -157,8 +157,8 @@ func TestVReplicationDDLHandling(t *testing.T) { checkColQueryTarget := fmt.Sprintf("select count(column_name) from information_schema.columns where table_schema='vt_%s' and table_name='%s' and column_name='%s'", targetKs, table, newColumn) - // action is the specific action, e.g. ignore, that should have a count of 1. All other - // actions should have a count of 0. + // expectedAction is the specific action, e.g. ignore, that should have a count of 1. All other + // actions should have a count of 0. id is the stream ID to check. checkOnDDLStats := func(expectedAction string, id int) { jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) require.NoError(t, err) diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index dfc8cd58963..77f76c2997e 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -116,7 +116,7 @@ type Stats struct { ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component - DDLEventActions *stats.CountersWithMultiLabels + DDLEventActions *stats.CountersWithSingleLabel } // RecordHeartbeat updates the time the last heartbeat from vstreamer was seen @@ -173,21 +173,21 @@ func NewStats() *Stats { bps.ReplicationLagSeconds.Store(math.MaxInt64) bps.PhaseTimings = stats.NewTimings("", "", "Phase") bps.QueryTimings = stats.NewTimings("", "", "Phase") - bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "") - bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") - bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") + bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase") + bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement") + bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement") bps.CopyRowCount = stats.NewCounter("", "") bps.CopyLoopCount = stats.NewCounter("", "") bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"}) - bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") + bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement") bps.VReplicationLags = stats.NewTimings("", "", "") bps.VReplicationLagRates = stats.NewRates("", bps.VReplicationLags, 15*60/5, 5*time.Second) - bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table", "") + bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table") bps.TableCopyTimings = stats.NewTimings("", "", "Table") bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"}) - bps.DDLEventActions = stats.NewCountersWithMultiLabels("", "", []string{"action"}) + bps.DDLEventActions = stats.NewCountersWithSingleLabel("", "", "action") return bps } diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 0265e8a0a35..20c1501989e 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -78,8 +78,8 @@ type controller struct { externalCluster string // For Mount+Migrate // Information used in vdiff stats/metrics. - Errors *stats.CountersWithMultiLabels - TableDiffRowCounts *stats.CountersWithMultiLabels + Errors *stats.CountersWithSingleLabel + TableDiffRowCounts *stats.CountersWithSingleLabel TableDiffPhaseTimings *stats.Timings } @@ -100,8 +100,8 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac tmc: vde.tmClientFactory(), sources: make(map[string]*migrationSource), options: options, - Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), - TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), + Errors: stats.NewCountersWithSingleLabel("", "", "Error"), + TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"), TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), } ctx, ct.cancel = context.WithCancel(ctx) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index b68e1f86556..04cda6ac0c1 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -47,7 +47,7 @@ type vdiffStats struct { func (vds *vdiffStats) register() { globalStats.Count = stats.NewGauge("", "") globalStats.ErrorCount = stats.NewCounter("", "") - globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "") + globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table") globalStats.RowsDiffedCount = stats.NewCounter("", "") stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats_test.go b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go index b4f02d7b192..21b2caa9992 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go @@ -31,7 +31,7 @@ import ( func TestVDiffStats(t *testing.T) { testStats := &vdiffStats{ ErrorCount: stats.NewCounter("", ""), - RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table", ""), + RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table"), RowsDiffedCount: stats.NewCounter("", ""), } id := int64(1) @@ -41,8 +41,8 @@ func TestVDiffStats(t *testing.T) { workflow: "testwf", workflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, uuid: uuid.New().String(), - Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), - TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), + Errors: stats.NewCountersWithSingleLabel("", "", "Error"), + TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"), TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), }, } @@ -65,7 +65,7 @@ func TestVDiffStats(t *testing.T) { testStats.ErrorCount.Set(11) require.Equal(t, int64(11), testStats.ErrorCount.Get()) - testStats.controllers[id].Errors.Add([]string{"test error"}, int64(12)) + testStats.controllers[id].Errors.Add("test error", int64(12)) require.Equal(t, int64(12), testStats.controllers[id].Errors.Counts()["test error"]) testStats.RestartedTableDiffs.Add("t1", int64(5)) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 1b64662e551..9a98dd3f10b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -763,7 +763,7 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } - td.wd.ct.TableDiffRowCounts.Add([]string{td.table.Name}, dr.ProcessedRows) + td.wd.ct.TableDiffRowCounts.Add(td.table.Name, dr.ProcessedRows) return nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 56b8d663a3c..8c00b61b784 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -240,7 +240,7 @@ func (wd *workflowDiffer) diff(ctx context.Context) (err error) { defer func() { if err != nil { globalStats.ErrorCount.Add(1) - wd.ct.Errors.Add([]string{err.Error()}, 1) + wd.ct.Errors.Add(err.Error(), 1) } }() dbClient := wd.ct.dbClientFactory() diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index c7f089fb366..c82b3a1f720 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -195,10 +195,10 @@ func TestVReplicationStats(t *testing.T) { require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"]) require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"]) - blpStats.DDLEventActions.Add([]string{"ignore"}, 4) - blpStats.DDLEventActions.Add([]string{"exec"}, 3) - blpStats.DDLEventActions.Add([]string{"exec_ignore"}, 2) - blpStats.DDLEventActions.Add([]string{"stop"}, 1) + blpStats.DDLEventActions.Add("ignore", 4) + blpStats.DDLEventActions.Add("exec", 3) + blpStats.DDLEventActions.Add("exec_ignore", 2) + blpStats.DDLEventActions.Add("stop", 1) require.Equal(t, int64(4), testStats.controllers[1].blpStats.DDLEventActions.Counts()["ignore"]) require.Equal(t, int64(3), testStats.controllers[1].blpStats.DDLEventActions.Counts()["exec"]) require.Equal(t, int64(2), testStats.controllers[1].blpStats.DDLEventActions.Counts()["exec_ignore"]) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 307bb2d6657..3e2c07f3861 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -664,7 +664,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add([]string{"ignore"}, 1) + vp.vr.stats.DDLEventActions.Add("ignore", 1) if posReached { return io.EOF } @@ -681,7 +681,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.commit(); err != nil { return err } - vp.vr.stats.DDLEventActions.Add([]string{"stop"}, 1) + vp.vr.stats.DDLEventActions.Add("stop", 1) return io.EOF case binlogdatapb.OnDDLAction_EXEC: // It's impossible to save the position transactionally with the statement. @@ -696,7 +696,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add([]string{"exec"}, 1) + vp.vr.stats.DDLEventActions.Add("exec", 1) if posReached { return io.EOF } @@ -709,7 +709,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add([]string{"exec_ignore"}, 1) + vp.vr.stats.DDLEventActions.Add("exec_ignore", 1) if posReached { return io.EOF } From 8ac3107c259e9c47314a69bae8ceaade0d604cd8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 24 Apr 2024 16:16:46 -0400 Subject: [PATCH 08/12] Minor improvements Signed-off-by: Matt Lord --- .../endtoend/vreplication/vreplication_test.go | 14 +++++++------- .../tabletmanager/vreplication/stats_test.go | 17 +++++++++-------- .../tabletmanager/vreplication/vplayer.go | 8 ++++---- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 42330343274..d8b7754d025 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -159,15 +159,15 @@ func TestVReplicationDDLHandling(t *testing.T) { // expectedAction is the specific action, e.g. ignore, that should have a count of 1. All other // actions should have a count of 0. id is the stream ID to check. - checkOnDDLStats := func(expectedAction string, id int) { + checkOnDDLStats := func(expectedAction binlogdatapb.OnDDLAction, id int) { jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"}) require.NoError(t, err) require.NotEqual(t, "{}", jsVal) - // The JSON values look like this: {"onddl_test.3.ignore": 1} - for _, action := range []string{"ignore", "exec", "exec_ignore", "stop"} { + // The JSON values look like this: {"onddl_test.3.IGNORE": 1} + for _, action := range binlogdatapb.OnDDLAction_name { count := gjson.Get(jsVal, fmt.Sprintf(`%s\.%d\.%s`, workflow, id, action)).Int() expectedCount := int64(0) - if action == expectedAction { + if action == expectedAction.String() { expectedCount = 1 } require.Equal(t, expectedCount, count, "expected %s stat counter of %d but got %d, full value: %s", action, expectedCount, count, jsVal) @@ -188,7 +188,7 @@ func TestVReplicationDDLHandling(t *testing.T) { // Confirm new col does exist on source waitForQueryResult(t, vtgateConn, sourceKs, checkColQuerySource, "[[INT64(1)]]") // Confirm that we updated the stats on the target tablet as expected. - checkOnDDLStats("ignore", 1) + checkOnDDLStats(binlogdatapb.OnDDLAction_IGNORE, 1) // Also test Cancel --keep-routing-rules moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep-routing-rules") // Confirm that the routing rules were NOT cleared @@ -218,7 +218,7 @@ func TestVReplicationDDLHandling(t *testing.T) { // Confirm that the target does not have new col waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") // Confirm that we updated the stats on the target tablet as expected. - checkOnDDLStats("stop", 2) + checkOnDDLStats(binlogdatapb.OnDDLAction_STOP, 2) moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) // Test EXEC behavior (new col now exists on source) @@ -235,7 +235,7 @@ func TestVReplicationDDLHandling(t *testing.T) { // Confirm new col was dropped on target waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]") // Confirm that we updated the stats on the target tablet as expected. - checkOnDDLStats("exec", 3) + checkOnDDLStats(binlogdatapb.OnDDLAction_EXEC, 3) } // TestVreplicationCopyThrottling tests the logic that is used diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index c82b3a1f720..84fdb52ac66 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -195,14 +196,14 @@ func TestVReplicationStats(t *testing.T) { require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"]) require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"]) - blpStats.DDLEventActions.Add("ignore", 4) - blpStats.DDLEventActions.Add("exec", 3) - blpStats.DDLEventActions.Add("exec_ignore", 2) - blpStats.DDLEventActions.Add("stop", 1) - require.Equal(t, int64(4), testStats.controllers[1].blpStats.DDLEventActions.Counts()["ignore"]) - require.Equal(t, int64(3), testStats.controllers[1].blpStats.DDLEventActions.Counts()["exec"]) - require.Equal(t, int64(2), testStats.controllers[1].blpStats.DDLEventActions.Counts()["exec_ignore"]) - require.Equal(t, int64(1), testStats.controllers[1].blpStats.DDLEventActions.Counts()["stop"]) + blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_IGNORE.String(), 4) + blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC.String(), 3) + blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), 2) + blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_STOP.String(), 1) + require.Equal(t, int64(4), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_IGNORE.String()]) + require.Equal(t, int64(3), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_EXEC.String()]) + require.Equal(t, int64(2), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_EXEC_IGNORE.String()]) + require.Equal(t, int64(1), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_STOP.String()]) var tm int64 = 1234567890 blpStats.RecordHeartbeat(tm) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 3e2c07f3861..ee60710a327 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -664,7 +664,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add("ignore", 1) + vp.vr.stats.DDLEventActions.Add(binlogdatapb.OnDDLAction_IGNORE.String(), 1) if posReached { return io.EOF } @@ -681,7 +681,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.commit(); err != nil { return err } - vp.vr.stats.DDLEventActions.Add("stop", 1) + vp.vr.stats.DDLEventActions.Add(binlogdatapb.OnDDLAction_STOP.String(), 1) return io.EOF case binlogdatapb.OnDDLAction_EXEC: // It's impossible to save the position transactionally with the statement. @@ -696,7 +696,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add("exec", 1) + vp.vr.stats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC.String(), 1) if posReached { return io.EOF } @@ -709,7 +709,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add("exec_ignore", 1) + vp.vr.stats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), 1) if posReached { return io.EOF } From df9907751ac948b592d32f7790e1d01d7df6f813 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 24 Apr 2024 16:20:24 -0400 Subject: [PATCH 09/12] Nitting myself to deth Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vreplication_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index d8b7754d025..c06489006f8 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -175,7 +175,7 @@ func TestVReplicationDDLHandling(t *testing.T) { } // Test IGNORE behavior - moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=IGNORE") + moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_IGNORE.String()) // Wait until we get through the copy phase... catchup(t, targetTab, workflow, "MoveTables") // Add new col on source @@ -207,7 +207,7 @@ func TestVReplicationDDLHandling(t *testing.T) { require.NoError(t, err, "error executing %q: %v", dropColDDL, err) // Test STOP behavior (new col now exists nowhere) - moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=STOP") + moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_STOP.String()) // Wait until we get through the copy phase... catchup(t, targetTab, workflow, "MoveTables") // Add new col on the source @@ -222,7 +222,7 @@ func TestVReplicationDDLHandling(t *testing.T) { moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table) // Test EXEC behavior (new col now exists on source) - moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=EXEC") + moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_EXEC.String()) // Wait until we get through the copy phase... catchup(t, targetTab, workflow, "MoveTables") // Confirm target has new col from copy phase From 6080eaf6ac79fa5c6019970a393cc1393c412e83 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 24 Apr 2024 17:07:50 -0400 Subject: [PATCH 10/12] Send halp Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/stats_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 84fdb52ac66..692e725f09d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -27,11 +27,10 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) From d957df9feef2f63dfc806de86fc1240e69880f45 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 24 Apr 2024 17:08:30 -0400 Subject: [PATCH 11/12] Ded Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/stats_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 692e725f09d..12b79008d0b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -28,7 +28,6 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/proto/binlogdata" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -93,7 +92,7 @@ func TestStatusHtml(t *testing.T) { testStats.controllers = map[int32]*controller{ 1: { id: 1, - source: &binlogdata.BinlogSource{ + source: &binlogdatapb.BinlogSource{ Keyspace: "ks", Shard: "0", }, @@ -103,7 +102,7 @@ func TestStatusHtml(t *testing.T) { }, 2: { id: 2, - source: &binlogdata.BinlogSource{ + source: &binlogdatapb.BinlogSource{ Keyspace: "ks", Shard: "1", }, @@ -140,7 +139,7 @@ func TestVReplicationStats(t *testing.T) { testStats.controllers = map[int32]*controller{ 1: { id: 1, - source: &binlogdata.BinlogSource{ + source: &binlogdatapb.BinlogSource{ Keyspace: "ks", Shard: "0", }, From ee0bd6fe34bf07389d1a016adccf3da3e24cf17b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 24 Apr 2024 17:15:06 -0400 Subject: [PATCH 12/12] Simplify the code a bit Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index ee60710a327..d7b60a104c4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -657,6 +657,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m log.Errorf("internal error: vplayer is in a transaction on event: %v", event) return fmt.Errorf("internal error: vplayer is in a transaction on event: %v", event) } + vp.vr.stats.DDLEventActions.Add(vp.vr.source.OnDdl.String(), 1) // Record the DDL handling switch vp.vr.source.OnDdl { case binlogdatapb.OnDDLAction_IGNORE: // We still have to update the position. @@ -664,7 +665,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add(binlogdatapb.OnDDLAction_IGNORE.String(), 1) if posReached { return io.EOF } @@ -681,7 +681,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.commit(); err != nil { return err } - vp.vr.stats.DDLEventActions.Add(binlogdatapb.OnDDLAction_STOP.String(), 1) return io.EOF case binlogdatapb.OnDDLAction_EXEC: // It's impossible to save the position transactionally with the statement. @@ -696,7 +695,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC.String(), 1) if posReached { return io.EOF } @@ -709,7 +707,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err != nil { return err } - vp.vr.stats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), 1) if posReached { return io.EOF }