From 87fda05901e593a1d5abfd85f479b5ddcaa6d44d Mon Sep 17 00:00:00 2001 From: sayedppqq Date: Tue, 26 Sep 2023 11:15:55 +0600 Subject: [PATCH 1/5] secondary sync --- pkg/backup.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 113 insertions(+), 3 deletions(-) diff --git a/pkg/backup.go b/pkg/backup.go index a0cdb1023..e0ff4a5a3 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -769,17 +769,50 @@ func lockSecondaryMember(mongohost string) error { klog.Warningln("locking secondary member is skipped. secondary host is empty") return nil } - v := make(map[string]interface{}) - // lock file + // Check if it is already locked + x := make(map[string]interface{}) args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(db.currentOp())", + }, mongoCreds...) + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running currentOp on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + err = json.Unmarshal(output, &x) + if err != nil { + klog.Errorf("Unmarshal error while running currentOp on secondary : %s \n", err.Error()) + return err + } + + val, ok := x["fsyncLock"].(bool) + if ok && bool(val) == true { + // Already locked + err := unlockSecondaryMember(mongohost) + if err != nil { + return err + } + if err := waitForSecondarySync(mongohost); err != nil { + return err + } + } else if !ok { + return fmt.Errorf("unable to get fsyncLock of secondary using db.currentOp(). got response: %v", x) + } + + // lock file + v := make(map[string]interface{}) + args = append([]interface{}{ "config", "--host", mongohost, "--quiet", "--eval", "JSON.stringify(db.fsyncLock())", }, mongoCreds...) - output, err := sh.Command(MongoCMD, args...).Output() + output, err = sh.Command(MongoCMD, args...).Output() if err != nil { klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output) return err @@ -798,6 +831,83 @@ func lockSecondaryMember(mongohost string) error { return nil } +func waitForSecondarySync(mongohost string) error { + status := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(rs.status())", + }, mongoCreds...) + + if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&status); err != nil { + klog.Errorf("Error while running status on secondary : %s ; output : %s \n", mongohost, err.Error()) + return err + } + + members, ok := status["members"].([]map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get members using rs.status(). got response: %v", status) + } + + var masterOptime, masterOptimeDate, curOptime, curOptimeDate time.Time + + for _, member := range members { + if member["stateStr"] == "PRIMARY" { + optime, ok := member["optime"].(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get optime of primary using rs.status(). got response: %v", member) + } + optimedate, ok := member["optimeDate"] + if !ok { + return fmt.Errorf("unable to get optimedate of primary using rs.status(). got response: %v", member) + } + + masterOptime, ok = optime["ts"].(time.Time) + if !ok { + return fmt.Errorf("unable to get timestamp of primary using rs.status(). got response: %v", optime) + } + masterOptimeDate = optimedate.(time.Time) + break + } + } + + for { + synced := false + for _, member := range members { + if member["stateStr"] == "SECONDARY" && member["name"] == mongohost { + + optime, ok := member["optime"].(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get optime of secondary using rs.status(). got response: %v", member) + } + optimedate, ok := member["optimeDate"] + if !ok { + return fmt.Errorf("unable to get optimedate of secondary using rs.status(). got response: %v", member) + } + + curOptime, ok = optime["ts"].(time.Time) + if !ok { + return fmt.Errorf("unable to get timestamp of primary using rs.status(). got response: %v", optime) + } + curOptimeDate = optimedate.(time.Time) + + if curOptime == masterOptime && curOptimeDate == masterOptimeDate { + synced = true + } + break + } + } + if synced { + break + } + + klog.Infoln("Waiting... database is not synced yet") + time.Sleep(5 * time.Second) + } + return nil +} + func unlockSecondaryMember(mongohost string) error { klog.Infoln("Attempting to unlock secondary member", mongohost) if mongohost == "" { From 49f11b5ad59f6449741175d739ce34efa5d62538 Mon Sep 17 00:00:00 2001 From: sayedppqq Date: Tue, 26 Sep 2023 19:00:11 +0600 Subject: [PATCH 2/5] Sync secondary properly before lock Signed-off-by: sayedppqq --- pkg/backup.go | 190 ++++++++++++++++++++++++++++---------------------- pkg/utils.go | 11 +++ 2 files changed, 117 insertions(+), 84 deletions(-) diff --git a/pkg/backup.go b/pkg/backup.go index e0ff4a5a3..4c55d7ea1 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -464,8 +464,17 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic backupHost = secondary } - err = lockConfigServer(parameters.ConfigServer, secondary) + // Check if secondary is already locked before locking it. + // If yes, unlock it and sync with primary + if err := checkIfSecondaryLockedAndSync(secondary); err != nil { + return nil, err + } + if err := setupConfigServer(parameters.ConfigServer, secondary); err != nil { + return nil, err + } + + err = lockSecondaryMember(secondary) cleanupFuncs = append(cleanupFuncs, func() error { // even if error occurs, try to unlock the server return unlockSecondaryMember(secondary) @@ -492,6 +501,12 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic backupHost = secondary } + // Check if secondary is already locked before locking it. + // If yes, unlock it and sync with primary + if err := checkIfSecondaryLockedAndSync(secondary); err != nil { + return nil, err + } + err = lockSecondaryMember(secondary) cleanupFuncs = append(cleanupFuncs, func() error { // even if error occurs, try to unlock the server @@ -700,8 +715,8 @@ func enableBalancer(mongosHost string) error { return nil } -func lockConfigServer(configSVRDSN, secondaryHost string) error { - klog.Infoln("Attempting to lock configserver", configSVRDSN) +func setupConfigServer(configSVRDSN, secondaryHost string) error { + klog.Infoln("Attempting to setup configserver", configSVRDSN) if secondaryHost == "" { klog.Warningln("locking configserver is skipped. secondary host is empty") @@ -718,13 +733,13 @@ func lockConfigServer(configSVRDSN, secondaryHost string) error { output, err := sh.Command(MongoCMD, args...).Output() if err != nil { - klog.Errorf("Error while running findAndModify to lock configServer : %s ; output : %s \n", err.Error(), output) + klog.Errorf("Error while running findAndModify to setup configServer : %s ; output : %s \n", err.Error(), output) return err } err = json.Unmarshal(output, &v) if err != nil { - klog.Errorf("Unmarshal error while running findAndModify to lock configServer : %s \n", err.Error()) + klog.Errorf("Unmarshal error while running findAndModify to setup configServer : %s \n", err.Error()) return err } val, ok := v["counter"].(float64) @@ -752,67 +767,35 @@ func lockConfigServer(configSVRDSN, secondaryHost string) error { return fmt.Errorf("unable to get BackupControlDocument. got response: %v", v) } if int(val) != int(val2) { - klog.V(5).Infof("BackupDocument counter in secondary is not same. Expected %v, but got %v. Full response: %v", val, val2, v) + klog.V(5).Infof("BackupDocument counter in secondary %v is not same. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) time.Sleep(time.Second * 5) } } if timer >= 60 { - return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary to be same as primary. Expected %v, but got %v. Full response: %v", val, val2, v) + return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary %v to be same as primary. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) } - // lock secondary - return lockSecondaryMember(secondaryHost) + + return nil } func lockSecondaryMember(mongohost string) error { klog.Infoln("Attempting to lock secondary member", mongohost) + if mongohost == "" { klog.Warningln("locking secondary member is skipped. secondary host is empty") return nil } - // Check if it is already locked - x := make(map[string]interface{}) - args := append([]interface{}{ - "config", - "--host", mongohost, - "--quiet", - "--eval", "JSON.stringify(db.currentOp())", - }, mongoCreds...) - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running currentOp on secondary : %s ; output : %s \n", err.Error(), output) - return err - } - err = json.Unmarshal(output, &x) - if err != nil { - klog.Errorf("Unmarshal error while running currentOp on secondary : %s \n", err.Error()) - return err - } - - val, ok := x["fsyncLock"].(bool) - if ok && bool(val) == true { - // Already locked - err := unlockSecondaryMember(mongohost) - if err != nil { - return err - } - if err := waitForSecondarySync(mongohost); err != nil { - return err - } - } else if !ok { - return fmt.Errorf("unable to get fsyncLock of secondary using db.currentOp(). got response: %v", x) - } - // lock file v := make(map[string]interface{}) - args = append([]interface{}{ + args := append([]interface{}{ "config", "--host", mongohost, "--quiet", "--eval", "JSON.stringify(db.fsyncLock())", }, mongoCreds...) - output, err = sh.Command(MongoCMD, args...).Output() + output, err := sh.Command(MongoCMD, args...).Output() if err != nil { klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output) return err @@ -831,69 +814,108 @@ func lockSecondaryMember(mongohost string) error { return nil } -func waitForSecondarySync(mongohost string) error { - status := make(map[string]interface{}) +func checkIfSecondaryLockedAndSync(mongohost string) error { + klog.Infoln("Checking if secondary %s is already locked.", mongohost) + + x := make(map[string]interface{}) args := append([]interface{}{ "config", "--host", mongohost, "--quiet", - "--eval", "JSON.stringify(rs.status())", + "--eval", "JSON.stringify(db.currentOp())", }, mongoCreds...) - - if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&status); err != nil { - klog.Errorf("Error while running status on secondary : %s ; output : %s \n", mongohost, err.Error()) + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running currentOp on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + err = json.Unmarshal(output, &x) + if err != nil { + klog.Errorf("Unmarshal error while running currentOp on secondary : %s \n", err.Error()) return err } - members, ok := status["members"].([]map[string]interface{}) - if !ok { - return fmt.Errorf("unable to get members using rs.status(). got response: %v", status) + val, ok := x["fsyncLock"].(bool) + if ok && val { + klog.Infoln("Found fsyncLock true while locking") + err := unlockSecondaryMember(mongohost) + if err != nil { + return err + } + if err := waitForSecondarySync(mongohost); err != nil { + return err + } } + return nil +} - var masterOptime, masterOptimeDate, curOptime, curOptimeDate time.Time +func waitForSecondarySync(mongohost string) error { + klog.Infoln("Attempting to sync secondary with primary") - for _, member := range members { - if member["stateStr"] == "PRIMARY" { - optime, ok := member["optime"].(map[string]interface{}) - if !ok { - return fmt.Errorf("unable to get optime of primary using rs.status(). got response: %v", member) - } - optimedate, ok := member["optimeDate"] - if !ok { - return fmt.Errorf("unable to get optimedate of primary using rs.status(). got response: %v", member) - } + for { + status := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(rs.status())", + }, mongoCreds...) - masterOptime, ok = optime["ts"].(time.Time) - if !ok { - return fmt.Errorf("unable to get timestamp of primary using rs.status(). got response: %v", optime) - } - masterOptimeDate = optimedate.(time.Time) - break + if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&status); err != nil { + klog.Errorf("Error while running status on secondary : %s ; output : %s \n", mongohost, err.Error()) + return err } - } - for { - synced := false + members, ok := status["members"].([]interface{}) + if !ok { + return fmt.Errorf("unable to get members using rs.status(). got response: %v", status) + } + + var masterOptimeDate, curOptimeDate time.Time + for _, member := range members { - if member["stateStr"] == "SECONDARY" && member["name"] == mongohost { - optime, ok := member["optime"].(map[string]interface{}) + memberInfo, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get member info of primary using rs.status(). got response: %v", member) + } + + if memberInfo["stateStr"] == "PRIMARY" { + optimedate, ok := memberInfo["optimeDate"].(string) if !ok { - return fmt.Errorf("unable to get optime of secondary using rs.status(). got response: %v", member) + return fmt.Errorf("unable to get optimedate of primary using rs.status(). got response: %v", memberInfo) } - optimedate, ok := member["optimeDate"] - if !ok { - return fmt.Errorf("unable to get optimedate of secondary using rs.status(). got response: %v", member) + + convTime, err := getTime(optimedate) + if err != nil { + return err } + masterOptimeDate = convTime + break + } + } + synced := true + for _, member := range members { - curOptime, ok = optime["ts"].(time.Time) + memberInfo, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get member info of secondary using rs.status(). got response: %v", member) + } + + if memberInfo["stateStr"] == "SECONDARY" && memberInfo["name"] == mongohost { + + optimedate, ok := memberInfo["optimeDate"].(string) if !ok { - return fmt.Errorf("unable to get timestamp of primary using rs.status(). got response: %v", optime) + return fmt.Errorf("unable to get optimedate of secondary using rs.status(). got response: %v", memberInfo) } - curOptimeDate = optimedate.(time.Time) - if curOptime == masterOptime && curOptimeDate == masterOptimeDate { - synced = true + convTime, err := getTime(optimedate) + if err != nil { + return err + } + curOptimeDate = convTime + if curOptimeDate.Before(masterOptimeDate) { + synced = false } break } diff --git a/pkg/utils.go b/pkg/utils.go index c28de5cf1..e9ead367d 100644 --- a/pkg/utils.go +++ b/pkg/utils.go @@ -94,3 +94,14 @@ func containsArg(args []string, checklist sets.String) bool { } return false } + +func getTime(t string) (time.Time, error) { + // Define the layout or format of the input string + layout := "2006-01-02T15:04:05Z" + + parsedTime, err := time.Parse(layout, t) + if err != nil { + return time.Time{}, err + } + return parsedTime, nil +} From 25fa86616043fbec2dbfcb88d47c7594678fa672 Mon Sep 17 00:00:00 2001 From: sayedppqq Date: Wed, 27 Sep 2023 12:04:58 +0600 Subject: [PATCH 3/5] move lock related fucn to separate file Signed-off-by: sayedppqq --- pkg/backup.go | 248 --------------------------------------------- pkg/lock.go | 273 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 273 insertions(+), 248 deletions(-) create mode 100644 pkg/lock.go diff --git a/pkg/backup.go b/pkg/backup.go index 4c55d7ea1..5015efd01 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -714,251 +714,3 @@ func enableBalancer(mongosHost string) error { klog.Info("Balancer successfully re-enabled.") return nil } - -func setupConfigServer(configSVRDSN, secondaryHost string) error { - klog.Infoln("Attempting to setup configserver", configSVRDSN) - - if secondaryHost == "" { - klog.Warningln("locking configserver is skipped. secondary host is empty") - return nil - } - v := make(map[string]interface{}) - // findAndModify BackupControlDocument. skip single quote inside single quote: https://stackoverflow.com/a/28786747/4628962 - args := append([]interface{}{ - "config", - "--host", configSVRDSN, - "--quiet", - "--eval", "db.BackupControl.findAndModify({query: { _id: 'BackupControlDocument' }, update: { $inc: { counter : 1 } }, new: true, upsert: true, writeConcern: { w: 'majority', wtimeout: 15000 }});", - }, mongoCreds...) - - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running findAndModify to setup configServer : %s ; output : %s \n", err.Error(), output) - return err - } - - err = json.Unmarshal(output, &v) - if err != nil { - klog.Errorf("Unmarshal error while running findAndModify to setup configServer : %s \n", err.Error()) - return err - } - val, ok := v["counter"].(float64) - if !ok || int(val) == 0 { - return fmt.Errorf("unable to modify BackupControlDocument. got response: %v", v) - } - val2 := float64(0) - timer := 0 // wait approximately 5 minutes. - for timer < 60 && (int(val2) == 0 || int(val) != int(val2)) { - timer++ - // find backupDocument from secondary configServer - args = append([]interface{}{ - "config", - "--host", secondaryHost, - "--quiet", - "--eval", "rs.secondaryOk(); db.BackupControl.find({ '_id' : 'BackupControlDocument' }).readConcern('majority');", - }, mongoCreds...) - - if err := sh.Command(MongoCMD, args...).UnmarshalJSON(&v); err != nil { - return err - } - - val2, ok = v["counter"].(float64) - if !ok { - return fmt.Errorf("unable to get BackupControlDocument. got response: %v", v) - } - if int(val) != int(val2) { - klog.V(5).Infof("BackupDocument counter in secondary %v is not same. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) - time.Sleep(time.Second * 5) - } - } - if timer >= 60 { - return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary %v to be same as primary. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) - } - - return nil -} - -func lockSecondaryMember(mongohost string) error { - klog.Infoln("Attempting to lock secondary member", mongohost) - - if mongohost == "" { - klog.Warningln("locking secondary member is skipped. secondary host is empty") - return nil - } - - // lock file - v := make(map[string]interface{}) - args := append([]interface{}{ - "config", - "--host", mongohost, - "--quiet", - "--eval", "JSON.stringify(db.fsyncLock())", - }, mongoCreds...) - - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output) - return err - } - - err = json.Unmarshal(output, &v) - if err != nil { - klog.Errorf("Unmarshal error while running fsyncLock on secondary : %s \n", err.Error()) - return err - } - - if val, ok := v["ok"].(float64); !ok || int(val) != 1 { - return fmt.Errorf("unable to lock the secondary host. got response: %v", v) - } - klog.Infof("secondary %s locked.", mongohost) - return nil -} - -func checkIfSecondaryLockedAndSync(mongohost string) error { - klog.Infoln("Checking if secondary %s is already locked.", mongohost) - - x := make(map[string]interface{}) - args := append([]interface{}{ - "config", - "--host", mongohost, - "--quiet", - "--eval", "JSON.stringify(db.currentOp())", - }, mongoCreds...) - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running currentOp on secondary : %s ; output : %s \n", err.Error(), output) - return err - } - err = json.Unmarshal(output, &x) - if err != nil { - klog.Errorf("Unmarshal error while running currentOp on secondary : %s \n", err.Error()) - return err - } - - val, ok := x["fsyncLock"].(bool) - if ok && val { - klog.Infoln("Found fsyncLock true while locking") - err := unlockSecondaryMember(mongohost) - if err != nil { - return err - } - if err := waitForSecondarySync(mongohost); err != nil { - return err - } - } - return nil -} - -func waitForSecondarySync(mongohost string) error { - klog.Infoln("Attempting to sync secondary with primary") - - for { - status := make(map[string]interface{}) - args := append([]interface{}{ - "config", - "--host", mongohost, - "--quiet", - "--eval", "JSON.stringify(rs.status())", - }, mongoCreds...) - - if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&status); err != nil { - klog.Errorf("Error while running status on secondary : %s ; output : %s \n", mongohost, err.Error()) - return err - } - - members, ok := status["members"].([]interface{}) - if !ok { - return fmt.Errorf("unable to get members using rs.status(). got response: %v", status) - } - - var masterOptimeDate, curOptimeDate time.Time - - for _, member := range members { - - memberInfo, ok := member.(map[string]interface{}) - if !ok { - return fmt.Errorf("unable to get member info of primary using rs.status(). got response: %v", member) - } - - if memberInfo["stateStr"] == "PRIMARY" { - optimedate, ok := memberInfo["optimeDate"].(string) - if !ok { - return fmt.Errorf("unable to get optimedate of primary using rs.status(). got response: %v", memberInfo) - } - - convTime, err := getTime(optimedate) - if err != nil { - return err - } - masterOptimeDate = convTime - break - } - } - synced := true - for _, member := range members { - - memberInfo, ok := member.(map[string]interface{}) - if !ok { - return fmt.Errorf("unable to get member info of secondary using rs.status(). got response: %v", member) - } - - if memberInfo["stateStr"] == "SECONDARY" && memberInfo["name"] == mongohost { - - optimedate, ok := memberInfo["optimeDate"].(string) - if !ok { - return fmt.Errorf("unable to get optimedate of secondary using rs.status(). got response: %v", memberInfo) - } - - convTime, err := getTime(optimedate) - if err != nil { - return err - } - curOptimeDate = convTime - if curOptimeDate.Before(masterOptimeDate) { - synced = false - } - break - } - } - if synced { - break - } - - klog.Infoln("Waiting... database is not synced yet") - time.Sleep(5 * time.Second) - } - return nil -} - -func unlockSecondaryMember(mongohost string) error { - klog.Infoln("Attempting to unlock secondary member", mongohost) - if mongohost == "" { - klog.Warningln("skipped unlocking secondary member. secondary host is empty") - return nil - } - v := make(map[string]interface{}) - - // unlock file - args := append([]interface{}{ - "config", - "--host", mongohost, - "--quiet", - "--eval", "JSON.stringify(db.fsyncUnlock())", - }, mongoCreds...) - - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running fsyncUnlock on secondary : %s ; output : %s \n", err.Error(), output) - return err - } - err = json.Unmarshal(output, &v) - if err != nil { - klog.Errorf("Unmarshal error while running fsyncUnlock on secondary : %s \n", err.Error()) - return err - } - if val, ok := v["ok"].(float64); !ok || int(val) != 1 { - return fmt.Errorf("unable to lock the secondary host. got response: %v", v) - } - klog.Infof("secondary %s unlocked.", mongohost) - return nil -} diff --git a/pkg/lock.go b/pkg/lock.go new file mode 100644 index 000000000..fba190e5a --- /dev/null +++ b/pkg/lock.go @@ -0,0 +1,273 @@ +/* +Copyright AppsCode Inc. and Contributors + +Licensed under the AppsCode Free Trial License 1.0.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/appscode/licenses/raw/1.0.0/AppsCode-Free-Trial-1.0.0.md + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pkg + +import ( + "encoding/json" + "fmt" + "time" + + "gomodules.xyz/go-sh" + "k8s.io/klog/v2" +) + +func setupConfigServer(configSVRDSN, secondaryHost string) error { + klog.Infoln("Attempting to setup configserver", configSVRDSN) + + if secondaryHost == "" { + klog.Warningln("locking configserver is skipped. secondary host is empty") + return nil + } + v := make(map[string]interface{}) + // findAndModify BackupControlDocument. skip single quote inside single quote: https://stackoverflow.com/a/28786747/4628962 + args := append([]interface{}{ + "config", + "--host", configSVRDSN, + "--quiet", + "--eval", "db.BackupControl.findAndModify({query: { _id: 'BackupControlDocument' }, update: { $inc: { counter : 1 } }, new: true, upsert: true, writeConcern: { w: 'majority', wtimeout: 15000 }});", + }, mongoCreds...) + + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running findAndModify to setup configServer : %s ; output : %s \n", err.Error(), output) + return err + } + + err = json.Unmarshal(output, &v) + if err != nil { + klog.Errorf("Unmarshal error while running findAndModify to setup configServer : %s \n", err.Error()) + return err + } + val, ok := v["counter"].(float64) + if !ok || int(val) == 0 { + return fmt.Errorf("unable to modify BackupControlDocument. got response: %v", v) + } + val2 := float64(0) + timer := 0 // wait approximately 5 minutes. + for timer < 60 && (int(val2) == 0 || int(val) != int(val2)) { + timer++ + // find backupDocument from secondary configServer + args = append([]interface{}{ + "config", + "--host", secondaryHost, + "--quiet", + "--eval", "rs.secondaryOk(); db.BackupControl.find({ '_id' : 'BackupControlDocument' }).readConcern('majority');", + }, mongoCreds...) + + if err := sh.Command(MongoCMD, args...).UnmarshalJSON(&v); err != nil { + return err + } + + val2, ok = v["counter"].(float64) + if !ok { + return fmt.Errorf("unable to get BackupControlDocument. got response: %v", v) + } + if int(val) != int(val2) { + klog.V(5).Infof("BackupDocument counter in secondary %v is not same. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) + time.Sleep(time.Second * 5) + } + } + if timer >= 60 { + return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary %v to be same as primary. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) + } + + return nil +} + +func lockSecondaryMember(mongohost string) error { + klog.Infoln("Attempting to lock secondary member", mongohost) + + if mongohost == "" { + klog.Warningln("locking secondary member is skipped. secondary host is empty") + return nil + } + + // lock file + v := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(db.fsyncLock())", + }, mongoCreds...) + + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + + err = json.Unmarshal(output, &v) + if err != nil { + klog.Errorf("Unmarshal error while running fsyncLock on secondary : %s \n", err.Error()) + return err + } + + if val, ok := v["ok"].(float64); !ok || int(val) != 1 { + return fmt.Errorf("unable to lock the secondary host. got response: %v", v) + } + klog.Infof("secondary %s locked.", mongohost) + return nil +} + +func checkIfSecondaryLockedAndSync(mongohost string) error { + klog.Infoln("Checking if secondary %s is already locked.", mongohost) + + x := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(db.currentOp())", + }, mongoCreds...) + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running currentOp on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + err = json.Unmarshal(output, &x) + if err != nil { + klog.Errorf("Unmarshal error while running currentOp on secondary : %s \n", err.Error()) + return err + } + + val, ok := x["fsyncLock"].(bool) + if ok && val { + klog.Infoln("Found fsyncLock true while locking") + err := unlockSecondaryMember(mongohost) + if err != nil { + return err + } + if err := waitForSecondarySync(mongohost); err != nil { + return err + } + } + return nil +} + +func waitForSecondarySync(mongohost string) error { + klog.Infoln("Attempting to sync secondary with primary") + + for { + status := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(rs.status())", + }, mongoCreds...) + + if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&status); err != nil { + klog.Errorf("Error while running status on secondary : %s ; output : %s \n", mongohost, err.Error()) + return err + } + + members, ok := status["members"].([]interface{}) + if !ok { + return fmt.Errorf("unable to get members using rs.status(). got response: %v", status) + } + + var masterOptimeDate, curOptimeDate time.Time + + for _, member := range members { + + memberInfo, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get member info of primary using rs.status(). got response: %v", member) + } + + if memberInfo["stateStr"] == "PRIMARY" { + optimedate, ok := memberInfo["optimeDate"].(string) + if !ok { + return fmt.Errorf("unable to get optimedate of primary using rs.status(). got response: %v", memberInfo) + } + + convTime, err := getTime(optimedate) + if err != nil { + return err + } + masterOptimeDate = convTime + break + } + } + synced := true + for _, member := range members { + + memberInfo, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get member info of secondary using rs.status(). got response: %v", member) + } + + if memberInfo["stateStr"] == "SECONDARY" && memberInfo["name"] == mongohost { + + optimedate, ok := memberInfo["optimeDate"].(string) + if !ok { + return fmt.Errorf("unable to get optimedate of secondary using rs.status(). got response: %v", memberInfo) + } + + convTime, err := getTime(optimedate) + if err != nil { + return err + } + curOptimeDate = convTime + if curOptimeDate.Before(masterOptimeDate) { + synced = false + } + break + } + } + if synced { + break + } + + klog.Infoln("Waiting... database is not synced yet") + time.Sleep(5 * time.Second) + } + return nil +} + +func unlockSecondaryMember(mongohost string) error { + klog.Infoln("Attempting to unlock secondary member", mongohost) + if mongohost == "" { + klog.Warningln("skipped unlocking secondary member. secondary host is empty") + return nil + } + v := make(map[string]interface{}) + + // unlock file + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(db.fsyncUnlock())", + }, mongoCreds...) + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running fsyncUnlock on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + err = json.Unmarshal(output, &v) + if err != nil { + klog.Errorf("Unmarshal error while running fsyncUnlock on secondary : %s \n", err.Error()) + return err + } + if val, ok := v["ok"].(float64); !ok || int(val) != 1 { + return fmt.Errorf("unable to lock the secondary host. got response: %v", v) + } + klog.Infof("secondary %s unlocked.", mongohost) + return nil +} From 700553ab7358beef833db34ff6d61758e07e6a2d Mon Sep 17 00:00:00 2001 From: sayedppqq Date: Wed, 27 Sep 2023 16:16:45 +0600 Subject: [PATCH 4/5] unlock and sync for all secondaries Signed-off-by: sayedppqq --- pkg/backup.go | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/pkg/backup.go b/pkg/backup.go index 5015efd01..5ddcdb874 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -442,7 +442,7 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic if parameters.ConfigServer != "" { // sharded cluster. so disable the balancer first. then perform the 'usual' tasks. - primary, secondary, err := getPrimaryNSecondaryMember(parameters.ConfigServer) + primary, secondary, secondaryMembers, err := getPrimaryNSecondaryMember(parameters.ConfigServer) if err != nil { return nil, err } @@ -466,8 +466,10 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic // Check if secondary is already locked before locking it. // If yes, unlock it and sync with primary - if err := checkIfSecondaryLockedAndSync(secondary); err != nil { - return nil, err + for _, secondary := range secondaryMembers { + if err := checkIfSecondaryLockedAndSync(secondary); err != nil { + return nil, err + } } if err := setupConfigServer(parameters.ConfigServer, secondary); err != nil { @@ -488,7 +490,7 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic for key, host := range parameters.ReplicaSets { // do the task - primary, secondary, err := getPrimaryNSecondaryMember(host) + primary, secondary, secondaryMembers, err := getPrimaryNSecondaryMember(host) if err != nil { klog.Errorf("error while getting primary and secondary member of %v. error: %v", host, err) return nil, err @@ -503,8 +505,10 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic // Check if secondary is already locked before locking it. // If yes, unlock it and sync with primary - if err := checkIfSecondaryLockedAndSync(secondary); err != nil { - return nil, err + for _, secondary := range secondaryMembers { + if err := checkIfSecondaryLockedAndSync(secondary); err != nil { + return nil, err + } } err = lockSecondaryMember(secondary) @@ -588,11 +592,10 @@ func getSSLUser(path string) (string, error) { return strings.TrimSpace(user), nil } -func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, err error) { +func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, secondaryMembers []string, err error) { klog.Infoln("finding primary and secondary instances of", mongoDSN) v := make(map[string]interface{}) - // stop balancer args := append([]interface{}{ "config", "--host", mongoDSN, @@ -601,33 +604,35 @@ func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, err }, mongoCreds...) // even --quiet doesn't skip replicaset PrimaryConnection log. so take tha last line. issue tracker: https://jira.mongodb.org/browse/SERVER-27159 if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&v); err != nil { - return "", "", err + return "", "", secondaryMembers, err } primary, ok := v["primary"].(string) if !ok || primary == "" { - return "", "", fmt.Errorf("unable to get primary instance using rs.isMaster(). got response: %v", v) + return "", "", secondaryMembers, fmt.Errorf("unable to get primary instance using rs.isMaster(). got response: %v", v) } hosts, ok := v["hosts"].([]interface{}) if !ok { - return "", "", fmt.Errorf("unable to get hosts using rs.isMaster(). got response: %v", v) + return "", "", secondaryMembers, fmt.Errorf("unable to get hosts using rs.isMaster(). got response: %v", v) } for _, host := range hosts { - secHost, ok := host.(string) - if !ok || secHost == "" { + curHost, ok := host.(string) + + if !ok || curHost == "" { err = fmt.Errorf("unable to get secondary instance using rs.isMaster(). got response: %v", v) continue } - - if secHost != primary { - klog.Infof("Primary %s & Secondary %s found for mongoDSN %s \n", primary, secHost, mongoDSN) - return primary, secHost, nil + if curHost != primary { + secondaryMembers = append(secondaryMembers, curHost) } } + if len(secondaryMembers) > 0 { + return primary, secondaryMembers[0], secondaryMembers, err + } - return primary, "", err + return primary, "", secondaryMembers, err } // run from mongos instance From fc60f5f3f822c68dc6a95126c9acefc98fdaede6 Mon Sep 17 00:00:00 2001 From: sayedppqq Date: Wed, 27 Sep 2023 17:53:42 +0600 Subject: [PATCH 5/5] logs updated Signed-off-by: sayedppqq --- pkg/lock.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/lock.go b/pkg/lock.go index fba190e5a..1fd890c70 100644 --- a/pkg/lock.go +++ b/pkg/lock.go @@ -26,7 +26,7 @@ import ( ) func setupConfigServer(configSVRDSN, secondaryHost string) error { - klog.Infoln("Attempting to setup configserver", configSVRDSN) + klog.Infof("Attempting to setup configserver %s\n", configSVRDSN) if secondaryHost == "" { klog.Warningln("locking configserver is skipped. secondary host is empty") @@ -89,7 +89,7 @@ func setupConfigServer(configSVRDSN, secondaryHost string) error { } func lockSecondaryMember(mongohost string) error { - klog.Infoln("Attempting to lock secondary member", mongohost) + klog.Infof("Attempting to lock secondary member %s\n", mongohost) if mongohost == "" { klog.Warningln("locking secondary member is skipped. secondary host is empty") @@ -120,12 +120,12 @@ func lockSecondaryMember(mongohost string) error { if val, ok := v["ok"].(float64); !ok || int(val) != 1 { return fmt.Errorf("unable to lock the secondary host. got response: %v", v) } - klog.Infof("secondary %s locked.", mongohost) + klog.Infof("secondary %s locked.\n", mongohost) return nil } func checkIfSecondaryLockedAndSync(mongohost string) error { - klog.Infoln("Checking if secondary %s is already locked.", mongohost) + klog.Infof("Checking if secondary %s is already locked\n", mongohost) x := make(map[string]interface{}) args := append([]interface{}{ @@ -160,7 +160,7 @@ func checkIfSecondaryLockedAndSync(mongohost string) error { } func waitForSecondarySync(mongohost string) error { - klog.Infoln("Attempting to sync secondary with primary") + klog.Infof("Attempting to sync secondary %s with primary\n", mongohost) for { status := make(map[string]interface{}) @@ -231,6 +231,7 @@ func waitForSecondarySync(mongohost string) error { } } if synced { + klog.Infoln("database successfully synced") break } @@ -241,7 +242,7 @@ func waitForSecondarySync(mongohost string) error { } func unlockSecondaryMember(mongohost string) error { - klog.Infoln("Attempting to unlock secondary member", mongohost) + klog.Infof("Attempting to unlock secondary member %s\n", mongohost) if mongohost == "" { klog.Warningln("skipped unlocking secondary member. secondary host is empty") return nil @@ -268,6 +269,6 @@ func unlockSecondaryMember(mongohost string) error { if val, ok := v["ok"].(float64); !ok || int(val) != 1 { return fmt.Errorf("unable to lock the secondary host. got response: %v", v) } - klog.Infof("secondary %s unlocked.", mongohost) + klog.Infof("secondary %s unlocked\n", mongohost) return nil }