Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Add stream DDL processing stats #15769

Merged
merged 12 commits into from
Apr 25, 2024
34 changes: 28 additions & 6 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,25 @@ func TestVReplicationDDLHandling(t *testing.T) {
checkColQueryTarget := fmt.Sprintf("select count(column_name) from information_schema.columns where table_schema='vt_%s' and table_name='%s' and column_name='%s'",
targetKs, table, newColumn)

// checkOnDDLStats asserts on the VReplicationDDLActions debug var read from the target tablet.
// expectedAction is the specific action, e.g. ignore, that should have a count of 1. All other
// actions should have a count of 0. id is the stream ID to check.
checkOnDDLStats := func(expectedAction binlogdatapb.OnDDLAction, id int) {
// Read the VReplicationDDLActions debug var using the target tablet's port.
jsVal, err := getDebugVar(t, targetTab.Port, []string{"VReplicationDDLActions"})
require.NoError(t, err)
// Fail early if the var is an empty JSON object, i.e. no counters were reported at all.
require.NotEqual(t, "{}", jsVal)
// The JSON values look like this: {"onddl_test.3.IGNORE": 1}
// Every action name in the OnDDLAction enum is checked, not only the expected one.
for _, action := range binlogdatapb.OnDDLAction_name {
// The dots are backslash-escaped so that the whole <workflow>.<id>.<action> string is looked
// up as one key. NOTE(review): presumably a key that is absent from the JSON reads as 0
// here, which is what the zero expectation below relies on — confirm against gjson.
count := gjson.Get(jsVal, fmt.Sprintf(`%s\.%d\.%s`, workflow, id, action)).Int()
// Only the expected action should have been counted, and exactly once.
expectedCount := int64(0)
if action == expectedAction.String() {
expectedCount = 1
}
require.Equal(t, expectedCount, count, "expected %s stat counter of %d but got %d, full value: %s", action, expectedCount, count, jsVal)
}
}

// Test IGNORE behavior
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=IGNORE")
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_IGNORE.String())
// Wait until we get through the copy phase...
catchup(t, targetTab, workflow, "MoveTables")
// Add new col on source
Expand All @@ -170,8 +187,10 @@ func TestVReplicationDDLHandling(t *testing.T) {
waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]")
// Confirm new col does exist on source
waitForQueryResult(t, vtgateConn, sourceKs, checkColQuerySource, "[[INT64(1)]]")
// Also test Cancel --keep_routing_rules
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep_routing_rules")
// Confirm that we updated the stats on the target tablet as expected.
checkOnDDLStats(binlogdatapb.OnDDLAction_IGNORE, 1)
// Also test Cancel --keep-routing-rules
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table, "--keep-routing-rules")
// Confirm that the routing rules were NOT cleared
rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
require.NoError(t, err)
Expand All @@ -188,7 +207,7 @@ func TestVReplicationDDLHandling(t *testing.T) {
require.NoError(t, err, "error executing %q: %v", dropColDDL, err)

// Test STOP behavior (new col now exists nowhere)
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=STOP")
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_STOP.String())
// Wait until we get through the copy phase...
catchup(t, targetTab, workflow, "MoveTables")
// Add new col on the source
Expand All @@ -198,10 +217,12 @@ func TestVReplicationDDLHandling(t *testing.T) {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String(), fmt.Sprintf("Message==Stopped at DDL %s", addColDDL))
// Confirm that the target does not have new col
waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]")
// Confirm that we updated the stats on the target tablet as expected.
checkOnDDLStats(binlogdatapb.OnDDLAction_STOP, 2)
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table)

// Test EXEC behavior (new col now exists on source)
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl=EXEC")
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table, "--on-ddl", binlogdatapb.OnDDLAction_EXEC.String())
// Wait until we get through the copy phase...
catchup(t, targetTab, workflow, "MoveTables")
// Confirm target has new col from copy phase
Expand All @@ -213,7 +234,8 @@ func TestVReplicationDDLHandling(t *testing.T) {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
// Confirm new col was dropped on target
waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, "[[INT64(0)]]")
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table)
// Confirm that we updated the stats on the target tablet as expected.
mattlord marked this conversation as resolved.
Show resolved Hide resolved
checkOnDDLStats(binlogdatapb.OnDDLAction_EXEC, 3)
}

// TestVreplicationCopyThrottling tests the logic that is used
Expand Down
13 changes: 8 additions & 5 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type Stats struct {
PartialQueryCacheSize *stats.CountersWithMultiLabels

ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component

DDLEventActions *stats.CountersWithSingleLabel
}

// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
Expand Down Expand Up @@ -171,20 +173,21 @@ func NewStats() *Stats {
bps.ReplicationLagSeconds.Store(math.MaxInt64)
bps.PhaseTimings = stats.NewTimings("", "", "Phase")
bps.QueryTimings = stats.NewTimings("", "", "Phase")
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase")
bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement")
bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement")
bps.CopyRowCount = stats.NewCounter("", "")
bps.CopyLoopCount = stats.NewCounter("", "")
bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement")
bps.VReplicationLags = stats.NewTimings("", "", "")
bps.VReplicationLagRates = stats.NewRates("", bps.VReplicationLags, 15*60/5, 5*time.Second)
bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table", "")
bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table")
bps.TableCopyTimings = stats.NewTimings("", "", "Table")
bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})
bps.DDLEventActions = stats.NewCountersWithSingleLabel("", "", "action")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea whether these tags are case-sensitive when filtering using PromQL? In this case, there is only 1 tag, but for multi-label counters, you might want to filter that way.

Copy link
Contributor Author

@mattlord mattlord Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe they are. For example, this returns values in Grafana with Prometheus:

vttablet_errors{error_code="ABORTED"}

But these do not:

vttablet_errors{error_code="aborted"}

vttablet_errors{error_CODE="ABORTED"}

return bps
}

Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ type controller struct {
externalCluster string // For Mount+Migrate

// Information used in vdiff stats/metrics.
Errors *stats.CountersWithMultiLabels
TableDiffRowCounts *stats.CountersWithMultiLabels
Errors *stats.CountersWithSingleLabel
TableDiffRowCounts *stats.CountersWithSingleLabel
TableDiffPhaseTimings *stats.Timings
}

Expand All @@ -100,8 +100,8 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac
tmc: vde.tmClientFactory(),
sources: make(map[string]*migrationSource),
options: options,
Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}),
TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}),
Errors: stats.NewCountersWithSingleLabel("", "", "Error"),
TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"),
TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"),
}
ctx, ct.cancel = context.WithCancel(ctx)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type vdiffStats struct {
func (vds *vdiffStats) register() {
globalStats.Count = stats.NewGauge("", "")
globalStats.ErrorCount = stats.NewCounter("", "")
globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "")
globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table")
globalStats.RowsDiffedCount = stats.NewCounter("", "")

stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vdiff/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func TestVDiffStats(t *testing.T) {
testStats := &vdiffStats{
ErrorCount: stats.NewCounter("", ""),
RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table", ""),
RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table"),
RowsDiffedCount: stats.NewCounter("", ""),
}
id := int64(1)
Expand All @@ -41,8 +41,8 @@ func TestVDiffStats(t *testing.T) {
workflow: "testwf",
workflowType: binlogdatapb.VReplicationWorkflowType_MoveTables,
uuid: uuid.New().String(),
Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}),
TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}),
Errors: stats.NewCountersWithSingleLabel("", "", "Error"),
TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"),
TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"),
},
}
Expand All @@ -65,7 +65,7 @@ func TestVDiffStats(t *testing.T) {
testStats.ErrorCount.Set(11)
require.Equal(t, int64(11), testStats.ErrorCount.Get())

testStats.controllers[id].Errors.Add([]string{"test error"}, int64(12))
testStats.controllers[id].Errors.Add("test error", int64(12))
require.Equal(t, int64(12), testStats.controllers[id].Errors.Counts()["test error"])

testStats.RestartedTableDiffs.Add("t1", int64(5))
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D
if _, err := dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
td.wd.ct.TableDiffRowCounts.Add([]string{td.table.Name}, dr.ProcessedRows)
td.wd.ct.TableDiffRowCounts.Add(td.table.Name, dr.ProcessedRows)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (wd *workflowDiffer) diff(ctx context.Context) (err error) {
defer func() {
if err != nil {
globalStats.ErrorCount.Add(1)
wd.ct.Errors.Add([]string{err.Error()}, 1)
wd.ct.Errors.Add(err.Error(), 1)
}
}()
dbClient := wd.ct.dbClientFactory()
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,22 @@ func (st *vrStats) register() {
}
return result
})

// Register the VReplicationDDLActions stat, which reports the DDL handling counts of every
// controller. Each key in the returned map has the form <workflow>.<stream id>.<action> and
// its value is taken from that controller's blpStats.DDLEventActions counter.
// NOTE(review): the label list below names two labels (workflow, action) while each key has
// three dot-separated parts (workflow, id, action) — confirm whether an "id" label is missing.
stats.NewCountersFuncWithMultiLabels(
"VReplicationDDLActions",
"vreplication DDL processing actions per stream",
[]string{"workflow", "action"},
func() map[string]int64 {
// Hold st.mu while iterating over st.controllers.
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
// Flatten the per-action counts of this stream into the combined result map.
for key, val := range ct.blpStats.DDLEventActions.Counts() {
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val
}
}
return result
})
}

func (st *vrStats) numControllers() int64 {
Expand Down
18 changes: 13 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ import (

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/proto/binlogdata"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -93,7 +92,7 @@ func TestStatusHtml(t *testing.T) {
testStats.controllers = map[int32]*controller{
1: {
id: 1,
source: &binlogdata.BinlogSource{
source: &binlogdatapb.BinlogSource{
Keyspace: "ks",
Shard: "0",
},
Expand All @@ -103,7 +102,7 @@ func TestStatusHtml(t *testing.T) {
},
2: {
id: 2,
source: &binlogdata.BinlogSource{
source: &binlogdatapb.BinlogSource{
Keyspace: "ks",
Shard: "1",
},
Expand Down Expand Up @@ -140,7 +139,7 @@ func TestVReplicationStats(t *testing.T) {
testStats.controllers = map[int32]*controller{
1: {
id: 1,
source: &binlogdata.BinlogSource{
source: &binlogdatapb.BinlogSource{
Keyspace: "ks",
Shard: "0",
},
Expand Down Expand Up @@ -195,6 +194,15 @@ func TestVReplicationStats(t *testing.T) {
require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"])
require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"])

blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_IGNORE.String(), 4)
blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC.String(), 3)
blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), 2)
blpStats.DDLEventActions.Add(binlogdatapb.OnDDLAction_STOP.String(), 1)
require.Equal(t, int64(4), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_IGNORE.String()])
require.Equal(t, int64(3), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_EXEC.String()])
require.Equal(t, int64(2), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_EXEC_IGNORE.String()])
require.Equal(t, int64(1), testStats.controllers[1].blpStats.DDLEventActions.Counts()[binlogdatapb.OnDDLAction_STOP.String()])

var tm int64 = 1234567890
blpStats.RecordHeartbeat(tm)
require.Equal(t, tm, blpStats.Heartbeat())
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
log.Errorf("internal error: vplayer is in a transaction on event: %v", event)
return fmt.Errorf("internal error: vplayer is in a transaction on event: %v", event)
}
vp.vr.stats.DDLEventActions.Add(vp.vr.source.OnDdl.String(), 1) // Record the DDL handling
switch vp.vr.source.OnDdl {
case binlogdatapb.OnDDLAction_IGNORE:
// We still have to update the position.
Expand Down
Loading