diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index df63d5a84a1..c8251846fe6 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/test/endtoend/throttler" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -358,6 +359,52 @@ func TestInitialThrottler(t *testing.T) { }) } +func TestThrottleViaApplySchema(t *testing.T) { + defer cluster.PanicHandler(t) + t.Run("throttling via ApplySchema", func(t *testing.T) { + vtctlParams := &cluster.ApplySchemaParams{DDLStrategy: "online"} + _, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput( + keyspaceName, "alter vitess_migration throttle all", *vtctlParams, + ) + assert.NoError(t, err) + }) + t.Run("validate keyspace configuration after throttle", func(t *testing.T) { + keyspace, err := clusterInstance.VtctldClientProcess.GetKeyspace(keyspaceName) + require.NoError(t, err) + require.NotNil(t, keyspace) + require.NotNil(t, keyspace.Keyspace.ThrottlerConfig) + require.NotEmpty(t, keyspace.Keyspace.ThrottlerConfig.ThrottledApps, "throttler config: %+v", keyspace.Keyspace.ThrottlerConfig) + appRule, ok := keyspace.Keyspace.ThrottlerConfig.ThrottledApps[throttlerapp.OnlineDDLName.String()] + require.True(t, ok, "throttled apps: %v", keyspace.Keyspace.ThrottlerConfig.ThrottledApps) + require.NotNil(t, appRule) + assert.Equal(t, throttlerapp.OnlineDDLName.String(), appRule.Name) + assert.EqualValues(t, 1.0, appRule.Ratio) + expireAt := time.Unix(appRule.ExpiresAt.Seconds, int64(appRule.ExpiresAt.Nanoseconds)) + assert.True(t, expireAt.After(time.Now()), "expected rule to expire in the future: %v", expireAt) + }) + t.Run("unthrottling via ApplySchema", func(t *testing.T) { + vtctlParams := &cluster.ApplySchemaParams{DDLStrategy: "online"} + _, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput( + keyspaceName, "alter vitess_migration unthrottle all", *vtctlParams, + ) + assert.NoError(t, err) + }) + t.Run("validate keyspace configuration after unthrottle", func(t *testing.T) { + keyspace, err := clusterInstance.VtctldClientProcess.GetKeyspace(keyspaceName) + require.NoError(t, err) + require.NotNil(t, keyspace) + require.NotNil(t, keyspace.Keyspace.ThrottlerConfig) + require.NotEmpty(t, keyspace.Keyspace.ThrottlerConfig.ThrottledApps, "throttler config: %+v", keyspace.Keyspace.ThrottlerConfig) + appRule, ok := keyspace.Keyspace.ThrottlerConfig.ThrottledApps[throttlerapp.OnlineDDLName.String()] + require.True(t, ok, "throttled apps: %v", keyspace.Keyspace.ThrottlerConfig.ThrottledApps) + require.NotNil(t, appRule) + assert.Equal(t, throttlerapp.OnlineDDLName.String(), appRule.Name) + assert.EqualValues(t, 1.0, appRule.Ratio) + expireAt := time.Unix(appRule.ExpiresAt.Seconds, int64(appRule.ExpiresAt.Nanoseconds)) + assert.True(t, expireAt.Before(time.Now()), "expected rule to have expired, but it has not: %v", expireAt) + }) +} + func TestThrottlerAfterMetricsCollected(t *testing.T) { defer cluster.PanicHandler(t) diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index 592c64e7073..5397cf9343f 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -19,12 +19,14 @@ package schemamanager import ( "context" "fmt" + "strconv" "strings" "sync" "time" "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/schema" @@ -32,12 +34,15 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tmclient" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/proto/vtrpc" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // TabletExecutor applies schema changes to all tablets. @@ -146,7 +151,7 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) error { for _, sql := range sqls { stmt, err := exec.parser.Parse(sql) if err != nil { - return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err) } switch stmt.(type) { case sqlparser.DDLStatement: @@ -155,7 +160,7 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) error { case *sqlparser.AlterMigration: default: if len(exec.tablets) != 1 { - return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "non-ddl statements can only be executed for single shard keyspaces: %s", sql) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "non-ddl statements can only be executed for single shard keyspaces: %s", sql) } } } @@ -190,6 +195,76 @@ func (exec *TabletExecutor) isOnlineSchemaDDL(stmt sqlparser.Statement) (isOnlin return false } +func validateThrottleParams(alterMigrationType sqlparser.AlterMigrationType, expireString string, ratioLiteral *sqlparser.Literal) (duration time.Duration, ratio float64, err error) { + switch alterMigrationType { + case sqlparser.UnthrottleMigrationType, + sqlparser.UnthrottleAllMigrationType: + // Unthrottling is like throttling with duration=0 + duration = 0 + default: + duration = throttle.DefaultAppThrottleDuration + if expireString != "" { + duration, err = time.ParseDuration(expireString) + if err != nil || duration < 0 { + return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid EXPIRE value: %s. Try '120s', '30m', '1h', etc. Allowed units are (s)ec, (m)in, (h)hour", expireString) + } + } + } + ratio = 1.0 + if ratioLiteral != nil { + ratio, err = strconv.ParseFloat(ratioLiteral.Val, 64) + if err != nil || ratio < 0 || ratio > 1 { + return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid RATIO value: %s. Try any decimal number between '0.0' (no throttle) and `1.0` (fully throttled)", ratioLiteral.Val) + } + } + return duration, ratio, nil +} + +func (exec *TabletExecutor) executeAlterMigrationThrottle(ctx context.Context, alterMigration *sqlparser.AlterMigration) (err error) { + duration, ratio, err := validateThrottleParams(alterMigration.Type, alterMigration.Expire, alterMigration.Ratio) + if err != nil { + return err + } + expireAt := time.Now().Add(duration) + appName := alterMigration.UUID + if appName == "" { + appName = throttlerapp.OnlineDDLName.String() + } + throttledAppRule := &topodatapb.ThrottledAppRule{ + Name: appName, + ExpiresAt: protoutil.TimeToProto(expireAt), + Ratio: ratio, + } + + req := &vtctldatapb.UpdateThrottlerConfigRequest{ + Keyspace: exec.keyspace, + ThrottledApp: throttledAppRule, + } + + update := func(throttlerConfig *topodatapb.ThrottlerConfig) *topodatapb.ThrottlerConfig { + if throttlerConfig == nil { + throttlerConfig = &topodatapb.ThrottlerConfig{} + } + if throttlerConfig.ThrottledApps == nil { + throttlerConfig.ThrottledApps = make(map[string]*topodatapb.ThrottledAppRule) + } + throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp + return throttlerConfig + } + // We have already locked the keyspace + ki, err := exec.ts.GetKeyspace(ctx, req.Keyspace) + if err != nil { + return err + } + ki.ThrottlerConfig = update(ki.ThrottlerConfig) + err = exec.ts.UpdateKeyspace(ctx, ki) + if err != nil { + return err + } + _, err = exec.ts.UpdateSrvKeyspaceThrottlerConfig(ctx, req.Keyspace, []string{}, update) + return err +} + // executeSQL executes a single SQL statement either as online DDL or synchronously on all tablets. // In online DDL case, the query may be exploded into multiple queries during func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, providedUUID string, execResult *ExecuteResult) (executedAsynchronously bool, err error) { @@ -235,8 +310,21 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, provided exec.logger.Printf("%s\n", onlineDDL.UUID) return true, nil case *sqlparser.AlterMigration: - exec.executeOnAllTablets(ctx, execResult, sql, true) - return true, nil + switch stmt.Type { + case sqlparser.ThrottleMigrationType, + sqlparser.ThrottleAllMigrationType, + sqlparser.UnthrottleMigrationType, + sqlparser.UnthrottleAllMigrationType: + err := exec.executeAlterMigrationThrottle(ctx, stmt) + if err != nil { + execResult.ExecutorErr = err.Error() + return false, err + } + return true, nil + default: + exec.executeOnAllTablets(ctx, execResult, sql, true) + return true, nil + } } // Got here? The statement needs to be executed directly. return executeViaFetch() @@ -267,7 +355,7 @@ func allSQLsAreCreateQueries(sqls []string, parser *sqlparser.Parser) (bool, err for _, sql := range sqls { stmt, err := parser.Parse(sql) if err != nil { - return false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err) + return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err) } switch stmt.(type) { case *sqlparser.CreateTable, *sqlparser.CreateView: