diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index ce104fa94ec..cdb6b61f91a 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -1022,3 +1022,32 @@ func TestReadingUnresolvedTransactions(t *testing.T) { }) } } + +// TestSemiSyncRequiredWithTwoPC tests that semi-sync is required when using two-phase commit. +func TestSemiSyncRequiredWithTwoPC(t *testing.T) { + // cleanup all the old data. + conn, closer := start(t) + defer closer() + + out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none") + require.NoError(t, err, out) + defer clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") + + // After changing the durability policy for the given keyspace to none, we run PRS. + shard := clusterInstance.Keyspaces[0].Shards[2] + newPrimary := shard.Vttablets[1] + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput( + "PlannedReparentShard", + fmt.Sprintf("%s/%s", keyspaceName, shard.Name), + "--new-primary", newPrimary.Alias) + require.NoError(t, err) + + // A new distributed transaction should fail. + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") + _, err = utils.ExecAllowError(t, conn, "commit") + require.Error(t, err) + require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not") +} diff --git a/go/test/endtoend/transaction/tx_test.go b/go/test/endtoend/transaction/tx_test.go index 40621a1d84b..475b17cfa2c 100644 --- a/go/test/endtoend/transaction/tx_test.go +++ b/go/test/endtoend/transaction/tx_test.go @@ -69,9 +69,10 @@ func TestMain(m *testing.M) { // Start keyspace keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: SchemaSQL, - VSchema: VSchema, + Name: keyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + DurabilityPolicy: "semi_sync", } if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { return 1, err diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index b34e94a16a7..1bd05493a59 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -22,11 +22,10 @@ import ( "strings" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/protoutil" - - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -348,6 +347,15 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string } defer tm.unlock() + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) + if err != nil { + return "", err + } + + // If semi-sync is enabled, we need to set two pc to be allowed. + // Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness.. + tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet) + // Setting super_read_only `OFF` so that we can run the DDL commands if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil { if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERUnknownSystemVariable { @@ -369,11 +377,6 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string return "", err } - semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) - if err != nil { - return "", err - } - // Set the server read-write, from now on we can accept real // client writes. Note that if semi-sync replication is enabled, // we'll still need some replicas to be able to commit transactions. @@ -595,6 +598,10 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e return err } + // If semi-sync is enabled, we need to set two pc to be allowed. + // Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness.. + tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet) + // If using semi-sync, we need to enable source-side. if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { return err @@ -911,12 +918,16 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str } defer tm.unlock() - pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv()) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return "", err } - semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) + // If semi-sync is enabled, we need to set two pc to be allowed. + // Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness.. + tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet) + + pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv()) if err != nil { return "", err } diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index 69d2edbfdc1..aae07fb96f6 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -95,6 +95,9 @@ type Controller interface { GetThrottlerStatus(ctx context.Context) *throttle.ThrottlerStatus RedoPreparedTransactions() + + // SetTwoPCAllowed sets whether TwoPC is allowed or not. + SetTwoPCAllowed(bool) } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index a08cd9dc635..94e540c9a28 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -53,6 +53,9 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } + if !dte.te.twopcAllowed { + return vterrors.VT10002("two-pc is enabled, but semi-sync is not") + } defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now()) dte.logStats.TransactionID = transactionID diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 62cc5ca32f0..05466721224 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1692,6 +1692,11 @@ func (tsv *TabletServer) RedoPreparedTransactions() { tsv.te.RedoPreparedTransactions() } +// SetTwoPCAllowed sets whether TwoPC is allowed or not. +func (tsv *TabletServer) SetTwoPCAllowed(allowed bool) { + tsv.te.twopcAllowed = allowed +} + // HandlePanic is part of the queryservice.QueryService interface func (tsv *TabletServer) HandlePanic(err *error) { if x := recover(); x != nil { diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index ea4e0b1e41d..c465dc00578 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -75,7 +75,11 @@ type TxEngine struct { // transition while creating new transactions beginRequests sync.WaitGroup - twopcEnabled bool + // twopcEnabled is the flag value of whether the user has enabled twopc or not. + twopcEnabled bool + // twopcAllowed is wether it is safe to allow two pc transactions or not. + // If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls. + twopcAllowed bool shutdownGracePeriod time.Duration coordinatorAddress string abandonAge time.Duration @@ -100,6 +104,9 @@ func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine { } limiter := txlimiter.New(env) te.txPool = NewTxPool(env, limiter) + // We initially allow twoPC (handles vttablet restarts). + // We will disallow them, when a new tablet is promoted if semi-sync is turned off. + te.twopcAllowed = true te.twopcEnabled = config.TwoPCEnable if te.twopcEnabled { if config.TwoPCAbandonAge <= 0 { diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 52bb71abcd9..1d9115392d2 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -229,6 +229,10 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott // RedoPreparedTransactions is part of the tabletserver.Controller interface func (tqsc *Controller) RedoPreparedTransactions() {} +// SetTwoPCAllowed sets whether TwoPC is allowed or not. +func (tqsc *Controller) SetTwoPCAllowed(bool) { +} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock()