Skip to content

Commit

Permalink
unlock and sync secondary
Browse files Browse the repository at this point in the history
Signed-off-by: sayedppqq <[email protected]>
  • Loading branch information
sayedppqq committed Oct 3, 2023
1 parent f84a16a commit 5797bf7
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 147 deletions.
183 changes: 36 additions & 147 deletions pkg/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic

if parameters.ConfigServer != "" {
// sharded cluster. so disable the balancer first. then perform the 'usual' tasks.
primary, secondary, err := getPrimaryNSecondaryMember(parameters.ConfigServer)
primary, secondary, secondaryMembers, err := getPrimaryNSecondaryMember(parameters.ConfigServer)
if err != nil {
return nil, err
}
Expand All @@ -464,7 +464,19 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic
backupHost = secondary
}

err = lockConfigServer(parameters.ConfigServer, secondary)
// Check if secondary is already locked before locking it.
// If yes, unlock it and sync with primary
for _, secondary := range secondaryMembers {
if err := checkIfSecondaryLockedAndSync(secondary); err != nil {
return nil, err
}
}

if err := setupConfigServer(parameters.ConfigServer, secondary); err != nil {
return nil, err
}

err = lockSecondaryMember(secondary)

cleanupFuncs = append(cleanupFuncs, func() error {
// even if error occurs, try to unlock the server
Expand All @@ -479,7 +491,7 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic

for key, host := range parameters.ReplicaSets {
// do the task
primary, secondary, err := getPrimaryNSecondaryMember(host)
primary, secondary, secondaryMembers, err := getPrimaryNSecondaryMember(host)
if err != nil {
klog.Errorf("error while getting primary and secondary member of %v. error: %v", host, err)
return nil, err
Expand All @@ -492,6 +504,14 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic
backupHost = secondary
}

// Check if secondary is already locked before locking it.
// If yes, unlock it and sync with primary
for _, secondary := range secondaryMembers {
if err := checkIfSecondaryLockedAndSync(secondary); err != nil {
return nil, err
}
}

err = lockSecondaryMember(secondary)
cleanupFuncs = append(cleanupFuncs, func() error {
// even if error occurs, try to unlock the server
Expand Down Expand Up @@ -573,11 +593,10 @@ func getSSLUser(path string) (string, error) {
return strings.TrimSpace(user), nil
}

func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, err error) {
func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, secondaryMembers []string, err error) {
klog.Infoln("finding primary and secondary instances of", mongoDSN)
v := make(map[string]interface{})

// stop balancer
args := append([]interface{}{
"config",
"--host", mongoDSN,
Expand All @@ -586,33 +605,35 @@ func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, err
}, mongoCreds...)
// even --quiet doesn't skip replicaset PrimaryConnection log. so take tha last line. issue tracker: https://jira.mongodb.org/browse/SERVER-27159
if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&v); err != nil {
return "", "", err
return "", "", secondaryMembers, err
}

primary, ok := v["primary"].(string)
if !ok || primary == "" {
return "", "", fmt.Errorf("unable to get primary instance using rs.isMaster(). got response: %v", v)
return "", "", secondaryMembers, fmt.Errorf("unable to get primary instance using rs.isMaster(). got response: %v", v)
}

hosts, ok := v["hosts"].([]interface{})
if !ok {
return "", "", fmt.Errorf("unable to get hosts using rs.isMaster(). got response: %v", v)
return "", "", secondaryMembers, fmt.Errorf("unable to get hosts using rs.isMaster(). got response: %v", v)
}

for _, host := range hosts {
secHost, ok := host.(string)
if !ok || secHost == "" {
curHost, ok := host.(string)

if !ok || curHost == "" {
err = fmt.Errorf("unable to get secondary instance using rs.isMaster(). got response: %v", v)
continue
}

if secHost != primary {
klog.Infof("Primary %s & Secondary %s found for mongoDSN %s \n", primary, secHost, mongoDSN)
return primary, secHost, nil
if curHost != primary {
secondaryMembers = append(secondaryMembers, curHost)
}
}
if len(secondaryMembers) > 0 {
return primary, secondaryMembers[0], secondaryMembers, err
}

return primary, "", err
return primary, "", secondaryMembers, err
}

// run from mongos instance
Expand Down Expand Up @@ -699,135 +720,3 @@ func enableBalancer(mongosHost string) error {
klog.Info("Balancer successfully re-enabled.")
return nil
}

func lockConfigServer(configSVRDSN, secondaryHost string) error {
klog.Infoln("Attempting to lock configserver", configSVRDSN)

if secondaryHost == "" {
klog.Warningln("locking configserver is skipped. secondary host is empty")
return nil
}
v := make(map[string]interface{})
// findAndModify BackupControlDocument. skip single quote inside single quote: https://stackoverflow.com/a/28786747/4628962
args := append([]interface{}{
"config",
"--host", configSVRDSN,
"--quiet",
"--eval", "db.BackupControl.findAndModify({query: { _id: 'BackupControlDocument' }, update: { $inc: { counter : 1 } }, new: true, upsert: true, writeConcern: { w: 'majority', wtimeout: 15000 }});",
}, mongoCreds...)

output, err := sh.Command(MongoCMD, args...).Output()
if err != nil {
klog.Errorf("Error while running findAndModify to lock configServer : %s ; output : %s \n", err.Error(), output)
return err
}

err = json.Unmarshal(output, &v)
if err != nil {
klog.Errorf("Unmarshal error while running findAndModify to lock configServer : %s \n", err.Error())
return err
}
val, ok := v["counter"].(float64)
if !ok || int(val) == 0 {
return fmt.Errorf("unable to modify BackupControlDocument. got response: %v", v)
}
val2 := float64(0)
timer := 0 // wait approximately 5 minutes.
for timer < 60 && (int(val2) == 0 || int(val) != int(val2)) {
timer++
// find backupDocument from secondary configServer
args = append([]interface{}{
"config",
"--host", secondaryHost,
"--quiet",
"--eval", "rs.secondaryOk(); db.BackupControl.find({ '_id' : 'BackupControlDocument' }).readConcern('majority');",
}, mongoCreds...)

if err := sh.Command(MongoCMD, args...).UnmarshalJSON(&v); err != nil {
return err
}

val2, ok = v["counter"].(float64)
if !ok {
return fmt.Errorf("unable to get BackupControlDocument. got response: %v", v)
}
if int(val) != int(val2) {
klog.V(5).Infof("BackupDocument counter in secondary is not same. Expected %v, but got %v. Full response: %v", val, val2, v)
time.Sleep(time.Second * 5)
}
}
if timer >= 60 {
return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary to be same as primary. Expected %v, but got %v. Full response: %v", val, val2, v)
}
// lock secondary
return lockSecondaryMember(secondaryHost)
}

func lockSecondaryMember(mongohost string) error {
klog.Infoln("Attempting to lock secondary member", mongohost)
if mongohost == "" {
klog.Warningln("locking secondary member is skipped. secondary host is empty")
return nil
}
v := make(map[string]interface{})

// lock file
args := append([]interface{}{
"config",
"--host", mongohost,
"--quiet",
"--eval", "JSON.stringify(db.fsyncLock())",
}, mongoCreds...)

output, err := sh.Command(MongoCMD, args...).Output()
if err != nil {
klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output)
return err
}

err = json.Unmarshal(output, &v)
if err != nil {
klog.Errorf("Unmarshal error while running fsyncLock on secondary : %s \n", err.Error())
return err
}

if val, ok := v["ok"].(float64); !ok || int(val) != 1 {
return fmt.Errorf("unable to lock the secondary host. got response: %v", v)
}
klog.Infof("secondary %s locked.", mongohost)
return nil
}

func unlockSecondaryMember(mongohost string) error {
klog.Infoln("Attempting to unlock secondary member", mongohost)
if mongohost == "" {
klog.Warningln("skipped unlocking secondary member. secondary host is empty")
return nil
}
v := make(map[string]interface{})

// unlock file
args := append([]interface{}{
"config",
"--host", mongohost,
"--quiet",
"--eval", "JSON.stringify(db.fsyncUnlock())",
}, mongoCreds...)

output, err := sh.Command(MongoCMD, args...).Output()
if err != nil {
klog.Errorf("Error while running fsyncUnlock on secondary : %s ; output : %s \n", err.Error(), output)
return err
}
err = json.Unmarshal(output, &v)
if err != nil {
klog.Errorf("Unmarshal error while running fsyncUnlock on secondary : %s \n", err.Error())
return err
}

if val, ok := v["ok"].(float64); !ok || int(val) != 1 {
return fmt.Errorf("unable to lock the secondary host. got response: %v", v)
}
klog.Infof("secondary %s unlocked.", mongohost)
return nil
}
Loading

0 comments on commit 5797bf7

Please sign in to comment.