From c193c694dfee53dfdee6440a506be17370d4e9b7 Mon Sep 17 00:00:00 2001 From: sayedppqq Date: Wed, 4 Oct 2023 15:05:24 +0600 Subject: [PATCH] unlock and sync secondary Signed-off-by: sayedppqq --- pkg/backup.go | 189 +++++++--------------------------- pkg/lock.go | 274 ++++++++++++++++++++++++++++++++++++++++++++++++++ pkg/utils.go | 11 ++ 3 files changed, 321 insertions(+), 153 deletions(-) create mode 100644 pkg/lock.go diff --git a/pkg/backup.go b/pkg/backup.go index 470fb8aef..2c5663cd5 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -442,7 +442,7 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic if parameters.ConfigServer != "" { // sharded cluster. so disable the balancer first. then perform the 'usual' tasks. - primary, secondary, err := getPrimaryNSecondaryMember(parameters.ConfigServer) + primary, secondary, secondaryMembers, err := getPrimaryNSecondaryMember(parameters.ConfigServer) if err != nil { return nil, err } @@ -464,7 +464,19 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic backupHost = secondary } - err = lockConfigServer(parameters.ConfigServer, secondary) + // Check if secondary is already locked before locking it. + // If yes, unlock it and sync with primary + for _, secondary := range secondaryMembers { + if err := checkIfSecondaryLockedAndSync(secondary); err != nil { + return nil, err + } + } + + if err := setupConfigServer(parameters.ConfigServer, secondary); err != nil { + return nil, err + } + + err = lockSecondaryMember(secondary) cleanupFuncs = append(cleanupFuncs, func() error { // even if error occurs, try to unlock the server @@ -479,7 +491,7 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic for key, host := range parameters.ReplicaSets { // do the task - primary, secondary, err := getPrimaryNSecondaryMember(host) + primary, secondary, secondaryMembers, err := getPrimaryNSecondaryMember(host) if err != nil { klog.Errorf("error while getting primary and secondary member of %v. error: %v", host, err) return nil, err @@ -492,6 +504,14 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic backupHost = secondary } + // Check if secondary is already locked before locking it. + // If yes, unlock it and sync with primary + for _, secondary := range secondaryMembers { + if err := checkIfSecondaryLockedAndSync(secondary); err != nil { + return nil, err + } + } + err = lockSecondaryMember(secondary) cleanupFuncs = append(cleanupFuncs, func() error { // even if error occurs, try to unlock the server @@ -573,11 +593,10 @@ func getSSLUser(path string) (string, error) { return strings.TrimSpace(user), nil } -func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, err error) { +func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, secondaryMembers []string, err error) { klog.Infoln("finding primary and secondary instances of", mongoDSN) v := make(map[string]interface{}) - // stop balancer args := append([]interface{}{ "config", "--host", mongoDSN, @@ -586,33 +605,35 @@ func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, err }, mongoCreds...) // even --quiet doesn't skip replicaset PrimaryConnection log. so take tha last line. issue tracker: https://jira.mongodb.org/browse/SERVER-27159 if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&v); err != nil { - return "", "", err + return "", "", secondaryMembers, err } primary, ok := v["primary"].(string) if !ok || primary == "" { - return "", "", fmt.Errorf("unable to get primary instance using rs.isMaster(). got response: %v", v) + return "", "", secondaryMembers, fmt.Errorf("unable to get primary instance using rs.isMaster(). got response: %v", v) } hosts, ok := v["hosts"].([]interface{}) if !ok { - return "", "", fmt.Errorf("unable to get hosts using rs.isMaster(). got response: %v", v) + return "", "", secondaryMembers, fmt.Errorf("unable to get hosts using rs.isMaster(). got response: %v", v) } for _, host := range hosts { - secHost, ok := host.(string) - if !ok || secHost == "" { + curHost, ok := host.(string) + + if !ok || curHost == "" { err = fmt.Errorf("unable to get secondary instance using rs.isMaster(). got response: %v", v) continue } - - if secHost != primary { - klog.Infof("Primary %s & Secondary %s found for mongoDSN %s \n", primary, secHost, mongoDSN) - return primary, secHost, nil + if curHost != primary { + secondaryMembers = append(secondaryMembers, curHost) } } + if len(secondaryMembers) > 0 { + return primary, secondaryMembers[0], secondaryMembers, err + } - return primary, "", err + return primary, "", secondaryMembers, err } // run from mongos instance @@ -699,141 +720,3 @@ func enableBalancer(mongosHost string) error { klog.Info("Balancer successfully re-enabled.") return nil } - -func lockConfigServer(configSVRDSN, secondaryHost string) error { - klog.Infoln("Attempting to lock configserver", configSVRDSN) - - if secondaryHost == "" { - klog.Warningln("locking configserver is skipped. secondary host is empty") - return nil - } - v := make(map[string]interface{}) - // findAndModify BackupControlDocument. skip single quote inside single quote: https://stackoverflow.com/a/28786747/4628962 - args := append([]interface{}{ - "config", - "--host", configSVRDSN, - "--quiet", - "--eval", "db.BackupControl.findAndModify({query: { _id: 'BackupControlDocument' }, update: { $inc: { counter : 1 } }, new: true, upsert: true, writeConcern: { w: 'majority', wtimeout: 15000 }});", - }, mongoCreds...) - - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running findAndModify to lock configServer : %s ; output : %s \n", err.Error(), output) - return err - } - s := fmt.Sprintf(`/bin/echo '%s' | /usr/bin/tail -1`, strings.TrimSuffix(string(output), "\n")) - output, err = sh.Command("/bin/sh", "-c", s).Output() - if err != nil { - klog.Errorf("Error while running tail in findAndModify to lock configServer : %s ; output : %s \n", err.Error(), output) - return err - } - - err = json.Unmarshal(output, &v) - if err != nil { - klog.Errorf("Unmarshal error while running findAndModify to lock configServer : %s \n", err.Error()) - return err - } - val, ok := v["counter"].(float64) - if !ok || int(val) == 0 { - return fmt.Errorf("unable to modify BackupControlDocument. got response: %v", v) - } - val2 := float64(0) - timer := 0 // wait approximately 5 minutes. - for timer < 60 && (int(val2) == 0 || int(val) != int(val2)) { - timer++ - // find backupDocument from secondary configServer - args = append([]interface{}{ - "config", - "--host", secondaryHost, - "--quiet", - "--eval", "rs.slaveOk(); db.BackupControl.find({ '_id' : 'BackupControlDocument' }).readConcern('majority');", - }, mongoCreds...) - - if err := sh.Command(MongoCMD, args...).UnmarshalJSON(&v); err != nil { - return err - } - - val2, ok = v["counter"].(float64) - if !ok { - return fmt.Errorf("unable to get BackupControlDocument. got response: %v", v) - } - if int(val) != int(val2) { - klog.V(5).Infof("BackupDocument counter in secondary is not same. Expected %v, but got %v. Full response: %v", val, val2, v) - time.Sleep(time.Second * 5) - } - } - if timer >= 60 { - return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary to be same as primary. Expected %v, but got %v. Full response: %v", val, val2, v) - } - // lock secondary - return lockSecondaryMember(secondaryHost) -} - -func lockSecondaryMember(mongohost string) error { - klog.Infoln("Attempting to lock secondary member", mongohost) - if mongohost == "" { - klog.Warningln("locking secondary member is skipped. secondary host is empty") - return nil - } - v := make(map[string]interface{}) - - // lock file - args := append([]interface{}{ - "config", - "--host", mongohost, - "--quiet", - "--eval", "JSON.stringify(db.fsyncLock())", - }, mongoCreds...) - - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output) - return err - } - - err = json.Unmarshal(output, &v) - if err != nil { - klog.Errorf("Unmarshal error while running fsyncLock on secondary : %s \n", err.Error()) - return err - } - - if val, ok := v["ok"].(float64); !ok || int(val) != 1 { - return fmt.Errorf("unable to lock the secondary host. got response: %v", v) - } - klog.Infof("secondary %s locked.", mongohost) - return nil -} - -func unlockSecondaryMember(mongohost string) error { - klog.Infoln("Attempting to unlock secondary member", mongohost) - if mongohost == "" { - klog.Warningln("skipped unlocking secondary member. secondary host is empty") - return nil - } - v := make(map[string]interface{}) - - // unlock file - args := append([]interface{}{ - "config", - "--host", mongohost, - "--quiet", - "--eval", "JSON.stringify(db.fsyncUnlock())", - }, mongoCreds...) - - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running fsyncUnlock on secondary : %s ; output : %s \n", err.Error(), output) - return err - } - err = json.Unmarshal(output, &v) - if err != nil { - klog.Errorf("Unmarshal error while running fsyncUnlock on secondary : %s \n", err.Error()) - return err - } - - if val, ok := v["ok"].(float64); !ok || int(val) != 1 { - return fmt.Errorf("unable to lock the secondary host. got response: %v", v) - } - klog.Infof("secondary %s unlocked.", mongohost) - return nil -} diff --git a/pkg/lock.go b/pkg/lock.go new file mode 100644 index 000000000..1fd890c70 --- /dev/null +++ b/pkg/lock.go @@ -0,0 +1,274 @@ +/* +Copyright AppsCode Inc. and Contributors + +Licensed under the AppsCode Free Trial License 1.0.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/appscode/licenses/raw/1.0.0/AppsCode-Free-Trial-1.0.0.md + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pkg + +import ( + "encoding/json" + "fmt" + "time" + + "gomodules.xyz/go-sh" + "k8s.io/klog/v2" +) + +func setupConfigServer(configSVRDSN, secondaryHost string) error { + klog.Infof("Attempting to setup configserver %s\n", configSVRDSN) + + if secondaryHost == "" { + klog.Warningln("locking configserver is skipped. secondary host is empty") + return nil + } + v := make(map[string]interface{}) + // findAndModify BackupControlDocument. skip single quote inside single quote: https://stackoverflow.com/a/28786747/4628962 + args := append([]interface{}{ + "config", + "--host", configSVRDSN, + "--quiet", + "--eval", "db.BackupControl.findAndModify({query: { _id: 'BackupControlDocument' }, update: { $inc: { counter : 1 } }, new: true, upsert: true, writeConcern: { w: 'majority', wtimeout: 15000 }});", + }, mongoCreds...) + + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running findAndModify to setup configServer : %s ; output : %s \n", err.Error(), output) + return err + } + + err = json.Unmarshal(output, &v) + if err != nil { + klog.Errorf("Unmarshal error while running findAndModify to setup configServer : %s \n", err.Error()) + return err + } + val, ok := v["counter"].(float64) + if !ok || int(val) == 0 { + return fmt.Errorf("unable to modify BackupControlDocument. got response: %v", v) + } + val2 := float64(0) + timer := 0 // wait approximately 5 minutes. + for timer < 60 && (int(val2) == 0 || int(val) != int(val2)) { + timer++ + // find backupDocument from secondary configServer + args = append([]interface{}{ + "config", + "--host", secondaryHost, + "--quiet", + "--eval", "rs.secondaryOk(); db.BackupControl.find({ '_id' : 'BackupControlDocument' }).readConcern('majority');", + }, mongoCreds...) + + if err := sh.Command(MongoCMD, args...).UnmarshalJSON(&v); err != nil { + return err + } + + val2, ok = v["counter"].(float64) + if !ok { + return fmt.Errorf("unable to get BackupControlDocument. got response: %v", v) + } + if int(val) != int(val2) { + klog.V(5).Infof("BackupDocument counter in secondary %v is not same. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) + time.Sleep(time.Second * 5) + } + } + if timer >= 60 { + return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary %v to be same as primary. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) + } + + return nil +} + +func lockSecondaryMember(mongohost string) error { + klog.Infof("Attempting to lock secondary member %s\n", mongohost) + + if mongohost == "" { + klog.Warningln("locking secondary member is skipped. secondary host is empty") + return nil + } + + // lock file + v := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(db.fsyncLock())", + }, mongoCreds...) + + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + + err = json.Unmarshal(output, &v) + if err != nil { + klog.Errorf("Unmarshal error while running fsyncLock on secondary : %s \n", err.Error()) + return err + } + + if val, ok := v["ok"].(float64); !ok || int(val) != 1 { + return fmt.Errorf("unable to lock the secondary host. got response: %v", v) + } + klog.Infof("secondary %s locked.\n", mongohost) + return nil +} + +func checkIfSecondaryLockedAndSync(mongohost string) error { + klog.Infof("Checking if secondary %s is already locked\n", mongohost) + + x := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(db.currentOp())", + }, mongoCreds...) + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running currentOp on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + err = json.Unmarshal(output, &x) + if err != nil { + klog.Errorf("Unmarshal error while running currentOp on secondary : %s \n", err.Error()) + return err + } + + val, ok := x["fsyncLock"].(bool) + if ok && val { + klog.Infoln("Found fsyncLock true while locking") + err := unlockSecondaryMember(mongohost) + if err != nil { + return err + } + if err := waitForSecondarySync(mongohost); err != nil { + return err + } + } + return nil +} + +func waitForSecondarySync(mongohost string) error { + klog.Infof("Attempting to sync secondary %s with primary\n", mongohost) + + for { + status := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(rs.status())", + }, mongoCreds...) + + if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&status); err != nil { + klog.Errorf("Error while running status on secondary : %s ; output : %s \n", mongohost, err.Error()) + return err + } + + members, ok := status["members"].([]interface{}) + if !ok { + return fmt.Errorf("unable to get members using rs.status(). got response: %v", status) + } + + var masterOptimeDate, curOptimeDate time.Time + + for _, member := range members { + + memberInfo, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get member info of primary using rs.status(). got response: %v", member) + } + + if memberInfo["stateStr"] == "PRIMARY" { + optimedate, ok := memberInfo["optimeDate"].(string) + if !ok { + return fmt.Errorf("unable to get optimedate of primary using rs.status(). got response: %v", memberInfo) + } + + convTime, err := getTime(optimedate) + if err != nil { + return err + } + masterOptimeDate = convTime + break + } + } + synced := true + for _, member := range members { + + memberInfo, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get member info of secondary using rs.status(). got response: %v", member) + } + + if memberInfo["stateStr"] == "SECONDARY" && memberInfo["name"] == mongohost { + + optimedate, ok := memberInfo["optimeDate"].(string) + if !ok { + return fmt.Errorf("unable to get optimedate of secondary using rs.status(). got response: %v", memberInfo) + } + + convTime, err := getTime(optimedate) + if err != nil { + return err + } + curOptimeDate = convTime + if curOptimeDate.Before(masterOptimeDate) { + synced = false + } + break + } + } + if synced { + klog.Infoln("database successfully synced") + break + } + + klog.Infoln("Waiting... database is not synced yet") + time.Sleep(5 * time.Second) + } + return nil +} + +func unlockSecondaryMember(mongohost string) error { + klog.Infof("Attempting to unlock secondary member %s\n", mongohost) + if mongohost == "" { + klog.Warningln("skipped unlocking secondary member. secondary host is empty") + return nil + } + v := make(map[string]interface{}) + + // unlock file + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(db.fsyncUnlock())", + }, mongoCreds...) + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running fsyncUnlock on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + err = json.Unmarshal(output, &v) + if err != nil { + klog.Errorf("Unmarshal error while running fsyncUnlock on secondary : %s \n", err.Error()) + return err + } + if val, ok := v["ok"].(float64); !ok || int(val) != 1 { + return fmt.Errorf("unable to lock the secondary host. got response: %v", v) + } + klog.Infof("secondary %s unlocked\n", mongohost) + return nil +} diff --git a/pkg/utils.go b/pkg/utils.go index 347b81234..9c4831f50 100644 --- a/pkg/utils.go +++ b/pkg/utils.go @@ -103,3 +103,14 @@ func containsString(a []string, e string) bool { } return false } + +func getTime(t string) (time.Time, error) { + // Define the layout or format of the input string + layout := "2006-01-02T15:04:05Z" + + parsedTime, err := time.Parse(layout, t) + if err != nil { + return time.Time{}, err + } + return parsedTime, nil +}