Skip to content

Commit

Permalink
Fix binlog backup not to be missing
Browse files Browse the repository at this point in the history
  • Loading branch information
shunki-fujita committed Nov 13, 2023
1 parent 6495742 commit e60ff63
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 77 deletions.
4 changes: 4 additions & 0 deletions api/v1beta1/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ type BackupStatus struct {
// SourceUUID is the `server_uuid` of the backup source instance.
SourceUUID string `json:"sourceUUID"`

// UUIDSet is the `server_uuid` set of all candidate instances for the backup source.
// +optional
UUIDSet map[string]string `json:"uuidSet"`

// BinlogFilename is the binlog filename that the backup source instance was writing to
// at the backup.
BinlogFilename string `json:"binlogFilename"`
Expand Down
2 changes: 2 additions & 0 deletions api/v1beta1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,10 @@ type BackupStatus struct {
// SourceUUID is the `server_uuid` of the backup source instance.
SourceUUID string `json:"sourceUUID"`

// UUIDSet is the `server_uuid` set of all candidate instances for the backup source.
// +optional
UUIDSet map[string]string `json:"uuidSet"`

// BinlogFilename is the binlog filename that the backup source instance was writing to
// at the backup.
BinlogFilename string `json:"binlogFilename"`
Expand Down
7 changes: 7 additions & 0 deletions api/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 76 additions & 59 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package backup
import (
"context"
"fmt"
"golang.org/x/exp/slices"
"io"
"io/fs"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -45,6 +47,7 @@ type BackupManager struct {
startTime time.Time
sourceIndex int
status bkop.ServerStatus
uuidSet map[string]string
gtidSet string
dumpSize int64
binlogSize int64
Expand Down Expand Up @@ -117,7 +120,13 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
orderedPods[index] = &pods.Items[i]
}

sourceIndex, err := bm.ChoosePod(ctx, orderedPods)
uuidSet, err := bm.GetUUIDSet(ctx, orderedPods)
if err != nil {
return fmt.Errorf("failed to get server_uuid set: %w", err)
}
bm.uuidSet = uuidSet

sourceIndex, skipBackupBinlog, err := bm.ChoosePod(ctx, orderedPods)
if err != nil {
return fmt.Errorf("failed to choose source instance: %w", err)
}
Expand Down Expand Up @@ -146,8 +155,7 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
}

// dump and upload binlog for the second or later backups
lastBackup := &bm.cluster.Status.Backup
if !lastBackup.Time.IsZero() {
if !skipBackupBinlog {
if err := bm.backupBinlog(ctx, op); err != nil {
// since the full backup has succeeded, we should continue
ev := event.BackupNoBinlog.ToEvent(bm.clusterRef)
Expand All @@ -172,6 +180,7 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
sb.Elapsed = metav1.Duration{Duration: elapsed}
sb.SourceIndex = sourceIndex
sb.SourceUUID = bm.status.UUID
sb.UUIDSet = uuidSet
sb.BinlogFilename = bm.status.CurrentBinlog
sb.GTIDSet = bm.gtidSet
sb.DumpSize = bm.dumpSize
Expand All @@ -194,89 +203,97 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
return nil
}

func (bm *BackupManager) ChoosePod(ctx context.Context, pods []*corev1.Pod) (int, error) {
func (bm *BackupManager) GetUUIDSet(ctx context.Context, pods []*corev1.Pod) (map[string]string, error) {
cluster := bm.cluster
// if this is the first time
if cluster.Status.Backup.Time.IsZero() {
if len(pods) == 1 {
return 0, nil
}

for i := range pods {
if i == int(cluster.Status.CurrentPrimaryIndex) {
continue
uuids := map[string]string{}
for i := range pods {
if podIsReady(pods[i]) {
op, err := newOperator(cluster.PodHostname(i),
constants.MySQLPort,
constants.BackupUser,
bm.mysqlPassword,
bm.threads)
if err != nil {
return nil, fmt.Errorf("failed to create operator: %w", err)
}
if podIsReady(pods[i]) {
return i, nil
defer op.Close()

if err := op.GetServerStatus(ctx, &bm.status); err != nil {
return nil, fmt.Errorf("failed to get server status: %w", err)
}
uuids[strconv.Itoa(i)] = bm.status.UUID
}
return int(cluster.Status.CurrentPrimaryIndex), nil
}

lastIndex := cluster.Status.Backup.SourceIndex
op, err := newOperator(cluster.PodHostname(lastIndex),
constants.MySQLPort,
constants.BackupUser,
bm.mysqlPassword,
bm.threads)
if err != nil {
return -1, err
}
defer op.Close()
return uuids, nil
}

st := &bkop.ServerStatus{}
if err := op.GetServerStatus(ctx, st); err != nil {
return -1, err
func (bm *BackupManager) GetChoosableIndexes(ctx context.Context, current, last map[string]string) ([]int, error) {
choosable := []int{}
for i, currentUUID := range current {
if lastUUID, ok := last[i]; ok && currentUUID == lastUUID {
index, err := strconv.Atoi(i)
if err == nil {
choosable = append(choosable, index)
}
}
}
return sort.IntSlice(choosable), nil
}

if st.UUID != cluster.Status.Backup.SourceUUID {
bm.log.Info("server_uuid of the last backup source has changed", "index", lastIndex)
func (bm *BackupManager) ChoosePod(ctx context.Context, pods []*corev1.Pod) (int, bool, error) {
currentPrimaryIndex := int(bm.cluster.Status.CurrentPrimaryIndex)
lastBackup := &bm.cluster.Status.Backup
// if this is the first time
if lastBackup.Time.IsZero() {
if len(pods) == 1 {
return 0, true, nil
}

for i := range pods {
if i == lastIndex {
continue
}
if i == int(cluster.Status.CurrentPrimaryIndex) {
if i == currentPrimaryIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
return i, true, nil
}
}
return cluster.Status.CurrentPrimaryIndex, nil
return currentPrimaryIndex, true, nil
}

if !podIsReady(pods[lastIndex]) {
bm.log.Info("the last backup source is not ready", "index", lastIndex)
lastIndex := lastBackup.SourceIndex
choosableIndexes, err := bm.GetChoosableIndexes(ctx, bm.uuidSet, lastBackup.UUIDSet)
if err != nil {
return 0, true, fmt.Errorf("failed to get choosable pods: %w", err)
}

for i := range pods {
if i == lastIndex {
continue
}
if i == int(cluster.Status.CurrentPrimaryIndex) {
if len(choosableIndexes) == 0 {
bm.log.Info("the server_uuid of all pods has changed")
bm.warnings = append(bm.warnings, "skip binlog backups because some binlog files may be missing")
return currentPrimaryIndex, true, nil
}

if !slices.Contains(choosableIndexes, lastIndex) {
bm.log.Info("the last backup source is not available or server_uuid has been changed", "index", lastIndex)
for _, i := range choosableIndexes {
if i == currentPrimaryIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
}
return i, false, nil
}
return cluster.Status.CurrentPrimaryIndex, nil
return currentPrimaryIndex, false, nil
}

if lastIndex == int(cluster.Status.CurrentPrimaryIndex) {
if lastIndex == currentPrimaryIndex {
bm.log.Info("the last backup source is not a replica", "index", lastIndex)
for i := range pods {
if i == lastIndex {
for _, i := range choosableIndexes {
if i == currentPrimaryIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
}
return i, false, nil
}
return cluster.Status.CurrentPrimaryIndex, nil
return currentPrimaryIndex, false, nil
}

return lastIndex, nil
return lastIndex, false, nil
}

func (bm *BackupManager) backupFull(ctx context.Context, op bkop.Operator) error {
Expand Down Expand Up @@ -361,7 +378,7 @@ func (bm *BackupManager) backupBinlog(ctx context.Context, op bkop.Operator) err
}

if err := op.DumpBinlog(ctx, binlogDir, binlogName, lastBackup.GTIDSet); err != nil {
return fmt.Errorf("failed to take a full dump: %w", err)
return fmt.Errorf("failed to exec mysqlbinlog command: %w", err)
}

usage, err := dirUsage(binlogDir)
Expand Down
55 changes: 38 additions & 17 deletions backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backup

import (
"context"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -80,26 +81,41 @@ func TestChoosePod(t *testing.T) {

var lastOp *choosePodMockOp
newOperator = func(host string, port int, user, password string, threads int) (bkop.Operator, error) {
lastOp = &choosePodMockOp{uuid: "123"}
return lastOp, nil
}

makeBM := func(replicas, current int, bkup mocov1beta2.BackupStatus) *BackupManager {
makeBM := func(replicas, current int, bkup mocov1beta2.BackupStatus, pods []*corev1.Pod) *BackupManager {
cluster := &mocov1beta2.MySQLCluster{}
cluster.Spec.Replicas = int32(replicas)
cluster.Status.CurrentPrimaryIndex = current
cluster.Status.Backup = bkup
uuidSet := make(map[string]string)
for i, pod := range pods {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
uuidSet[strconv.Itoa(i)] = "uuid-" + strconv.Itoa(i)
}
}
}

return &BackupManager{
log: logr.Discard(),
cluster: cluster,
uuidSet: uuidSet,
}
}

makeBS := func(idx int, uuid string) mocov1beta2.BackupStatus {
makeBS := func(replicas, idx int, uuid string) mocov1beta2.BackupStatus {
uuidSet := make(map[string]string, replicas)
for i := 0; i < replicas; i++ {
uuidSet[strconv.Itoa(i)] = "uuid-" + strconv.Itoa(i)
}
uuidSet[strconv.Itoa(idx)] = uuid
return mocov1beta2.BackupStatus{
Time: metav1.Now(),
SourceIndex: idx,
SourceUUID: uuid,
UUIDSet: uuidSet,
}
}

Expand All @@ -111,26 +127,28 @@ func TestChoosePod(t *testing.T) {
pods []*corev1.Pod

expectIdx int
skipBackupBinlog bool
}{
{"single", 1, 0, mocov1beta2.BackupStatus{}, makePod1(true), 0},
{"single-not-ready", 1, 0, mocov1beta2.BackupStatus{}, makePod1(false), 0},
{"triple-ready", 3, 0, mocov1beta2.BackupStatus{}, makePod3(true, false, true), 2},
{"triple-not-ready", 3, 1, mocov1beta2.BackupStatus{}, makePod3(false, true, false), 1},
{"single-2nd", 1, 0, makeBS(0, "123"), makePod1(true), 0},
{"single-2nd-uuid-changed", 1, 0, makeBS(0, "abc"), makePod1(true), 0},
{"single-2nd-not-ready", 1, 0, makeBS(0, "123"), makePod1(false), 0},
{"triple-2nd", 3, 0, makeBS(1, "123"), makePod3(true, true, true), 1},
{"triple-2nd-uuid-changed", 3, 0, makeBS(1, "abc"), makePod3(true, true, true), 2},
{"triple-2nd-not-ready", 3, 0, makeBS(1, "123"), makePod3(true, false, true), 2},
{"triple-2nd-primary", 3, 1, makeBS(1, "123"), makePod3(true, true, true), 0},
{"triple-2nd-all-not-ready", 3, 0, makeBS(1, "123"), makePod3(true, false, false), 0},
{"single", 1, 0, mocov1beta2.BackupStatus{}, makePod1(true), 0, true},
{"single-not-ready", 1, 0, mocov1beta2.BackupStatus{}, makePod1(false), 0, true},
{"triple-ready", 3, 0, mocov1beta2.BackupStatus{}, makePod3(true, false, true), 2, true},
{"triple-not-ready", 3, 1, mocov1beta2.BackupStatus{}, makePod3(false, true, false), 1, true},
{"single-2nd", 1, 0, makeBS(1, 0, "uuid-0"), makePod1(true), 0, false},
{"single-2nd-uuid-changed", 1, 0, makeBS(1, 0, "uuid-a"), makePod1(true), 0, true},
{"single-2nd-not-ready", 1, 0, makeBS(1, 0, "uuid-0"), makePod1(false), 0, true},
{"triple-2nd", 3, 0, makeBS(3, 1, "uuid-1"), makePod3(true, true, true), 1, false},
{"triple-2nd-uuid-changed", 3, 0, makeBS(3, 1, "uuid-b"), makePod3(true, true, true), 2, false},
{"triple-2nd-not-ready", 3, 0, makeBS(3, 1, "uuid-1"), makePod3(true, false, true), 2, false},
{"triple-2nd-primary", 3, 1, makeBS(3, 1, "uuid-1"), makePod3(true, true, true), 0, false},
{"triple-2nd-all-not-ready", 3, 0, makeBS(3, 1, "uuid-1"), makePod3(true, false, false), 0, false},
{"trinpe-2nd-all-not-ready-and-uuid-changed", 3, 0, makeBS(3, 0, "uuid-a"), makePod3(true, false, false), 0, true},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
lastOp = nil
bm := makeBM(tc.replicas, tc.current, tc.bkup)
idx, err := bm.ChoosePod(context.Background(), tc.pods)
bm := makeBM(tc.replicas, tc.current, tc.bkup, tc.pods)
idx, skipBackupBinlog, err := bm.ChoosePod(context.Background(), tc.pods)
if lastOp != nil && !lastOp.closed {
t.Error("op was not closed")
}
Expand All @@ -142,6 +160,9 @@ func TestChoosePod(t *testing.T) {
if idx != tc.expectIdx {
t.Errorf("unexpected index %d, expected %d", idx, tc.expectIdx)
}
if skipBackupBinlog != tc.skipBackupBinlog {
t.Errorf("unexpected skipBackupBinlog %v, expected %v", skipBackupBinlog, tc.skipBackupBinlog)
}
})
}
}
Loading

0 comments on commit e60ff63

Please sign in to comment.