Skip to content

Commit

Permalink
Fix binlog backups so that no binlog files go missing
Browse files Browse the repository at this point in the history
  • Loading branch information
shunki-fujita committed Nov 13, 2023
1 parent 6495742 commit 0f97ffa
Show file tree
Hide file tree
Showing 14 changed files with 281 additions and 95 deletions.
4 changes: 4 additions & 0 deletions api/v1beta1/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ type BackupStatus struct {
// SourceUUID is the `server_uuid` of the backup source instance.
SourceUUID string `json:"sourceUUID"`

// UUIDSet is the `server_uuid` set of all candidate instances for the backup source.
// +optional
UUIDSet map[string]string `json:"uuidSet"`

// BinlogFilename is the binlog filename that the backup source instance was writing to
// at the backup.
BinlogFilename string `json:"binlogFilename"`
Expand Down
2 changes: 2 additions & 0 deletions api/v1beta1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,10 @@ type BackupStatus struct {
// SourceUUID is the `server_uuid` of the backup source instance.
SourceUUID string `json:"sourceUUID"`

// UUIDSet is the `server_uuid` set of all candidate instances for the backup source.
// +optional
UUIDSet map[string]string `json:"uuidSet"`

// BinlogFilename is the binlog filename that the backup source instance was writing to
// at the backup.
BinlogFilename string `json:"binlogFilename"`
Expand Down
7 changes: 7 additions & 0 deletions api/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

136 changes: 77 additions & 59 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package backup
import (
"context"
"fmt"
"golang.org/x/exp/slices"
"io"
"io/fs"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -45,6 +47,7 @@ type BackupManager struct {
startTime time.Time
sourceIndex int
status bkop.ServerStatus
uuidSet map[string]string
gtidSet string
dumpSize int64
binlogSize int64
Expand Down Expand Up @@ -117,7 +120,13 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
orderedPods[index] = &pods.Items[i]
}

sourceIndex, err := bm.ChoosePod(ctx, orderedPods)
uuidSet, err := bm.GetUUIDSet(ctx, orderedPods)
if err != nil {
return fmt.Errorf("failed to get server_uuid set: %w", err)
}
bm.uuidSet = uuidSet

sourceIndex, skipBackupBinlog, err := bm.ChoosePod(ctx, orderedPods)
if err != nil {
return fmt.Errorf("failed to choose source instance: %w", err)
}
Expand Down Expand Up @@ -146,8 +155,7 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
}

// dump and upload binlog for the second or later backups
lastBackup := &bm.cluster.Status.Backup
if !lastBackup.Time.IsZero() {
if !skipBackupBinlog {
if err := bm.backupBinlog(ctx, op); err != nil {
// since the full backup has succeeded, we should continue
ev := event.BackupNoBinlog.ToEvent(bm.clusterRef)
Expand All @@ -172,6 +180,7 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
sb.Elapsed = metav1.Duration{Duration: elapsed}
sb.SourceIndex = sourceIndex
sb.SourceUUID = bm.status.UUID
sb.UUIDSet = uuidSet
sb.BinlogFilename = bm.status.CurrentBinlog
sb.GTIDSet = bm.gtidSet
sb.DumpSize = bm.dumpSize
Expand All @@ -194,89 +203,98 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
return nil
}

func (bm *BackupManager) ChoosePod(ctx context.Context, pods []*corev1.Pod) (int, error) {
func (bm *BackupManager) GetUUIDSet(ctx context.Context, pods []*corev1.Pod) (map[string]string, error) {
cluster := bm.cluster
// if this is the first time
if cluster.Status.Backup.Time.IsZero() {
if len(pods) == 1 {
return 0, nil
}

for i := range pods {
if i == int(cluster.Status.CurrentPrimaryIndex) {
continue
uuids := map[string]string{}
for i := range pods {
if podIsReady(pods[i]) {
op, err := newOperator(cluster.PodHostname(i),
constants.MySQLPort,
constants.BackupUser,
bm.mysqlPassword,
bm.threads)
if err != nil {
return nil, fmt.Errorf("failed to create operator: %w", err)
}
if podIsReady(pods[i]) {
return i, nil
defer op.Close()

if err := op.GetServerStatus(ctx, &bm.status); err != nil {
return nil, fmt.Errorf("failed to get server status: %w", err)
}
uuids[strconv.Itoa(i)] = bm.status.UUID
}
return int(cluster.Status.CurrentPrimaryIndex), nil
}

lastIndex := cluster.Status.Backup.SourceIndex
op, err := newOperator(cluster.PodHostname(lastIndex),
constants.MySQLPort,
constants.BackupUser,
bm.mysqlPassword,
bm.threads)
if err != nil {
return -1, err
}
defer op.Close()
return uuids, nil
}

st := &bkop.ServerStatus{}
if err := op.GetServerStatus(ctx, st); err != nil {
return -1, err
// GetChoosableIndexes returns, in ascending order, the integer form of every
// key of current whose value is identical to the value stored under the same
// key in last.
//
// Within this file the maps are built with strconv.Itoa(podIndex) as the key
// and the instance's server_uuid as the value, so the result is the list of
// pod indexes whose server_uuid is unchanged between the two maps.
//
// Keys missing from last, keys whose values differ, and keys that do not parse
// as a decimal integer are left out. ctx is not used, and the returned error
// is always nil. The returned slice is never nil, even when it is empty.
func (bm *BackupManager) GetChoosableIndexes(ctx context.Context, current, last map[string]string) ([]int, error) {
	indexes := []int{}
	for key, uuid := range current {
		previous, found := last[key]
		if !found || previous != uuid {
			continue
		}
		n, convErr := strconv.Atoi(key)
		if convErr != nil {
			// Best effort: a key that is not a number cannot name a pod index.
			continue
		}
		indexes = append(indexes, n)
	}
	// Map iteration order is random; sort so callers see a stable order.
	sort.Ints(indexes)
	return indexes, nil
}

if st.UUID != cluster.Status.Backup.SourceUUID {
bm.log.Info("server_uuid of the last backup source has changed", "index", lastIndex)
func (bm *BackupManager) ChoosePod(ctx context.Context, pods []*corev1.Pod) (int, bool, error) {
currentPrimaryIndex := int(bm.cluster.Status.CurrentPrimaryIndex)
lastBackup := &bm.cluster.Status.Backup
// if this is the first time
if lastBackup.Time.IsZero() {
if len(pods) == 1 {
return 0, true, nil
}

for i := range pods {
if i == lastIndex {
continue
}
if i == int(cluster.Status.CurrentPrimaryIndex) {
if i == currentPrimaryIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
return i, true, nil
}
}
return cluster.Status.CurrentPrimaryIndex, nil
return currentPrimaryIndex, true, nil
}

if !podIsReady(pods[lastIndex]) {
bm.log.Info("the last backup source is not ready", "index", lastIndex)
lastIndex := lastBackup.SourceIndex
choosableIndexes, err := bm.GetChoosableIndexes(ctx, bm.uuidSet, lastBackup.UUIDSet)
if err != nil {
return 0, true, fmt.Errorf("failed to get choosable pods: %w", err)
}

for i := range pods {
if i == lastIndex {
continue
}
if i == int(cluster.Status.CurrentPrimaryIndex) {
if len(choosableIndexes) == 0 {
bm.log.Info("the server_uuid of all pods has changed")
bm.warnings = append(bm.warnings, "skip binlog backups because some binlog files may be missing")
return currentPrimaryIndex, true, nil
}

if !slices.Contains(choosableIndexes, lastIndex) {
bm.log.Info("the last backup source is not available or server_uuid has been changed", "index", lastIndex)
for _, i := range choosableIndexes {
if i == currentPrimaryIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
}
return i, false, nil
}
return cluster.Status.CurrentPrimaryIndex, nil
return currentPrimaryIndex, false, nil
}

if lastIndex == int(cluster.Status.CurrentPrimaryIndex) {
if lastIndex == currentPrimaryIndex {
bm.log.Info("the last backup source is not a replica", "index", lastIndex)
for i := range pods {
if i == lastIndex {
for _, i := range choosableIndexes {
if i == currentPrimaryIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
}
return i, false, nil
}
return cluster.Status.CurrentPrimaryIndex, nil
return currentPrimaryIndex, false, nil
}

return lastIndex, nil
return lastIndex, false, nil
}

func (bm *BackupManager) backupFull(ctx context.Context, op bkop.Operator) error {
Expand Down Expand Up @@ -361,7 +379,7 @@ func (bm *BackupManager) backupBinlog(ctx context.Context, op bkop.Operator) err
}

if err := op.DumpBinlog(ctx, binlogDir, binlogName, lastBackup.GTIDSet); err != nil {
return fmt.Errorf("failed to take a full dump: %w", err)
return fmt.Errorf("failed to exec mysqlbinlog command: %w", err)
}

usage, err := dirUsage(binlogDir)
Expand Down
Loading

0 comments on commit 0f97ffa

Please sign in to comment.