From 3bad1f52a9771ae180b4d90c346fc5387093a402 Mon Sep 17 00:00:00 2001
From: Masayuki Ishii
Date: Wed, 1 Nov 2023 18:58:50 +0900
Subject: [PATCH] Kill existing connections when changing roles (#587)

Signed-off-by: Masayuki Ishii
---
 Makefile                   |   2 +-
 clustering/manager_test.go |  28 ++++++-
 clustering/operations.go   | 151 ++++++++++++++++++++++++++-----------
 clustering/status.go       |  12 ++-
 go.mod                     |   2 +
 go.sum                     |   8 +-
 6 files changed, 149 insertions(+), 54 deletions(-)

diff --git a/Makefile b/Makefile
index 6c9cec43d..9c16b2403 100644
--- a/Makefile
+++ b/Makefile
@@ -94,7 +94,7 @@ check-generate:
 envtest: setup-envtest
	source <($(SETUP_ENVTEST) use -p env); \
		export MOCO_CHECK_INTERVAL=100ms; \
-		export MOCO_WAIT_INTERVAL=100ms; \
+		export MOCO_CLONE_WAIT_DURATION=100ms; \
		go test -v -count 1 -race ./clustering -ginkgo.progress -ginkgo.v -ginkgo.failFast
	source <($(SETUP_ENVTEST) use -p env); \
		export DEBUG_CONTROLLER=1; \
diff --git a/clustering/manager_test.go b/clustering/manager_test.go
index 6dced0c2e..57c12f8f2 100644
--- a/clustering/manager_test.go
+++ b/clustering/manager_test.go
@@ -540,9 +540,12 @@ var _ = Describe("manager", func() {
 			}
 		}
 
+		// confirm that connections to the mysql instances whose roles have changed are killed
 		for i := 0; i < 3; i++ {
 			switch i {
-			case 0: // connection in demoted instance should be killed
+			case 0: // KillConnections is called twice: once at the start of the switchover and once when the role changes.
+				Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(2))
+			case newPrimary:
 				Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(1))
 			default:
 				Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
@@ -705,7 +708,12 @@ var _ = Describe("manager", func() {
 		Expect(failOverEvents).To(Equal(1))
 
 		for i := 0; i < 3; i++ {
-			Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
+			switch i {
+			case 1:
+				Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(1))
+			default:
+				Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
+			}
 		}
 
 		By("recovering failed instance")
@@ -796,6 +804,8 @@ var _ = Describe("manager", func() {
 		}).Should(Succeed())
 
 		By("making an errant replica")
+		of.resetKillConnectionsCount()
+
 		// When the primary load is high, sometimes the gtid_executed of a replica precedes the primary.
 		// pod(4) is intended for such situations.
testSetGTID(cluster.PodHostname(0), "p0:1,p0:2,p0:3") // primary @@ -846,6 +856,10 @@ var _ = Describe("manager", func() { Expect(st1.ReplicaStatus.SlaveIORunning).NotTo(Equal("Yes")) } + for i := 0; i < 5; i++ { + Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0)) + } + By("triggering a failover") of.setRetrievedGTIDSet(cluster.PodHostname(2), "p0:1") of.setRetrievedGTIDSet(cluster.PodHostname(3), "p0:1,p0:2,p0:3") @@ -909,6 +923,7 @@ var _ = Describe("manager", func() { Expect(failOverEvents).To(Equal(1)) By("re-initializing the errant replica") + of.resetKillConnectionsCount() testSetGTID(cluster.PodHostname(1), "") Eventually(func() interface{} { return ms.errantReplicas @@ -937,6 +952,15 @@ var _ = Describe("manager", func() { } }).Should(Succeed()) + for i := 0; i < 5; i++ { + switch i { + case 1: + Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(1)) + default: + Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0)) + } + } + By("stopping instances to make the cluster lost") of.setFailing(cluster.PodHostname(3), true) of.setFailing(cluster.PodHostname(1), true) diff --git a/clustering/operations.go b/clustering/operations.go index db566787c..dcfaa1f93 100644 --- a/clustering/operations.go +++ b/clustering/operations.go @@ -23,10 +23,25 @@ const ( failOverTimeoutSeconds = 3600 ) -var waitForRestartDuration = 3 * time.Second +var ( + waitForCloneRestartDuration = 3 * time.Second + waitForRoleChangeDuration = 300 * time.Millisecond +) + +func init() { + intervalStr := os.Getenv("MOCO_CLONE_WAIT_DURATION") + if intervalStr == "" { + return + } + interval, err := time.ParseDuration(intervalStr) + if err != nil { + return + } + waitForCloneRestartDuration = interval +} func init() { - intervalStr := os.Getenv("MOCO_WAIT_INTERVAL") + intervalStr := os.Getenv("MOCO_ROLE_WAIT_DURATION") if intervalStr == "" { return } @@ -34,7 +49,7 @@ func init() { if err != nil { return } - waitForRestartDuration = interval + waitForRoleChangeDuration = interval } func (p *managerProcess) isCloning(ctx context.Context, ss *StatusSet) bool { @@ -115,7 +130,7 @@ func (p *managerProcess) clone(ctx context.Context, ss *StatusSet) (bool, error) // wait until the instance restarts after clone op := ss.DBOps[ss.Primary] - time.Sleep(waitForRestartDuration) + time.Sleep(waitForCloneRestartDuration) for i := 0; i < 60; i++ { select { case <-time.After(1 * time.Second): @@ -249,9 +264,90 @@ func (p *managerProcess) failover(ctx context.Context, ss *StatusSet) error { return nil } +func (p *managerProcess) removeRoleLabel(ctx context.Context, ss *StatusSet) ([]int, error) { + var noRoles []int + for i, pod := range ss.Pods { + v := pod.Labels[constants.LabelMocoRole] + if v == "" { + noRoles = append(noRoles, i) + continue + } + + if i == ss.Primary && v == constants.RolePrimary { + continue + } + if i != ss.Primary && !isErrantReplica(ss, i) && v == constants.RoleReplica { + continue + } + + noRoles = append(noRoles, i) + modified := pod.DeepCopy() + delete(modified.Labels, constants.LabelMocoRole) + if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil { + return nil, fmt.Errorf("failed to remove %s label from %s/%s: %w", constants.LabelMocoRole, pod.Namespace, pod.Name, err) + } + } + return noRoles, nil +} + +func (p *managerProcess) addRoleLabel(ctx context.Context, ss *StatusSet, noRoles []int) error { + for _, i := range noRoles { + if isErrantReplica(ss, i) { + continue + } + + var newValue string + if i == ss.Primary { + newValue 
= constants.RolePrimary
+		} else {
+			newValue = constants.RoleReplica
+		}
+
+		pod := ss.Pods[i]
+		modified := pod.DeepCopy()
+		if modified.Labels == nil {
+			modified.Labels = make(map[string]string)
+		}
+		modified.Labels[constants.LabelMocoRole] = newValue
+		if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
+			return fmt.Errorf("failed to add %s label to pod %s/%s: %w", constants.LabelMocoRole, pod.Namespace, pod.Name, err)
+		}
+	}
+	return nil
+}
+
 func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, error) {
 	redo := false
 
+	// remove the old role label from mysql pods whose roles have changed
+	// NOTE:
+	// Ideally we would redo whenever even one pod is updated, to refresh the Pod resources in StatusSet.
+	// However, if some mysql instances are down, a redo incurs a wait of about 9 seconds in "(*managerProcess).GatherStatus()".
+	// That wait slows down recovery and lengthens downtime, so we continue processing without redoing.
+	noRoles, err := p.removeRoleLabel(ctx, ss)
+	if err != nil {
+		return false, err
+	}
+
+	// if the roles of alive instances have changed, kill the connections on those instances
+	var alive []int
+	for _, i := range noRoles {
+		if ss.MySQLStatus[i] == nil || isErrantReplica(ss, i) {
+			continue
+		}
+		alive = append(alive, i)
+	}
+	if len(alive) > 0 {
+		// sleep briefly in the hope that the backend pods of the primary and replica services are updated in the meantime
+		time.Sleep(waitForRoleChangeDuration)
+	}
+	for _, i := range alive {
+		if err := ss.DBOps[i].KillConnections(ctx); err != nil {
+			return false, fmt.Errorf("failed to kill connections in instance %d: %w", i, err)
+		}
+	}
+
+	// configure primary instance
 	if ss.Cluster.Spec.ReplicationSourceSecretName != nil {
 		r, err := p.configureIntermediatePrimary(ctx, ss)
 		if err != nil {
@@ -266,6 +362,7 @@ func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, er
 		redo = redo || r
 	}
 
+	// configure replica instances
 	for i, ist := range ss.MySQLStatus {
 		if i == ss.Primary {
 			continue
@@ -280,46 +377,10 @@ func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, er
 		redo = redo || r
 	}
 
-	// update labels
-	for i, pod := range ss.Pods {
-		if i == ss.Primary {
-			if pod.Labels[constants.LabelMocoRole] != constants.RolePrimary {
-				redo = true
-				modified := pod.DeepCopy()
-				if modified.Labels == nil {
-					modified.Labels = make(map[string]string)
-				}
-				modified.Labels[constants.LabelMocoRole] = constants.RolePrimary
-				if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
-					return false, fmt.Errorf("failed to set role for pod %s/%s: %w", pod.Namespace, pod.Name, err)
-				}
-			}
-			continue
-		}
-
-		if ss.MySQLStatus[i] != nil && ss.MySQLStatus[i].IsErrant {
-			if _, ok := pod.Labels[constants.LabelMocoRole]; ok {
-				redo = true
-				modified := pod.DeepCopy()
-				delete(modified.Labels, constants.LabelMocoRole)
-				if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
-					return false, fmt.Errorf("failed to set role for pod %s/%s: %w", pod.Namespace, pod.Name, err)
-				}
-			}
-			continue
-		}
-
-		if pod.Labels[constants.LabelMocoRole] != constants.RoleReplica {
-			redo = true
-			modified := pod.DeepCopy()
-			if modified.Labels == nil {
-				modified.Labels = make(map[string]string)
-			}
-			modified.Labels[constants.LabelMocoRole] = constants.RoleReplica
-			if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
-				return false, fmt.Errorf("failed to set role for pod %s/%s: %w", pod.Namespace, pod.Name, err)
-			}
-		}
-	}
+	// 
add new role label + err = p.addRoleLabel(ctx, ss, noRoles) + if err != nil { + return false, err } // make the primary writable if it is not an intermediate primary @@ -484,7 +545,7 @@ func (p *managerProcess) configureReplica(ctx context.Context, ss *StatusSet, in log.Info("clone succeeded", "instance", index) // wait until the instance restarts after clone - time.Sleep(waitForRestartDuration) + time.Sleep(waitForCloneRestartDuration) for i := 0; i < 60; i++ { select { case <-time.After(1 * time.Second): diff --git a/clustering/status.go b/clustering/status.go index db8dc3839..15322753a 100644 --- a/clustering/status.go +++ b/clustering/status.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "slices" "sort" "strconv" "strings" @@ -208,10 +209,10 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) { } // process errors if j == statusCheckRetryMax { - logFromContext(ctx).Error(err, "failed to get mysqld status, mysqld is not ready") + logFromContext(ctx).Error(err, "failed to get mysqld status, mysqld is not ready", "instance", index) return } - logFromContext(ctx).Error(err, "failed to get mysqld status, will retry") + logFromContext(ctx).Error(err, "failed to get mysqld status, will retry", "instance", index) time.Sleep(statusCheckRetryInterval) } }(i) @@ -299,6 +300,13 @@ func containErrantTransactions(primaryUUID, gtidSet string) bool { return false } +func isErrantReplica(ss *StatusSet, index int) bool { + if ss.MySQLStatus[index] != nil && ss.MySQLStatus[index].IsErrant { + return true + } + return slices.Contains(ss.Errants, index) +} + func isPodReady(pod *corev1.Pod) bool { for _, cond := range pod.Status.Conditions { if cond.Type != corev1.PodReady { diff --git a/go.mod b/go.mod index 84a322a00..0ffce8167 100644 --- a/go.mod +++ b/go.mod @@ -114,10 +114,12 @@ require ( golang.org/x/crypto v0.14.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect + golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/term v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect + golang.org/x/tools v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 563c8097e..65fdc0474 100644 --- a/go.sum +++ b/go.sum @@ -368,8 +368,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= -golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -417,8 +417,8 @@ golang.org/x/tools 
v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo= -golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= +golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= +golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
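
For reference, the sketch below is a minimal, self-contained Go illustration of the flow that configure() follows after this patch: remove the stale role label so the pod drops out of the primary/replica Service, wait briefly so the Service backends can be updated, kill the existing connections on the affected instances, and only then set the new label. The dbOps interface, fakeDB type, reconcileRoles function, and the literal label key are hypothetical stand-ins chosen for illustration; they are not moco's real types or APIs.

// role_change_sketch.go: simplified, hypothetical sketch of the role-change flow.
package main

import (
	"context"
	"fmt"
	"time"
)

// dbOps stands in for the per-instance operator that can kill client connections.
type dbOps interface {
	KillConnections(ctx context.Context) error
}

type fakeDB struct{ name string }

func (f fakeDB) KillConnections(_ context.Context) error {
	fmt.Println("killed connections on", f.name)
	return nil
}

// roleLabel is an assumed label key used only for this illustration.
const roleLabel = "moco.cybozu.com/role"

// reconcileRoles removes stale role labels, waits so that Service backends can
// drop the affected pods, kills the existing connections on those pods, and
// finally sets the new labels.
func reconcileRoles(ctx context.Context, labels []map[string]string, dbs []dbOps, primary int, wait time.Duration) error {
	wantRole := func(i int) string {
		if i == primary {
			return "primary"
		}
		return "replica"
	}

	var changed []int
	for i, l := range labels {
		if l[roleLabel] == wantRole(i) {
			continue // role unchanged; keep the label and the connections
		}
		delete(l, roleLabel) // remove the old label first so the pod leaves the Service
		changed = append(changed, i)
	}

	if len(changed) > 0 {
		time.Sleep(wait) // give the Service backends time to be updated
	}

	for _, i := range changed {
		if err := dbs[i].KillConnections(ctx); err != nil {
			return fmt.Errorf("failed to kill connections in instance %d: %w", i, err)
		}
	}

	for _, i := range changed {
		labels[i][roleLabel] = wantRole(i) // add the new role label last
	}
	return nil
}

func main() {
	labels := []map[string]string{
		{roleLabel: "primary"}, // old primary, demoted by a switchover
		{roleLabel: "replica"}, // promoted to primary
		{roleLabel: "replica"}, // role unchanged
	}
	dbs := []dbOps{fakeDB{"pod-0"}, fakeDB{"pod-1"}, fakeDB{"pod-2"}}
	if err := reconcileRoles(context.Background(), labels, dbs, 1, 10*time.Millisecond); err != nil {
		panic(err)
	}
	fmt.Println(labels)
}

Killing connections only after the label has been removed and the Service backends have had time to update keeps clients from continuing to write to a demoted instance through a stale connection, which is the intent of the commit title.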