[3.4] Adding optional revision bump and mark compacted to snapshot restore #16193

Merged 2 commits on Jul 7, 2023
22 changes: 22 additions & 0 deletions clientv3/snapshot/util.go
@@ -21,13 +21,35 @@ type revision struct {
sub int64
}

// GreaterThan should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func (a revision) GreaterThan(b revision) bool {
if a.main > b.main {
return true
}
if a.main < b.main {
return false
}
return a.sub > b.sub
}

// bytesToRev should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func bytesToRev(bytes []byte) revision {
return revision{
main: int64(binary.BigEndian.Uint64(bytes[0:8])),
sub: int64(binary.BigEndian.Uint64(bytes[9:])),
}
}

// revToBytes should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func revToBytes(bytes []byte, rev revision) {
binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main))
bytes[8] = '_'
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
}

// initIndex implements ConsistentIndexGetter so the snapshot won't block
// the new raft instance by waiting for a future raft index.
type initIndex int
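
For orientation, here is a minimal, self-contained sketch (not part of the diff) of the 17-byte key layout that GreaterThan, bytesToRev and revToBytes assume: 8 big-endian bytes of main revision, a '_' separator at index 8, then 8 big-endian bytes of sub revision. Because main comes first in big-endian form, byte-wise key ordering in the bolt "key" bucket matches revision ordering.

package main

import (
    "encoding/binary"
    "fmt"
)

// revision mirrors the struct above: main is the store-wide revision,
// sub orders the writes that happened inside one transaction.
type revision struct{ main, sub int64 }

// encode builds the 17-byte bucket key: main (8 bytes, big-endian),
// a '_' separator at index 8, then sub (8 bytes, big-endian).
func encode(rev revision) []byte {
    b := make([]byte, 17)
    binary.BigEndian.PutUint64(b[0:8], uint64(rev.main))
    b[8] = '_'
    binary.BigEndian.PutUint64(b[9:], uint64(rev.sub))
    return b
}

// decode is the inverse of encode; it skips the separator byte.
func decode(b []byte) revision {
    return revision{
        main: int64(binary.BigEndian.Uint64(b[0:8])),
        sub:  int64(binary.BigEndian.Uint64(b[9:])),
    }
}

func main() {
    k := encode(revision{main: 4, sub: 0})
    fmt.Printf("%x\n", k)          // 00000000000000045f0000000000000000
    fmt.Printf("%+v\n", decode(k)) // {main:4 sub:0}
}
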
82 changes: 82 additions & 0 deletions clientv3/snapshot/v3_snapshot.go
@@ -242,6 +242,16 @@ type RestoreConfig struct {
// SkipHashCheck is "true" to ignore snapshot integrity hash value
// (required if copied from data directory).
SkipHashCheck bool

// RevisionBump is the amount to increase the latest revision after restore,
// to allow administrators to trick clients into thinking that revision never decreased.
// If 0, revision bumping is skipped.
// (required if MarkCompacted == true)
RevisionBump uint64

// MarkCompacted is "true" to mark the latest revision as compacted.
// (required if RevisionBump > 0)
MarkCompacted bool
}

// Restore restores a new etcd data directory from given snapshot file.
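
To illustrate how the two new fields compose with the existing ones, here is a hedged sketch of a programmatic restore through the 3.4 clientv3/snapshot Manager. The InitialCluster, InitialClusterToken, SkipHashCheck, RevisionBump and MarkCompacted fields match the etcdctl call in this diff; the SnapshotPath, Name, OutputDataDir and PeerURLs field names, and all values, are assumptions for the example.

package main

import (
    "log"

    "go.etcd.io/etcd/clientv3/snapshot"
    "go.uber.org/zap"
)

func main() {
    sp := snapshot.NewV3(zap.NewExample())

    // Restore the snapshot, then pretend a billion more revisions already
    // happened and mark that bumped revision as compacted, so clients that
    // resume watches from pre-restore revisions get ErrCompacted instead of
    // waiting on revisions the restored server has never seen.
    err := sp.Restore(snapshot.RestoreConfig{
        SnapshotPath:        "backup.snapshot",                 // assumed field name, example value
        Name:                "m1",                              // assumed field name, example value
        OutputDataDir:       "restored.etcd",                   // assumed field name, example value
        PeerURLs:            []string{"http://127.0.0.1:2380"}, // assumed field name, example value
        InitialCluster:      "m1=http://127.0.0.1:2380",
        InitialClusterToken: "etcd-cluster",
        SkipHashCheck:       false,
        RevisionBump:        1000000000,
        MarkCompacted:       true,
    })
    if err != nil {
        log.Fatal(err)
    }
}

Choosing the bump amount is an operator judgement: it has to exceed however far the live cluster's revision may have advanced past the snapshot, so that clients never observe the revision going backwards.
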
@@ -303,6 +313,13 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
if err = s.saveDB(); err != nil {
return err
}

if cfg.MarkCompacted && cfg.RevisionBump > 0 {
if err = s.modifyLatestRevision(cfg.RevisionBump); err != nil {
return err
}
}

if err = s.saveWALAndSnap(); err != nil {
return err
}
@@ -417,6 +434,71 @@ func (s *v3Manager) saveDB() error {
return nil
}

// modifyLatestRevision increases the latest revision by the given amount and sets the scheduled compaction
// to that revision so that the server will consider this revision compacted.
func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
dbpath := filepath.Join(s.snapDir, "db")
be := backend.NewDefaultBackend(dbpath)
defer func() {
be.ForceCommit()
be.Close()
}()

tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()

latest, err := s.unsafeGetLatestRevision(tx)
if err != nil {
return err
}

latest = s.unsafeBumpRevision(tx, latest, int64(bumpAmount))
s.unsafeMarkRevisionCompacted(tx, latest)

return nil
}

func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amount int64) revision {
s.lg.Info(
"bumping latest revision",
zap.Int64("latest-revision", latest.main),
zap.Int64("bump-amount", amount),
zap.Int64("new-latest-revision", latest.main+amount),
)

latest.main += amount
latest.sub = 0
k := make([]byte, 17)
revToBytes(k, latest)
tx.UnsafePut([]byte("key"), k, []byte{})

return latest
}

func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revision) {
s.lg.Info(
"marking revision compacted",
zap.Int64("revision", latest.main),
)

mvcc.UnsafeSetScheduledCompact(tx, latest.main)
}

func (s *v3Manager) unsafeGetLatestRevision(tx backend.BatchTx) (revision, error) {
var latest revision
err := tx.UnsafeForEach([]byte("key"), func(k, _ []byte) (err error) {
rev := bytesToRev(k)

if rev.GreaterThan(latest) {
latest = rev
}

return nil
})
return latest, err
}

// saveWALAndSnap creates a WAL for the initial cluster
func (s *v3Manager) saveWALAndSnap() error {
if err := fileutil.CreateDirAll(s.walDir); err != nil {
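
For completeness, one way to sanity-check these two writes offline after a restore is to open the restored bolt file directly with bbolt, as in the sketch below. This is not part of the PR: the db path is an example, and the "meta" bucket and "scheduledCompactRev" key names are assumptions for what metaBucketName and scheduledCompactKeyName (used by mvcc.UnsafeSetScheduledCompact in mvcc/util.go below) resolve to; the "key" bucket name is taken from unsafeBumpRevision above.

package main

import (
    "encoding/binary"
    "fmt"
    "log"

    bolt "go.etcd.io/bbolt"
)

func main() {
    // The restored backend lives at <data-dir>/member/snap/db, the same
    // path modifyLatestRevision opens via backend.NewDefaultBackend.
    db, err := bolt.Open("restored.etcd/member/snap/db", 0600, &bolt.Options{ReadOnly: true})
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    err = db.View(func(tx *bolt.Tx) error {
        // unsafeMarkRevisionCompacted stored the scheduled compaction as a
        // revision key (sub == 0); its first 8 bytes are the big-endian main revision.
        if meta := tx.Bucket([]byte("meta")); meta != nil {
            if v := meta.Get([]byte("scheduledCompactRev")); v != nil {
                fmt.Println("scheduled compact revision:", int64(binary.BigEndian.Uint64(v[:8])))
            }
        }

        // unsafeBumpRevision stored an empty value under the bumped revision key,
        // so the last key in the "key" bucket reflects the new latest revision.
        if keys := tx.Bucket([]byte("key")); keys != nil {
            if k, _ := keys.Cursor().Last(); k != nil {
                fmt.Println("latest revision in key bucket:", int64(binary.BigEndian.Uint64(k[:8])))
            }
        }
        return nil
    })
    if err != nil {
        log.Fatal(err)
    }
}
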
6 changes: 6 additions & 0 deletions etcdctl/ctlv3/command/snapshot_command.go
@@ -39,6 +39,8 @@ var (
restorePeerURLs string
restoreName string
skipHashCheck bool
markCompacted bool
revisionBump uint64
)

// NewSnapshotCommand returns the cobra command for "snapshot".
@@ -85,6 +87,8 @@ func NewSnapshotRestoreCommand() *cobra.Command {
cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member")
cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)")
cmd.Flags().Uint64Var(&revisionBump, "bump-revision", 0, "How much to increase the latest revision after restore")
cmd.Flags().BoolVar(&markCompacted, "mark-compacted", false, "Mark the latest revision after restore as the point of scheduled compaction (required if --bump-revision > 0, disallowed otherwise)")

return cmd
}
@@ -166,6 +170,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
InitialCluster: restoreCluster,
InitialClusterToken: restoreClusterToken,
SkipHashCheck: skipHashCheck,
RevisionBump: revisionBump,
MarkCompacted: markCompacted,
}); err != nil {
ExitWithError(ExitError, err)
}
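
Putting the new flags together with the existing restore flags, a restore that bumps the revision and marks it compacted would look roughly like the following; the member name, URLs, paths and bump amount are placeholders, while the flags themselves come from this diff and the e2e test below.

etcdctl snapshot restore backup.snapshot --name m1 --initial-cluster m1=http://127.0.0.1:2380 --initial-cluster-token etcd-cluster --initial-advertise-peer-urls http://127.0.0.1:2380 --data-dir restored.etcd --bump-revision 1000000000 --mark-compacted
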
6 changes: 6 additions & 0 deletions mvcc/util.go
@@ -55,3 +55,9 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
be.BatchTx().UnsafePut(keyBucketName, ibytes, d)
be.BatchTx().Unlock()
}

func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
rbytes := newRevBytes()
revToBytes(revision{main: value}, rbytes)
tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
}
168 changes: 168 additions & 0 deletions tests/e2e/ctl_v3_snapshot_test.go
@@ -15,7 +15,9 @@
package e2e

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -25,7 +27,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/snapshot"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/expect"
"go.etcd.io/etcd/pkg/testutil"
)
@@ -267,3 +272,166 @@ func TestIssue6361(t *testing.T) {
t.Fatal(err)
}
}

func TestRestoreCompactionRevBump(t *testing.T) {
defer testutil.AfterTest(t)

epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
clusterSize: 1,
initialToken: "new",
keepDataDir: true,
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()

dialTimeout := 10 * time.Second
prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ","), "--dial-timeout", dialTimeout.String()}

ctl := newClient(t, epc.EndpointsV3(), epc.cfg.clientTLS, epc.cfg.isClientAutoTLS)
watchCh := ctl.Watch(context.Background(), "foo", clientv3.WithPrefix())
// flake-fix: the watch can sometimes miss the first put below, causing a test failure
time.Sleep(100 * time.Millisecond)

kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}}
for i := range kvs {
if err = spawnWithExpect(append(prefixArgs, "put", kvs[i].key, kvs[i].val), "OK"); err != nil {
t.Fatal(err)
}
}

watchTimeout := 1 * time.Second
watchRes, err := keyValuesFromWatchChan(watchCh, len(kvs), watchTimeout)
require.NoErrorf(t, err, "failed to get key-values from watch channel %s", err)
require.Equal(t, kvs, watchRes)

// ensure we get the right revision back for each of the keys
currentRev := 4
baseRev := 2
hasKVs(t, ctl, kvs, currentRev, baseRev)

fpath := filepath.Join(t.TempDir(), "test.snapshot")

t.Log("etcdctl saving snapshot...")
require.NoError(t, spawnWithExpect(append(prefixArgs, "snapshot", "save", fpath), fmt.Sprintf("Snapshot saved at %s", fpath)))

// add some more kvs that are not in the snapshot that will be lost after restore
unsnappedKVs := []kv{{"unsnapped1", "one"}, {"unsnapped2", "two"}, {"unsnapped3", "three"}}
for i := range unsnappedKVs {
if err = spawnWithExpect(append(prefixArgs, "put", unsnappedKVs[i].key, unsnappedKVs[i].val), "OK"); err != nil {
t.Fatal(err)
}
}

t.Log("Stopping the original server...")
require.NoError(t, epc.Stop())

newDataDir := filepath.Join(t.TempDir(), "test.data")
t.Log("etcdctl restoring the snapshot...")
bumpAmount := 10000

err = spawnWithExpect([]string{
ctlBinPath,
"snapshot",
"restore", fpath,
"--name", epc.procs[0].Config().name,
"--initial-cluster", epc.procs[0].Config().initialCluster,
"--initial-cluster-token", epc.procs[0].Config().initialToken,
"--initial-advertise-peer-urls", epc.procs[0].Config().purl.String(),
"--bump-revision", fmt.Sprintf("%d", bumpAmount),
"--mark-compacted",
"--data-dir", newDataDir,
}, "added member")
if err != nil {
t.Fatal(err)
}

t.Log("(Re)starting the etcd member using the restored snapshot...")
epc.procs[0].Config().dataDirPath = newDataDir
for i := range epc.procs[0].Config().args {
if epc.procs[0].Config().args[i] == "--data-dir" {
epc.procs[0].Config().args[i+1] = newDataDir
}
}

require.NoError(t, epc.Restart())

t.Log("Ensuring the restored member has the correct data...")
hasKVs(t, ctl, kvs, currentRev, baseRev)

for i := range unsnappedKVs {
v, err := ctl.Get(context.Background(), unsnappedKVs[i].key)
require.NoError(t, err)
require.Equal(t, int64(0), v.Count)
}

cancelResult, ok := <-watchCh
require.True(t, ok, "watchChannel should be open")
require.Equal(t, v3rpc.ErrCompacted, cancelResult.Err())
require.Truef(t, cancelResult.Canceled, "expected ongoing watch to be cancelled after restoring with --mark-compacted")
require.Equal(t, int64(bumpAmount+currentRev), cancelResult.CompactRevision)
_, ok = <-watchCh
require.False(t, ok, "watchChannel should be closed after restoring with --mark-compacted")

// clients might restart the watch at the old base revision; that should not yield any new data
// everything up until bumpAmount+currentRev should return "already compacted"
for i := bumpAmount - 2; i < bumpAmount+currentRev; i++ {
watchCh = ctl.Watch(context.Background(), "foo", clientv3.WithPrefix(), clientv3.WithRev(int64(i)))
cancelResult := <-watchCh
require.Equal(t, v3rpc.ErrCompacted, cancelResult.Err())
require.Truef(t, cancelResult.Canceled, "expected ongoing watch to be cancelled after restoring with --mark-compacted")
require.Equal(t, int64(bumpAmount+currentRev), cancelResult.CompactRevision)
}

// a watch after that revision should yield successful results when a new put arrives
ctx, cancel := context.WithTimeout(context.Background(), watchTimeout*5)
defer cancel()
watchCh = ctl.Watch(ctx, "foo", clientv3.WithPrefix(), clientv3.WithRev(int64(bumpAmount+currentRev+1)))
if err = spawnWithExpect(append(prefixArgs, "put", "foo4", "val4"), "OK"); err != nil {
t.Fatal(err)
}
watchRes, err = keyValuesFromWatchChan(watchCh, 1, watchTimeout)
require.NoErrorf(t, err, "failed to get key-values from watch channel %s", err)
require.Equal(t, []kv{{"foo4", "val4"}}, watchRes)

}

func hasKVs(t *testing.T, ctl *clientv3.Client, kvs []kv, currentRev int, baseRev int) {
for i := range kvs {
v, err := ctl.Get(context.Background(), kvs[i].key)
require.NoError(t, err)
require.Equal(t, int64(1), v.Count)
require.Equal(t, kvs[i].val, string(v.Kvs[0].Value))
require.Equal(t, int64(baseRev+i), v.Kvs[0].CreateRevision)
require.Equal(t, int64(baseRev+i), v.Kvs[0].ModRevision)
require.Equal(t, int64(1), v.Kvs[0].Version)
}
}

func keyValuesFromWatchResponse(resp clientv3.WatchResponse) (kvs []kv) {
for _, event := range resp.Events {
kvs = append(kvs, kv{string(event.Kv.Key), string(event.Kv.Value)})
}
return kvs
}

func keyValuesFromWatchChan(wch clientv3.WatchChan, wantedLen int, timeout time.Duration) (kvs []kv, err error) {
for {
select {
case watchResp, ok := <-wch:
if ok {
kvs = append(kvs, keyValuesFromWatchResponse(watchResp)...)
if len(kvs) == wantedLen {
return kvs, nil
}
}
case <-time.After(timeout):
return nil, errors.New("closed watcher channel should not block")
}
}
}