diff --git a/pkg/backup.go b/pkg/backup.go index 8c4410c9..3dec7b05 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -464,8 +464,17 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic backupHost = secondary } - err = lockConfigServer(parameters.ConfigServer, secondary) + // Check if secondary is already locked before locking it. + // If yes, unlock it and sync with primary + if err := checkIfSecondaryLockedAndSync(secondary); err != nil { + return nil, err + } + if err := setupConfigServer(parameters.ConfigServer, secondary); err != nil { + return nil, err + } + + err = lockSecondaryMember(secondary) cleanupFuncs = append(cleanupFuncs, func() error { // even if error occurs, try to unlock the server return unlockSecondaryMember(secondary) @@ -492,6 +501,12 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic backupHost = secondary } + // Check if secondary is already locked before locking it. + // If yes, unlock it and sync with primary + if err := checkIfSecondaryLockedAndSync(secondary); err != nil { + return nil, err + } + err = lockSecondaryMember(secondary) cleanupFuncs = append(cleanupFuncs, func() error { // even if error occurs, try to unlock the server @@ -700,8 +715,8 @@ func enableBalancer(mongosHost string) error { return nil } -func lockConfigServer(configSVRDSN, secondaryHost string) error { - klog.Infoln("Attempting to lock configserver", configSVRDSN) +func setupConfigServer(configSVRDSN, secondaryHost string) error { + klog.Infoln("Attempting to setup configserver", configSVRDSN) if secondaryHost == "" { klog.Warningln("locking configserver is skipped. secondary host is empty") @@ -718,13 +733,13 @@ func lockConfigServer(configSVRDSN, secondaryHost string) error { output, err := sh.Command(MongoCMD, args...).Output() if err != nil { - klog.Errorf("Error while running findAndModify to lock configServer : %s ; output : %s \n", err.Error(), output) + klog.Errorf("Error while running findAndModify to setup configServer : %s ; output : %s \n", err.Error(), output) return err } err = json.Unmarshal(output, &v) if err != nil { - klog.Errorf("Unmarshal error while running findAndModify to lock configServer : %s \n", err.Error()) + klog.Errorf("Unmarshal error while running findAndModify to setup configServer : %s \n", err.Error()) return err } val, ok := v["counter"].(float64) @@ -752,67 +767,35 @@ func lockConfigServer(configSVRDSN, secondaryHost string) error { return fmt.Errorf("unable to get BackupControlDocument. got response: %v", v) } if int(val) != int(val2) { - klog.V(5).Infof("BackupDocument counter in secondary is not same. Expected %v, but got %v. Full response: %v", val, val2, v) + klog.V(5).Infof("BackupDocument counter in secondary %v is not same. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) time.Sleep(time.Second * 5) } } if timer >= 60 { - return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary to be same as primary. Expected %v, but got %v. Full response: %v", val, val2, v) + return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary %v to be same as primary. Expected %v, but got %v. Full response: %v", secondaryHost, val, val2, v) } - // lock secondary - return lockSecondaryMember(secondaryHost) + + return nil } func lockSecondaryMember(mongohost string) error { klog.Infoln("Attempting to lock secondary member", mongohost) + if mongohost == "" { klog.Warningln("locking secondary member is skipped. secondary host is empty") return nil } - // Check if it is already locked - x := make(map[string]interface{}) - args := append([]interface{}{ - "config", - "--host", mongohost, - "--quiet", - "--eval", "JSON.stringify(db.currentOp())", - }, mongoCreds...) - output, err := sh.Command(MongoCMD, args...).Output() - if err != nil { - klog.Errorf("Error while running currentOp on secondary : %s ; output : %s \n", err.Error(), output) - return err - } - err = json.Unmarshal(output, &x) - if err != nil { - klog.Errorf("Unmarshal error while running currentOp on secondary : %s \n", err.Error()) - return err - } - - val, ok := x["fsyncLock"].(bool) - if ok && bool(val) == true { - // Already locked - err := unlockSecondaryMember(mongohost) - if err != nil { - return err - } - if err := waitForSecondarySync(mongohost); err != nil { - return err - } - } else if !ok { - return fmt.Errorf("unable to get fsyncLock of secondary using db.currentOp(). got response: %v", x) - } - // lock file v := make(map[string]interface{}) - args = append([]interface{}{ + args := append([]interface{}{ "config", "--host", mongohost, "--quiet", "--eval", "JSON.stringify(db.fsyncLock())", }, mongoCreds...) - output, err = sh.Command(MongoCMD, args...).Output() + output, err := sh.Command(MongoCMD, args...).Output() if err != nil { klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output) return err @@ -831,69 +814,108 @@ func lockSecondaryMember(mongohost string) error { return nil } -func waitForSecondarySync(mongohost string) error { - status := make(map[string]interface{}) +func checkIfSecondaryLockedAndSync(mongohost string) error { + klog.Infoln("Checking if secondary %s is already locked.", mongohost) + + x := make(map[string]interface{}) args := append([]interface{}{ "config", "--host", mongohost, "--quiet", - "--eval", "JSON.stringify(rs.status())", + "--eval", "JSON.stringify(db.currentOp())", }, mongoCreds...) - - if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&status); err != nil { - klog.Errorf("Error while running status on secondary : %s ; output : %s \n", mongohost, err.Error()) + output, err := sh.Command(MongoCMD, args...).Output() + if err != nil { + klog.Errorf("Error while running currentOp on secondary : %s ; output : %s \n", err.Error(), output) + return err + } + err = json.Unmarshal(output, &x) + if err != nil { + klog.Errorf("Unmarshal error while running currentOp on secondary : %s \n", err.Error()) return err } - members, ok := status["members"].([]map[string]interface{}) - if !ok { - return fmt.Errorf("unable to get members using rs.status(). got response: %v", status) + val, ok := x["fsyncLock"].(bool) + if ok && bool(val) == true { + klog.Infoln("Found fsyncLock true while locking") + err := unlockSecondaryMember(mongohost) + if err != nil { + return err + } + if err := waitForSecondarySync(mongohost); err != nil { + return err + } } + return nil +} - var masterOptime, masterOptimeDate, curOptime, curOptimeDate time.Time +func waitForSecondarySync(mongohost string) error { + klog.Infoln("Attempting to sync secondary with primary") - for _, member := range members { - if member["stateStr"] == "PRIMARY" { - optime, ok := member["optime"].(map[string]interface{}) - if !ok { - return fmt.Errorf("unable to get optime of primary using rs.status(). got response: %v", member) - } - optimedate, ok := member["optimeDate"] - if !ok { - return fmt.Errorf("unable to get optimedate of primary using rs.status(). got response: %v", member) - } + for { + status := make(map[string]interface{}) + args := append([]interface{}{ + "config", + "--host", mongohost, + "--quiet", + "--eval", "JSON.stringify(rs.status())", + }, mongoCreds...) - masterOptime, ok = optime["ts"].(time.Time) - if !ok { - return fmt.Errorf("unable to get timestamp of primary using rs.status(). got response: %v", optime) - } - masterOptimeDate = optimedate.(time.Time) - break + if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&status); err != nil { + klog.Errorf("Error while running status on secondary : %s ; output : %s \n", mongohost, err.Error()) + return err } - } - for { - synced := false + members, ok := status["members"].([]interface{}) + if !ok { + return fmt.Errorf("unable to get members using rs.status(). got response: %v", status) + } + + var masterOptimeDate, curOptimeDate time.Time + for _, member := range members { - if member["stateStr"] == "SECONDARY" && member["name"] == mongohost { - optime, ok := member["optime"].(map[string]interface{}) + memberInfo, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get member info of primary using rs.status(). got response: %v", member) + } + + if memberInfo["stateStr"] == "PRIMARY" { + optimedate, ok := memberInfo["optimeDate"].(string) if !ok { - return fmt.Errorf("unable to get optime of secondary using rs.status(). got response: %v", member) + return fmt.Errorf("unable to get optimedate of primary using rs.status(). got response: %v", memberInfo) } - optimedate, ok := member["optimeDate"] - if !ok { - return fmt.Errorf("unable to get optimedate of secondary using rs.status(). got response: %v", member) + + convTime, err := getTime(optimedate) + if err != nil { + return err } + masterOptimeDate = convTime + break + } + } + synced := true + for _, member := range members { - curOptime, ok = optime["ts"].(time.Time) + memberInfo, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("unable to get member info of secondary using rs.status(). got response: %v", member) + } + + if memberInfo["stateStr"] == "SECONDARY" && memberInfo["name"] == mongohost { + + optimedate, ok := memberInfo["optimeDate"].(string) if !ok { - return fmt.Errorf("unable to get timestamp of primary using rs.status(). got response: %v", optime) + return fmt.Errorf("unable to get optimedate of secondary using rs.status(). got response: %v", memberInfo) } - curOptimeDate = optimedate.(time.Time) - if curOptime == masterOptime && curOptimeDate == masterOptimeDate { - synced = true + convTime, err := getTime(optimedate) + if err != nil { + return err + } + curOptimeDate = convTime + if curOptimeDate.Before(masterOptimeDate) { + synced = false } break } diff --git a/pkg/utils.go b/pkg/utils.go index c28de5cf..227c5cbe 100644 --- a/pkg/utils.go +++ b/pkg/utils.go @@ -94,3 +94,18 @@ func containsArg(args []string, checklist sets.String) bool { } return false } + +func getTime(t string) (time.Time, error) { + // Define the layout or format of the input string + layout := "2006-01-02T15:04:05Z" + + parsedTime, err := time.Parse(layout, t) + if err != nil { + return time.Time{}, err + } + return parsedTime, nil +} + +func trim(t interface{}) (int64, error) { + return 0, nil +}