From 16b05c151a4d32758830d97353d7fe80d8ffa5c7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 9 Jul 2024 22:00:49 -0400 Subject: [PATCH] VReplication: use new topo named locks and TTL override for workflow coordination (#16260) Signed-off-by: Matt Lord --- .../vreplication/common/switchtraffic.go | 3 + .../command/vreplication/common/utils.go | 3 +- .../command/vreplication/vdiff/vdiff.go | 3 +- go/test/endtoend/topotest/etcd2/main_test.go | 69 +++++++ .../vreplication/vreplication_test_env.go | 4 +- go/vt/topo/conn.go | 16 ++ go/vt/topo/consultopo/lock.go | 40 +++- go/vt/topo/consultopo/server.go | 2 +- go/vt/topo/etcd2topo/election.go | 2 +- go/vt/topo/etcd2topo/lock.go | 31 ++- go/vt/topo/faketopo/faketopo.go | 15 ++ go/vt/topo/keyspace_lock.go | 4 +- go/vt/topo/keyspace_lock_test.go | 35 +++- go/vt/topo/locks.go | 100 +++++++++- go/vt/topo/memorytopo/lock.go | 33 ++- go/vt/topo/named_lock.go | 58 ++++++ go/vt/topo/named_lock_test.go | 88 ++++++++ go/vt/topo/routing_rules_lock.go | 4 +- go/vt/topo/server.go | 1 + go/vt/topo/shard_lock.go | 6 +- go/vt/topo/stats_conn.go | 37 +++- go/vt/topo/stats_conn_test.go | 50 +++-- go/vt/topo/zk2topo/lock.go | 12 ++ go/vt/vtctl/workflow/server.go | 188 ++++++++++++++---- go/vt/vtctl/workflow/server_test.go | 8 +- go/vt/vtctl/workflow/switcher.go | 6 +- go/vt/vtctl/workflow/switcher_dry_run.go | 3 +- go/vt/vtctl/workflow/switcher_interface.go | 4 +- go/vt/vtctl/workflow/traffic_switcher.go | 2 - .../tabletmanager/vdiff/table_differ.go | 9 +- 30 files changed, 724 insertions(+), 112 deletions(-) create mode 100644 go/vt/topo/named_lock.go create mode 100644 go/vt/topo/named_lock_test.go diff --git a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go index 4004afc0ac0..3429c10303c 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go +++ b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go @@ -48,6 +48,9 @@ func GetSwitchTrafficCommand(opts *SubCommandsOpts) *cobra.Command { topodatapb.TabletType_RDONLY, } } + if SwitchTrafficOptions.Timeout.Seconds() < 1 { + return fmt.Errorf("timeout value must be at least 1 second") + } return nil }, RunE: commandSwitchTraffic, diff --git a/go/cmd/vtctldclient/command/vreplication/common/utils.go b/go/cmd/vtctldclient/command/vreplication/common/utils.go index cb408add490..da774647c38 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/utils.go +++ b/go/cmd/vtctldclient/command/vreplication/common/utils.go @@ -47,7 +47,6 @@ var ( } onDDLDefault = binlogdatapb.OnDDLAction_IGNORE.String() MaxReplicationLagDefault = 30 * time.Second - TimeoutDefault = 30 * time.Second BaseOptions = struct { Workflow string @@ -245,7 +244,7 @@ var SwitchTrafficOptions = struct { func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences bool) { cmd.Flags().StringSliceVarP(&SwitchTrafficOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to switch traffic in.") cmd.Flags().Var((*topoproto.TabletTypeListFlag)(&SwitchTrafficOptions.TabletTypes), "tablet-types", "Tablet types to switch traffic for.") - cmd.Flags().DurationVar(&SwitchTrafficOptions.Timeout, "timeout", TimeoutDefault, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.") + cmd.Flags().DurationVar(&SwitchTrafficOptions.Timeout, "timeout", workflow.DefaultTimeout, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.") cmd.Flags().DurationVar(&SwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", MaxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this.") cmd.Flags().BoolVar(&SwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover.") cmd.Flags().BoolVar(&SwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.") diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go index dfca3386491..9355049e39b 100644 --- a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go +++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" @@ -874,7 +875,7 @@ func registerCommands(root *cobra.Command) { create.Flags().StringSliceVar(&createOptions.TargetCells, "target-cells", nil, "The target cell(s) to compare with; default is any available cell.") create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.") create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.") - create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.") + create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", workflow.DefaultTimeout, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.") create.Flags().Int64Var(&createOptions.Limit, "limit", math.MaxInt64, "Max rows to stop comparing after.") create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.") create.Flags().Int64Var(&createOptions.MaxReportSampleRows, "max-report-sample-rows", 10, "Maximum number of row differences to report (0 for all differences). NOTE: when increasing this value it is highly recommended to also specify --only-pks") diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go index 747f2721cdc..67b0dbbc8f7 100644 --- a/go/test/endtoend/topotest/etcd2/main_test.go +++ b/go/test/endtoend/topotest/etcd2/main_test.go @@ -19,6 +19,7 @@ package ectd2 import ( "context" "flag" + "fmt" "os" "testing" "time" @@ -201,6 +202,74 @@ func TestKeyspaceLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true) } +// TestLockingWithTTL tests that locking with the TTL override works as intended. +func TestLockingWithTTL(t *testing.T) { + // Create the topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + ctx := context.Background() + + // Acquire a keyspace lock with a short custom TTL. + ttl := 1 * time.Second + ctx, unlock, err := ts.LockKeyspace(ctx, KeyspaceName, "TestLockingWithTTL", topo.WithTTL(ttl)) + require.NoError(t, err) + defer unlock(&err) + + // Check that CheckKeyspaceLocked DOES return an error after waiting more than + // the specified TTL as we should have lost our lock. + time.Sleep(ttl * 2) + err = topo.CheckKeyspaceLocked(ctx, KeyspaceName) + require.Error(t, err) +} + +// TestNamedLocking tests that named locking works as intended. +func TestNamedLocking(t *testing.T) { + // Create topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + ctx := context.Background() + lockName := "TestNamedLocking" + action := "Testing" + + // Acquire a named lock. + ctx, unlock, err := ts.LockName(ctx, lockName, action) + require.NoError(t, err) + + // Check that we can't reacquire it from the same context. + _, _, err = ts.LockName(ctx, lockName, action) + require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName)) + + // Check that CheckNameLocked doesn't return an error as we should still be + // holding the lock. + err = topo.CheckNameLocked(ctx, lockName) + require.NoError(t, err) + + // We'll now try to acquire the lock from a different goroutine. + secondCallerAcquired := false + go func() { + _, unlock, err := ts.LockName(context.Background(), lockName, action) + defer unlock(&err) + require.NoError(t, err) + secondCallerAcquired = true + }() + + // Wait for some time and ensure that the second attempt at acquiring the lock + // is blocked. + time.Sleep(100 * time.Millisecond) + require.False(t, secondCallerAcquired) + + // Unlock the name. + unlock(&err) + // Check that we no longer have the named lock. + err = topo.CheckNameLocked(ctx, lockName) + require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName)) + + // Wait to see that the second goroutine WAS now able to acquire the named lock. + topoutils.WaitForBoolValue(t, &secondCallerAcquired, true) +} + func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { t.Helper() var res []*sqltypes.Result diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 238242f0e65..c62d871380d 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -17,9 +17,9 @@ limitations under the License. package vreplication var dryRunResultsSwitchWritesCustomerShard = []string{ - "Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [PRIMARY]", "Lock keyspace product", "Lock keyspace customer", + "Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [PRIMARY]", "/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:", "Wait for vreplication on stopped streams to catchup for up to 30s", "Create reverse vreplication workflow p2c_reverse", @@ -36,8 +36,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ } var dryRunResultsReadCustomerShard = []string{ - "Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [RDONLY,REPLICA]", "Lock keyspace product", + "Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [RDONLY,REPLICA]", "Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]", "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated", "Serving VSchema will be rebuilt for the customer keyspace", diff --git a/go/vt/topo/conn.go b/go/vt/topo/conn.go index 348fc569ecf..b00bdc67207 100644 --- a/go/vt/topo/conn.go +++ b/go/vt/topo/conn.go @@ -19,6 +19,7 @@ package topo import ( "context" "sort" + "time" ) // Conn defines the interface that must be implemented by topology @@ -120,6 +121,21 @@ type Conn interface { // Returns ErrInterrupted if ctx is canceled. Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) + // LockWithTTL is similar to `Lock` but the difference is that it allows + // you to override the global default TTL that is configured for the + // implementation (--topo_etcd_lease_ttl and --topo_consul_lock_session_ttl). + // Note: this is no different than `Lock` for ZooKeeper as it does not + // support lock TTLs and they exist until released or the session ends. + LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (LockDescriptor, error) + + // LockName is similar to `Lock` but the difference is that it does not require + // the path to exist and have children in order to lock it. This is because with + // named locks you are NOT locking an actual topo entity such as a Keyspace record. + // Because this lock is not blocking any Vitess operations OTHER than another + // caller that is trying to get the same named lock, there is a static 24 hour + // TTL on them to ensure that they are eventually cleaned up. + LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error) + // TryLock takes lock on the given directory with a fail-fast approach. // It is similar to `Lock` but the difference is it attempts to acquire the lock // if it is likely to succeed. If there is already a lock on given path, then unlike `Lock` diff --git a/go/vt/topo/consultopo/lock.go b/go/vt/topo/consultopo/lock.go index ae47b91cc6c..49554474677 100644 --- a/go/vt/topo/consultopo/lock.go +++ b/go/vt/topo/consultopo/lock.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path" + "time" "github.com/hashicorp/consul/api" @@ -49,7 +50,27 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD return nil, convertError(err, dirPath) } - return s.lock(ctx, dirPath, contents) + return s.lock(ctx, dirPath, contents, s.lockTTL) +} + +// LockWithTTL is part of the topo.Conn interface. +func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) { + // We list the directory first to make sure it exists. + if _, err := s.ListDir(ctx, dirPath, false /*full*/); err != nil { + // We need to return the right error codes, like + // topo.ErrNoNode and topo.ErrInterrupted, and the + // easiest way to do this is to return convertError(err). + // It may lose some of the context, if this is an issue, + // maybe logging the error would work here. + return nil, convertError(err, dirPath) + } + + return s.lock(ctx, dirPath, contents, ttl.String()) +} + +// LockName is part of the topo.Conn interface. +func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + return s.lock(ctx, dirPath, contents, topo.NamedLockTTL.String()) } // TryLock is part of the topo.Conn interface. @@ -74,11 +95,11 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo } // everything is good let's acquire the lock. - return s.lock(ctx, dirPath, contents) + return s.lock(ctx, dirPath, contents, s.lockTTL) } // Lock is part of the topo.Conn interface. -func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { +func (s *Server) lock(ctx context.Context, dirPath, contents, ttl string) (topo.LockDescriptor, error) { lockPath := path.Join(s.root, dirPath, locksFilename) lockOpts := &api.LockOptions{ @@ -90,12 +111,19 @@ func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockD }, } lockOpts.SessionOpts.Checks = s.lockChecks - if s.lockDelay > 0 { - lockOpts.SessionOpts.LockDelay = s.lockDelay - } if s.lockTTL != "" { + // Override the API default with the global default from + // --topo_consul_lock_session_ttl. lockOpts.SessionOpts.TTL = s.lockTTL } + if ttl != "" { + // Override the global default with the one provided by the + // caller. + lockOpts.SessionOpts.TTL = ttl + } + if s.lockDelay > 0 { + lockOpts.SessionOpts.LockDelay = s.lockDelay + } // Build the lock structure. l, err := s.client.LockOpts(lockOpts) if err != nil { diff --git a/go/vt/topo/consultopo/server.go b/go/vt/topo/consultopo/server.go index a7a5446c274..ab61a40b1e8 100644 --- a/go/vt/topo/consultopo/server.go +++ b/go/vt/topo/consultopo/server.go @@ -111,7 +111,7 @@ type Server struct { locks map[string]*lockInstance lockChecks []string - lockTTL string + lockTTL string // This is the default used for all non-named locks lockDelay time.Duration } diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go index 276d9e60355..94768b50470 100644 --- a/go/vt/topo/etcd2topo/election.go +++ b/go/vt/topo/etcd2topo/election.go @@ -91,7 +91,7 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error) // Try to get the primaryship, by getting a lock. var err error - ld, err = mp.s.lock(lockCtx, electionPath, mp.id) + ld, err = mp.s.lock(lockCtx, electionPath, mp.id, leaseTTL) if err != nil { // It can be that we were interrupted. return nil, err diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go index 89095156471..7fb761611cd 100644 --- a/go/vt/topo/etcd2topo/lock.go +++ b/go/vt/topo/etcd2topo/lock.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path" + "time" "github.com/spf13/pflag" @@ -34,7 +35,7 @@ import ( ) var ( - leaseTTL = 30 + leaseTTL = 30 // This is the default used for all non-named locks ) func init() { @@ -153,7 +154,7 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo } // everything is good let's acquire the lock. - return s.lock(ctx, dirPath, contents) + return s.lock(ctx, dirPath, contents, leaseTTL) } // Lock is part of the topo.Conn interface. @@ -168,15 +169,35 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD return nil, convertError(err, dirPath) } - return s.lock(ctx, dirPath, contents) + return s.lock(ctx, dirPath, contents, leaseTTL) +} + +// LockWithTTL is part of the topo.Conn interface. +func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) { + // We list the directory first to make sure it exists. + if _, err := s.ListDir(ctx, dirPath, false /*full*/); err != nil { + // We need to return the right error codes, like + // topo.ErrNoNode and topo.ErrInterrupted, and the + // easiest way to do this is to return convertError(err). + // It may lose some of the context, if this is an issue, + // maybe logging the error would work here. + return nil, convertError(err, dirPath) + } + + return s.lock(ctx, dirPath, contents, int(ttl.Seconds())) +} + +// LockName is part of the topo.Conn interface. +func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + return s.lock(ctx, dirPath, contents, int(topo.NamedLockTTL.Seconds())) } // lock is used by both Lock() and primary election. -func (s *Server) lock(ctx context.Context, nodePath, contents string) (topo.LockDescriptor, error) { +func (s *Server) lock(ctx context.Context, nodePath, contents string, ttl int) (topo.LockDescriptor, error) { nodePath = path.Join(s.root, nodePath, locksPath) // Get a lease, set its KeepAlive. - lease, err := s.cli.Grant(ctx, int64(leaseTTL)) + lease, err := s.cli.Grant(ctx, int64(ttl)) if err != nil { return nil, convertError(err, nodePath) } diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index 52a2a41c5df..0c88b95e3da 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -20,6 +20,7 @@ import ( "context" "strings" "sync" + "time" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -291,6 +292,20 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc return &fakeLockDescriptor{}, nil } +// LockWithTTL implements the Conn interface. +func (f *FakeConn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) { + f.mu.Lock() + defer f.mu.Unlock() + return &fakeLockDescriptor{}, nil +} + +// LockName implements the Conn interface. +func (f *FakeConn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + f.mu.Lock() + defer f.mu.Unlock() + return &fakeLockDescriptor{}, nil +} + // TryLock is part of the topo.Conn interface. Its implementation is same as Lock func (f *FakeConn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { return f.Lock(ctx, dirPath, contents) diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index 7df1b2ee64f..fe9da9a5f8b 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -43,10 +43,10 @@ func (s *keyspaceLock) Path() string { // - a context with a locksInfo structure for future reference. // - an unlock method // - an error if anything failed. -func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) { +func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string, opts ...LockOption) (context.Context, func(*error), error) { return ts.internalLock(ctx, &keyspaceLock{ keyspace: keyspace, - }, action, true) + }, action, opts...) } // CheckKeyspaceLocked can be called on a context to make sure we have the lock diff --git a/go/vt/topo/keyspace_lock_test.go b/go/vt/topo/keyspace_lock_test.go index 6d0a34de554..35fd804db9a 100644 --- a/go/vt/topo/keyspace_lock_test.go +++ b/go/vt/topo/keyspace_lock_test.go @@ -19,12 +19,14 @@ package topo_test import ( "context" "testing" + "time" "github.com/stretchr/testify/require" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // TestTopoKeyspaceLock tests keyspace lock operations. @@ -82,3 +84,34 @@ func TestTopoKeyspaceLock(t *testing.T) { require.NoError(t, err) defer unlock(&err) } + +// TestTopoKeyspaceLockWithTTL tests keyspace lock with a custom TTL. +func TestTopoKeyspaceLockWithTTL(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts, tsf := memorytopo.NewServerAndFactory(ctx, "zone1") + defer ts.Close() + + currentTopoLockTimeout := topo.LockTimeout + topo.LockTimeout = testLockTimeout + defer func() { + topo.LockTimeout = currentTopoLockTimeout + }() + + ks1 := "ks1" + ttl := time.Second + err := ts.CreateKeyspace(ctx, ks1, &topodatapb.Keyspace{}) + require.NoError(t, err) + + ctx, unlock, err := ts.LockKeyspace(ctx, ks1, ks1, topo.WithTTL(ttl)) + require.NoError(t, err) + defer unlock(&err) + + err = topo.CheckKeyspaceLocked(ctx, ks1) + require.NoError(t, err) + + // Confirm the new stats. + stats := tsf.GetCallStats() + require.NotNil(t, stats) + require.Equal(t, int64(1), stats.Counts()["LockWithTTL"]) +} diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index 16caaf49dfb..f46e5f06e4b 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -46,6 +46,11 @@ var ( RemoteOperationTimeout = 15 * time.Second ) +// How long named locks are kept in the topo server. +// This ensures that orphaned named locks are not kept around forever. +// This should never happen, but it provides a final safety net. +const NamedLockTTL = 24 * time.Hour + // Lock describes a long-running lock on a keyspace or a shard. // It needs to be public as we JSON-serialize it. type Lock struct { @@ -54,6 +59,7 @@ type Lock struct { HostName string UserName string Time string + Options lockOptions // Status is the current status of the Lock. Status string @@ -120,6 +126,28 @@ type locksKeyType int var locksKey locksKeyType +// Support different lock types. +type LockType int + +const ( + // Blocking is the default lock type when no other valid type + // is specified. + Blocking LockType = iota + NonBlocking // Uses TryLock + Named // Uses LockName +) + +func (lt LockType) String() string { + switch lt { + case NonBlocking: + return "non blocking" + case Named: + return "named" + default: + return "blocking" + } +} + // iTopoLock is the interface for knowing the resource that is being locked. // It allows for better controlling nuances for different lock types and log messages. type iTopoLock interface { @@ -129,8 +157,11 @@ type iTopoLock interface { } // perform the topo lock operation -func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, isBlocking bool) (LockDescriptor, error) { - log.Infof("Locking %v %v for action %v", lt.Type(), lt.ResourceName(), l.Action) +func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, opts ...LockOption) (LockDescriptor, error) { + for _, o := range opts { + o.apply(&l.Options) + } + log.Infof("Locking %s %s for action %s with options: %+v", lt.Type(), lt.ResourceName(), l.Action, l.Options) ctx, cancel := context.WithTimeout(ctx, LockTimeout) defer cancel() @@ -143,10 +174,18 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, isBlocking bo if err != nil { return nil, err } - if isBlocking { + + switch l.Options.lockType { + case NonBlocking: + return ts.globalCell.TryLock(ctx, lt.Path(), j) + case Named: + return ts.globalCell.LockName(ctx, lt.Path(), j) + default: + if l.Options.ttl != 0 { + return ts.globalCell.LockWithTTL(ctx, lt.Path(), j, l.Options.ttl) + } return ts.globalCell.Lock(ctx, lt.Path(), j) } - return ts.globalCell.TryLock(ctx, lt.Path(), j) } // unlock unlocks a previously locked key. @@ -174,7 +213,7 @@ func (l *Lock) unlock(ctx context.Context, lt iTopoLock, lockDescriptor LockDesc return lockDescriptor.Unlock(ctx) } -func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, isBlocking bool) (context.Context, func(*error), error) { +func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, opts ...LockOption) (context.Context, func(*error), error) { i, ok := ctx.Value(locksKey).(*locksInfo) if !ok { i = &locksInfo{ @@ -191,7 +230,7 @@ func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, // lock it l := newLock(action) - lockDescriptor, err := l.lock(ctx, ts, lt, isBlocking) + lockDescriptor, err := l.lock(ctx, ts, lt, opts...) if err != nil { return nil, nil, err } @@ -246,3 +285,52 @@ func checkLocked(ctx context.Context, lt iTopoLock) error { // Check the lock server implementation still holds the lock. return li.lockDescriptor.Check(ctx) } + +// lockOptions configure a Lock call. lockOptions are set by the LockOption +// values passed to the lock functions. +type lockOptions struct { + lockType LockType + ttl time.Duration +} + +// LockOption configures how we perform the locking operation. +type LockOption interface { + apply(*lockOptions) +} + +// funcLockOption wraps a function that modifies lockOptions into an +// implementation of the LockOption interface. +type funcLockOption struct { + f func(*lockOptions) +} + +func (flo *funcLockOption) apply(lo *lockOptions) { + flo.f(lo) +} + +func newFuncLockOption(f func(*lockOptions)) *funcLockOption { + return &funcLockOption{ + f: f, + } +} + +// WithTTL allows you to specify how long the underlying topo server +// implementation should hold the lock before releasing it — even if the caller +// has not explicitly released it. This provides a way to override the global +// ttl values that are set via --topo_consul_lock_session_ttl and +// --topo_etcd_lease_ttl. +// Note: This option is ignored by the ZooKeeper implementation as it does not +// support TTLs. +func WithTTL(ttl time.Duration) LockOption { + return newFuncLockOption(func(o *lockOptions) { + o.ttl = ttl + }) +} + +// WithType determines the type of lock we take. The options are defined +// by the LockType type. +func WithType(lt LockType) LockOption { + return newFuncLockOption(func(o *lockOptions) { + o.lockType = lt + }) +} diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index d0943c7058d..0dafcf250ed 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -19,6 +19,7 @@ package memorytopo import ( "context" "fmt" + "time" "vitess.io/vitess/go/vt/topo" ) @@ -65,11 +66,32 @@ func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDes return nil, err } - return c.lock(ctx, dirPath, contents) + return c.lock(ctx, dirPath, contents, false) +} + +// LockWithTTL is part of the topo.Conn interface. It behaves the same as Lock +// as TTLs are not supported in memorytopo. +func (c *Conn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) { + c.factory.callstats.Add([]string{"LockWithTTL"}, 1) + + c.factory.mu.Lock() + err := c.factory.getOperationError(Lock, dirPath) + c.factory.mu.Unlock() + if err != nil { + return nil, err + } + + return c.lock(ctx, dirPath, contents, false) +} + +// LockName is part of the topo.Conn interface. +func (c *Conn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + c.factory.callstats.Add([]string{"LockName"}, 1) + return c.lock(ctx, dirPath, contents, true) } // Lock is part of the topo.Conn interface. -func (c *Conn) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { +func (c *Conn) lock(ctx context.Context, dirPath, contents string, named bool) (topo.LockDescriptor, error) { for { if err := c.dial(ctx); err != nil { return nil, err @@ -82,7 +104,12 @@ func (c *Conn) lock(ctx context.Context, dirPath, contents string) (topo.LockDes return nil, c.factory.err } - n := c.factory.nodeByPath(c.cell, dirPath) + var n *node + if named { + n = c.factory.getOrCreatePath(c.cell, dirPath) + } else { + n = c.factory.nodeByPath(c.cell, dirPath) + } if n == nil { c.factory.mu.Unlock() return nil, topo.NewError(topo.NoNode, dirPath) diff --git a/go/vt/topo/named_lock.go b/go/vt/topo/named_lock.go new file mode 100644 index 00000000000..533317f8d9d --- /dev/null +++ b/go/vt/topo/named_lock.go @@ -0,0 +1,58 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo + +import ( + "context" + "path" +) + +type namedLock struct { + name string +} + +var _ iTopoLock = (*namedLock)(nil) + +func (s *namedLock) Type() string { + return "named" +} + +func (s *namedLock) ResourceName() string { + return s.name +} + +func (s *namedLock) Path() string { + return path.Join(NamedLocksPath, s.name) +} + +// LockName will lock the opaque identifier, and return: +// - a context with a locksInfo structure for future reference. +// - an unlock method +// - an error if anything failed. +func (ts *Server) LockName(ctx context.Context, name, action string) (context.Context, func(*error), error) { + return ts.internalLock(ctx, &namedLock{ + name: name, + }, action, WithType(Named)) +} + +// CheckNameLocked can be called on a context to make sure we have the lock +// for a given opaque identifier. +func CheckNameLocked(ctx context.Context, name string) error { + return checkLocked(ctx, &namedLock{ + name: name, + }) +} diff --git a/go/vt/topo/named_lock_test.go b/go/vt/topo/named_lock_test.go new file mode 100644 index 00000000000..a2bb82c73e9 --- /dev/null +++ b/go/vt/topo/named_lock_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo_test + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +// TestTopoNamedLock tests named lock operations. +func TestTopoNamedLock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts, tsf := memorytopo.NewServerAndFactory(ctx, "zone1") + defer ts.Close() + + currentTopoLockTimeout := topo.LockTimeout + topo.LockTimeout = testLockTimeout + defer func() { + topo.LockTimeout = currentTopoLockTimeout + }() + + lockName := "testy" + action := "testing" + lockNameCalls := int64(0) + + ctx, unlock, err := ts.LockName(ctx, lockName, action) + require.NoError(t, err) + lockNameCalls++ + + // Locking the same name again, without unlocking, should return an error. + // This does not attempt the lock within the topo server implementation + // as we first check the context and see that the lock is held, thus the + // lockNameCalls should not be increased. + _, _, err = ts.LockName(ctx, lockName, action) + require.ErrorContains(t, err, fmt.Sprintf("%s is already held", lockName)) + + // Check that we have the named lock. + err = topo.CheckNameLocked(ctx, lockName) + require.NoError(t, err) + + // Confirm that we can acquire a different named lock. + lockName2 := "testy2" + ctx, unlock2, err := ts.LockName(ctx, lockName2, action) + require.NoError(t, err) + defer unlock2(&err) + lockNameCalls++ + + // Unlock the first name. + unlock(&err) + + // Confirm that we no longer have the first named lock. + err = topo.CheckNameLocked(ctx, lockName) + require.ErrorContains(t, err, fmt.Sprintf("%s is not locked", lockName)) + err = topo.CheckNameLocked(ctx, lockName2) + require.NoError(t, err) + + // Confirm that the first named lock can be re-acquired after unlocking. + _, unlock, err = ts.LockName(ctx, lockName, action) + require.NoError(t, err) + defer unlock(&err) + lockNameCalls++ + + // Confirm the stats. + stats := tsf.GetCallStats() + require.NotNil(t, stats) + require.Equal(t, lockNameCalls, stats.Counts()["LockName"]) +} diff --git a/go/vt/topo/routing_rules_lock.go b/go/vt/topo/routing_rules_lock.go index c45ddb738c9..682eff30dc5 100644 --- a/go/vt/topo/routing_rules_lock.go +++ b/go/vt/topo/routing_rules_lock.go @@ -37,8 +37,8 @@ func (s *routingRules) Path() string { } // LockRoutingRules acquires a lock for routing rules. -func (ts *Server) LockRoutingRules(ctx context.Context, action string) (context.Context, func(*error), error) { - return ts.internalLock(ctx, &routingRules{}, action, true) +func (ts *Server) LockRoutingRules(ctx context.Context, action string, opts ...LockOption) (context.Context, func(*error), error) { + return ts.internalLock(ctx, &routingRules{}, action, opts...) } // CheckRoutingRulesLocked checks if a lock for routing rules is still possessed. diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index f2a3e8774e7..23b7be6cfa1 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -95,6 +95,7 @@ const ( ExternalClusterVitess = "vitess" RoutingRulesPath = "routing_rules" KeyspaceRoutingRulesPath = "keyspace" + NamedLocksPath = "internal/named_locks" ) // Factory is a factory interface to create Conn objects. diff --git a/go/vt/topo/shard_lock.go b/go/vt/topo/shard_lock.go index 72d0b1c8ca4..23326dcd23f 100644 --- a/go/vt/topo/shard_lock.go +++ b/go/vt/topo/shard_lock.go @@ -59,11 +59,11 @@ func (s *shardLock) Path() string { // // * operations that we don't want to conflict with re-parenting: // - DeleteTablet when it's the shard's current primary -func (ts *Server) LockShard(ctx context.Context, keyspace, shard, action string) (context.Context, func(*error), error) { +func (ts *Server) LockShard(ctx context.Context, keyspace, shard, action string, opts ...LockOption) (context.Context, func(*error), error) { return ts.internalLock(ctx, &shardLock{ keyspace: keyspace, shard: shard, - }, action, true) + }, action, opts...) } // TryLockShard will lock the shard, and return: @@ -85,7 +85,7 @@ func (ts *Server) TryLockShard(ctx context.Context, keyspace, shard, action stri return ts.internalLock(ctx, &shardLock{ keyspace: keyspace, shard: shard, - }, action, false) + }, action, WithType(NonBlocking)) } // CheckShardLocked can be called on a context to make sure we have the lock diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 34c45d793ac..39bc8c9bc43 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -159,17 +159,33 @@ func (st *StatsConn) Delete(ctx context.Context, filePath string, version Versio // Lock is part of the Conn interface func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { - return st.internalLock(ctx, dirPath, contents, true) + return st.internalLock(ctx, dirPath, contents, Blocking, 0) +} + +// LockWithTTL is part of the Conn interface +func (st *StatsConn) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (LockDescriptor, error) { + return st.internalLock(ctx, dirPath, contents, Blocking, ttl) +} + +// LockName is part of the Conn interface +func (st *StatsConn) LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { + return st.internalLock(ctx, dirPath, contents, Named, 0) } // TryLock is part of the topo.Conn interface. Its implementation is same as Lock func (st *StatsConn) TryLock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { - return st.internalLock(ctx, dirPath, contents, false) + return st.internalLock(ctx, dirPath, contents, NonBlocking, 0) } // TryLock is part of the topo.Conn interface. Its implementation is same as Lock -func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, isBlocking bool) (LockDescriptor, error) { - statsKey := []string{"Lock", st.cell} +func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, lockType LockType, ttl time.Duration) (LockDescriptor, error) { + statsKey := []string{"Lock", st.cell} // Also used for NonBlocking / TryLock + switch { + case lockType == Named: + statsKey[0] = "LockName" + case ttl != 0: + statsKey[0] = "LockWithTTL" + } if st.readOnly { return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, statsKey[0], dirPath) } @@ -177,10 +193,17 @@ func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, defer topoStatsConnTimings.Record(statsKey, startTime) var res LockDescriptor var err error - if isBlocking { - res, err = st.conn.Lock(ctx, dirPath, contents) - } else { + switch lockType { + case NonBlocking: res, err = st.conn.TryLock(ctx, dirPath, contents) + case Named: + res, err = st.conn.LockName(ctx, dirPath, contents) + default: + if ttl != 0 { + res, err = st.conn.LockWithTTL(ctx, dirPath, contents, ttl) + } else { + res, err = st.conn.Lock(ctx, dirPath, contents) + } } if err != nil { topoStatsConnErrors.Add(statsKey, int64(1)) diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 78f9cc6eb72..605487697cc 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -20,6 +20,9 @@ import ( "context" "fmt" "testing" + "time" + + "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -108,7 +111,28 @@ func (st *fakeConn) Lock(ctx context.Context, dirPath, contents string) (lock Lo } if dirPath == "error" { return lock, fmt.Errorf("dummy error") + } + return lock, err +} +// LockWithTTL is part of the Conn interface. +func (st *fakeConn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (lock LockDescriptor, err error) { + if st.readOnly { + return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, "topo server connection is read-only") + } + if dirPath == "error" { + return lock, fmt.Errorf("dummy error") + } + return lock, err +} + +// LockName is part of the Conn interface. +func (st *fakeConn) LockName(ctx context.Context, dirPath, contents string) (lock LockDescriptor, err error) { + if st.readOnly { + return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, "topo server connection is read-only") + } + if dirPath == "error" { + return lock, fmt.Errorf("dummy error") } return lock, err } @@ -121,7 +145,6 @@ func (st *fakeConn) TryLock(ctx context.Context, dirPath, contents string) (lock } if dirPath == "error" { return lock, fmt.Errorf("dummy error") - } return lock, err } @@ -140,7 +163,6 @@ func (st *fakeConn) WatchRecursive(ctx context.Context, path string) (current [] func (st *fakeConn) NewLeaderParticipation(name, id string) (mp LeaderParticipation, err error) { if name == "error" { return mp, fmt.Errorf("dummy error") - } return mp, err } @@ -302,23 +324,25 @@ func TestStatsConnTopoLock(t *testing.T) { statsConn.Lock(ctx, "", "") timingCounts := topoStatsConnTimings.Counts()["Lock.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, timingCounts, int64(1)) - // error is zero before getting an error + statsConn.LockWithTTL(ctx, "", "", time.Second) + timingCounts = topoStatsConnTimings.Counts()["LockWithTTL.global"] + require.Equal(t, timingCounts, int64(1)) + + statsConn.LockName(ctx, "", "") + timingCounts = topoStatsConnTimings.Counts()["LockName.global"] + require.Equal(t, timingCounts, int64(1)) + + // Error is zero before getting an error. errorCount := topoStatsConnErrors.Counts()["Lock.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, errorCount, int64(0)) statsConn.Lock(ctx, "error", "") - // error stats gets emitted + // Error stats gets emitted. errorCount = topoStatsConnErrors.Counts()["Lock.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, errorCount, int64(1)) } // TestStatsConnTopoWatch emits stats on Watch diff --git a/go/vt/topo/zk2topo/lock.go b/go/vt/topo/zk2topo/lock.go index 5baf1f7f33f..fdd9fbd0137 100644 --- a/go/vt/topo/zk2topo/lock.go +++ b/go/vt/topo/zk2topo/lock.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path" + "time" "github.com/z-division/go-zookeeper/zk" @@ -42,6 +43,17 @@ func (zs *Server) Lock(ctx context.Context, dirPath, contents string) (topo.Lock return zs.lock(ctx, dirPath, contents) } +// LockWithTTL is part of the topo.Conn interface. It behaves the same as Lock +// as TTLs are not supported in Zookeeper. +func (zs *Server) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) { + return zs.lock(ctx, dirPath, contents) +} + +// LockName is part of the topo.Conn interface. +func (zs *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + return zs.lock(ctx, dirPath, contents) +} + // TryLock is part of the topo.Conn interface. func (zs *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { // We list all the entries under dirPath diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 86a3a6c6ead..5b6c3f05343 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -81,6 +81,8 @@ const ( rdonlyTabletSuffix = "@rdonly" // Globally routable tables don't have a keyspace prefix. globalTableQualifier = "" + // Default duration used for lag, timeout, etc. + DefaultTimeout = 30 * time.Second ) var tabletTypeSuffixes = []string{primaryTabletSuffix, replicaTabletSuffix, rdonlyTabletSuffix} @@ -130,9 +132,6 @@ const ( lockTablesCycles = 2 // Time to wait between LOCK TABLES cycles on the sources during SwitchWrites. lockTablesCycleDelay = time.Duration(100 * time.Millisecond) - - // Default duration used for lag, timeout, etc. - defaultDuration = 30 * time.Second ) var ( @@ -1458,13 +1457,21 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } sw := &switcher{s: s, ts: ts} - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") + + // When creating the workflow, locking the workflow and its target keyspace is sufficient. + lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "MoveTablesCreate") + if lockErr != nil { + ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + return nil, lockErr + } + defer workflowUnlock(&err) + ctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") if lockErr != nil { ts.Logger().Errorf("Locking target keyspace %s failed: %v", ts.TargetKeyspaceName(), lockErr) return nil, lockErr } defer targetUnlock(&err) - ctx = lockCtx // If we get an error after this point, where the vreplication streams/records // have been created, then we clean up the workflow's artifacts. @@ -2534,7 +2541,7 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) { s.sem.Release(1) } }() - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) defer cancel() sqlOptimizeTable := "optimize table _vt.copy_state" if _, err := s.tmc.ExecuteFetchAsAllPrivs(ctx, tablet, &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{ @@ -2570,24 +2577,30 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, } else { sw = &switcher{s: s, ts: ts} } - var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") + + // Lock the workflow along with its source and target keyspaces. + lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropTargets") + if lockErr != nil { + ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + } + defer workflowUnlock(&err) + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) - ctx = tctx - if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) - ctx = tctx + ctx = lockCtx } + if !keepData { switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: @@ -2762,23 +2775,30 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy } else { sw = &switcher{ts: ts, s: s} } - var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") + + // Lock the workflow and its source and target keyspaces. + lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropSources") + if lockErr != nil { + ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + } + defer workflowUnlock(&err) + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) - ctx = tctx if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) - ctx = tctx + ctx = lockCtx } + if !force { if err := sw.validateWorkflowHasCompleted(ctx); err != nil { ts.Logger().Errorf("Workflow has not completed, cannot DropSources: %v", err) @@ -2996,14 +3016,21 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche } else { sw = &switcher{s: s, ts: ts} } - var tctx context.Context - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") + + // Lock the workflow and its target keyspace. + lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "completeMigrateWorkflow") + if lockErr != nil { + ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + } + defer workflowUnlock(&err) + ctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) - ctx = tctx + if err := sw.dropTargetVReplicationStreams(ctx); err != nil { return nil, err } @@ -3031,13 +3058,19 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor rdDryRunResults, wrDryRunResults *[]string hasReplica, hasRdonly, hasPrimary bool ) - timeout, set, err := protoutil.DurationFromProto(req.Timeout) + timeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) if err != nil { err = vterrors.Wrapf(err, "unable to parse Timeout into a valid duration") return nil, err } if !set { - timeout = defaultDuration + timeout = DefaultTimeout + } + // We enforce the 1 second minimum as some things that use it, such as Etcd, only takes + // a seconds value so you'd get unexpected behavior if you e.g. set the timeout to + // 500ms as Etcd would get a value of 0 or a never-ending TTL. + if timeout.Seconds() < 1 { + return nil, vterrors.Wrap(err, "Timeout must be at least 1 second") } ts, startState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) if err != nil { @@ -3054,7 +3087,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, err } if !set { - maxReplicationLagAllowed = defaultDuration + maxReplicationLagAllowed = DefaultTimeout } direction := TrafficSwitchDirection(req.Direction) if direction == DirectionBackward { @@ -3237,18 +3270,37 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError("workflow validation failed", err) } - // Remove mirror rules for the specified tablet types. - if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil { - return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types", - ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + // For switching reads, locking the source keyspace is sufficient. + // We need to hold the keyspace locks longer than the command timeout. + ksLockTTL, set, err := protoutil.DurationFromProto(req.GetTimeout()) + if err != nil { + return nil, vterrors.Wrapf(err, "unable to parse Timeout into a valid duration") + } + if !set { + ksLockTTL = DefaultTimeout } // For reads, locking the source keyspace is sufficient. - ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") + ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTTL)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer unlock(&err) + confirmKeyspaceLocksHeld := func() error { + if req.DryRun { // We don't actually take locks + return nil + } + if err := topo.CheckKeyspaceLocked(ctx, ts.SourceKeyspaceName()); err != nil { + return vterrors.Wrapf(err, "%s keyspace lock was lost", ts.SourceKeyspaceName()) + } + return nil + } + + // Remove mirror rules for the specified tablet types. + if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil { + return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types", + ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { switch { @@ -3268,11 +3320,18 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } return sw.logs(), nil } + + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the shards", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", @@ -3283,7 +3342,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } // switchWrites is a generic way of migrating write traffic for a workflow. -func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, timeout time.Duration, +func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, waitTimeout time.Duration, cancel bool, ) (journalID int64, dryRunResults *[]string, err error) { var sw iswitcher @@ -3315,27 +3374,53 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } - // Remove mirror rules for the primary tablet type. - if err := sw.mirrorTableTraffic(ctx, []topodata.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { - return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type", - ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + // Lock the workflow and its source and target keyspaces. + lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + if lockErr != nil { + return handleError(fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr) } + defer workflowUnlock(&err) + + // We need to hold the keyspace locks longer than waitTimeout*X -- where X + // is the number of sub-steps where the waitTimeout value is used: stopping + // existing streams, waiting for replication to catch up, and initializing + // the target sequences -- to be sure the lock is not lost. + ksLockTTL := waitTimeout * 3 // Need to lock both source and target keyspaces. - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTTL)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } - ctx = tctx defer sourceUnlock(&err) + if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTTL)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } - ctx = tctx + ctx = lockCtx defer targetUnlock(&err) } + confirmKeyspaceLocksHeld := func() error { + if req.DryRun { // We don't actually take locks + return nil + } + if err := topo.CheckKeyspaceLocked(ctx, ts.SourceKeyspaceName()); err != nil { + return vterrors.Wrapf(err, "%s keyspace lock was lost", ts.SourceKeyspaceName()) + } + if err := topo.CheckKeyspaceLocked(ctx, ts.TargetKeyspaceName()); err != nil { + return vterrors.Wrapf(err, "%s keyspace lock was lost", ts.TargetKeyspaceName()) + } + return nil + } + + // Remove mirror rules for the primary tablet type. + if err := sw.mirrorTableTraffic(ctx, []topodata.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { + return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type", + ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + } // Find out if the target is using any sequence tables for auto_increment // value generation. If so, then we'll need to ensure that they are @@ -3386,7 +3471,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // materializations then we have to wait for them to catchup before switching traffic for the // Reshard workflow. We use the the same timeout value here that is used for VReplication catchup // with the inter-keyspace workflows. - stopCtx, stopCancel := context.WithTimeout(ctx, timeout) + stopCtx, stopCancel := context.WithTimeout(ctx, waitTimeout) defer stopCancel() sourceWorkflows, err = sw.stopStreams(stopCtx, sm) if err != nil { @@ -3414,37 +3499,51 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("Waiting for streams to catchup") - if err := sw.waitForCatchup(ctx, timeout); err != nil { + if err := sw.waitForCatchup(ctx, waitTimeout); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to sync up replication between the source and target", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("Migrating streams") if err := sw.migrateStreams(ctx, sm); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to migrate the workflow streams", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("Resetting sequences") if err := sw.resetSequences(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to reset the sequences", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("Creating reverse streams") if err := sw.createReverseVReplication(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to create the reverse vreplication streams", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } // Initialize any target sequences, if there are any, before allowing new writes. if req.InitializeTargetSequences && len(sequenceMetadata) > 0 { ts.Logger().Infof("Initializing target sequences") // Writes are blocked so we can safely initialize the sequence tables but - // we also want to use a shorter timeout than the parent context. - // We use at most half of the overall timeout. - initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) + // we also want to use a shorter timeout than the the default. + initSeqCtx, cancel := context.WithTimeout(ctx, waitTimeout/2) defer cancel() if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { sw.cancelMigration(ctx, sm) @@ -3464,6 +3563,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // This is the point of no return. Once a journal is created, // traffic can be redirected to target shards. + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } if err := sw.createJournals(ctx, sourceWorkflows); err != nil { return handleError("failed to create the journal", err) } @@ -3645,7 +3747,7 @@ func (s *Server) applySQLShard(ctx context.Context, tabletInfo *topo.TabletInfo, if err != nil { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "fillStringTemplate failed: %v", err) } - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(ctx, DefaultTimeout) defer cancel() // Need to make sure that replication is enabled since we're only applying // the statement on primaries. diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index f2b9f6b2496..c67d45bb9e6 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -689,14 +689,14 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { DryRun: true, }, want: []string{ - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", sourceKeyspaceName, tablesStr, sourceKeyspaceName, position, sourceKeyspaceName, position), "Wait for vreplication on stopped streams to catchup for up to 30s", @@ -730,14 +730,14 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { DryRun: true, }, want: []string{ - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", targetKeyspaceName, tablesStr, targetKeyspaceName, position, targetKeyspaceName, position), "Wait for vreplication on stopped streams to catchup for up to 30s", diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index 3fbd1c507e2..789822b5be9 100644 --- a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -20,6 +20,8 @@ import ( "context" "time" + "vitess.io/vitess/go/vt/topo" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -126,8 +128,8 @@ func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { r.ts.cancelMigration(ctx, sm) } -func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) { - return r.s.ts.LockKeyspace(ctx, keyspace, action) +func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) { + return r.s.ts.LockKeyspace(ctx, keyspace, action, opts...) } func (r *switcher) freezeTargetVReplication(ctx context.Context) error { diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index 55c843b07cb..29c40f85a69 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -27,6 +27,7 @@ import ( "golang.org/x/exp/maps" "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/vt/topo" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -304,7 +305,7 @@ func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrato dr.drLog.Log("Cancel migration as requested") } -func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), error) { +func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ ...topo.LockOption) (context.Context, func(*error), error) { dr.drLog.Logf("Lock keyspace %s", keyspace) return ctx, func(e *error) { dr.drLog.Logf("Unlock keyspace %s", keyspace) diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 9dea0af30a1..560b7a695fd 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -20,11 +20,13 @@ import ( "context" "time" + "vitess.io/vitess/go/vt/topo" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) type iswitcher interface { - lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) + lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) cancelMigration(ctx context.Context, sm *StreamMigrator) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 35930bc3e97..c9d9952ef8f 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -1747,8 +1747,6 @@ func (ts *trafficSwitcher) IsMultiTenantMigration() bool { } func (ts *trafficSwitcher) mirrorTableTraffic(ctx context.Context, types []topodatapb.TabletType, percent float32) error { - log.Infof("mirrorTableTraffic") - mrs, err := topotools.GetMirrorRules(ctx, ts.TopoServer()) if err != nil { return err diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index a98a3ce90f9..f91a82b9d2c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -115,10 +115,11 @@ func (td *tableDiffer) initialize(ctx context.Context) error { defer dbClient.Close() targetKeyspace := td.wd.ct.vde.thisTablet.Keyspace - log.Infof("Locking target keyspace %s", targetKeyspace) - ctx, unlock, lockErr := td.wd.ct.ts.LockKeyspace(ctx, targetKeyspace, "vdiff") + lockName := fmt.Sprintf("%s/%s", targetKeyspace, td.wd.ct.workflow) + log.Infof("Locking workflow %s", lockName) + ctx, unlock, lockErr := td.wd.ct.ts.LockName(ctx, lockName, "vdiff") if lockErr != nil { - log.Errorf("LockKeyspace failed: %v", lockErr) + log.Errorf("Locking workfkow %s failed: %v", lockName, lockErr) return lockErr } @@ -126,7 +127,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { defer func() { unlock(&err) if err != nil { - log.Errorf("UnlockKeyspace %s failed: %v", targetKeyspace, err) + log.Errorf("Unlocking workflow %s failed: %v", lockName, err) } }()