diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index f2cb0a96e71..992618ed3eb 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -163,7 +163,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map timeLastSaved: time.Now(), tablePlans: make(map[string]*TablePlan), phase: phase, - throttlerAppName: throttlerapp.VCopierName.ConcatenateString(vr.throttlerAppName()), + throttlerAppName: throttlerapp.VPlayerName.ConcatenateString(vr.throttlerAppName()), query: queryFunc, commit: commitFunc, batchMode: batchMode, diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index 3be0525dc88..df49b468df2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/mysqlctl" @@ -810,3 +811,59 @@ func waitForQueryResult(t *testing.T, dbc binlogplayer.DBClient, query, val stri } } } + +func TestThrottlerAppNames(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tablet := addTablet(100) + defer deleteTablet(tablet) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + } + id := int32(1) + vsclient := newTabletConnector(tablet) + stats := binlogplayer.NewStats() + defer stats.Stop() + dbClient := playerEngine.dbClientFactoryFiltered() + err := dbClient.Connect() + require.NoError(t, err) + defer dbClient.Close() + dbName := dbClient.DBName() + // Ensure there's a dummy vreplication workflow record + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, options) values (%d, 'test_workflow', '', '', 99999, 99999, 0, 0, 'Running', '%s', '{}') on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s'", + id, dbName, dbName), 1) + require.NoError(t, err) + defer func() { + _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1) + require.NoError(t, err) + }() + vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine) + settings, _, err := vr.loadSettings(ctx, newVDBClient(dbClient, stats)) + require.NoError(t, err) + + throttlerAppName := vr.throttlerAppName() + assert.Contains(t, throttlerAppName, "test_workflow") + assert.Contains(t, throttlerAppName, "vreplication") + assert.NotContains(t, throttlerAppName, "vcopier") + assert.NotContains(t, throttlerAppName, "vplayer") + + vp := newVPlayer(vr, settings, nil, replication.Position{}, "") + assert.Contains(t, vp.throttlerAppName, "test_workflow") + assert.Contains(t, vp.throttlerAppName, "vreplication") + assert.Contains(t, vp.throttlerAppName, "vplayer") + assert.NotContains(t, vp.throttlerAppName, "vcopier") + + vc := newVCopier(vr) + assert.Contains(t, vc.throttlerAppName, "test_workflow") + assert.Contains(t, vc.throttlerAppName, "vreplication") + assert.Contains(t, vc.throttlerAppName, "vcopier") + assert.NotContains(t, vc.throttlerAppName, "vplayer") +}