Skip to content

Commit

Permalink
core/metadata: migrate sandboxes bucket into v1
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Fu <[email protected]>
  • Loading branch information
fuweid committed Jul 30, 2024
1 parent 1fb1882 commit 4cfeb7b
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 11 deletions.
2 changes: 2 additions & 0 deletions core/metadata/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket {
func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
return createBucketIfNotExists(
tx,
bucketKeyVersion,
[]byte(namespace),
bucketKeyObjectSandboxes,
)
Expand All @@ -309,6 +310,7 @@ func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
func getSandboxBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
return getBucket(
tx,
bucketKeyVersion,
[]byte(namespace),
bucketKeyObjectSandboxes,
)
Expand Down
2 changes: 1 addition & 1 deletion core/metadata/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
// dbVersion represents updates to the schema
// version which are additions and compatible with
// prior version of the same schema.
dbVersion = 3
dbVersion = 4
)

// DBOpt configures how we set up the DB
Expand Down
96 changes: 95 additions & 1 deletion core/metadata/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,42 @@ func TestMigrations(t *testing.T) {
bref: "",
},
}

testSandboxes := []struct {
id string
keyValues [][3]string // {bucket, key, value}
}{
{
id: "sb1",
keyValues: [][3]string{
{
"", // is not sub bucket
"created", "2dayago",
},
{
"", // is not sub bucket
"updated", "1dayago",
},
{
"extension",
"labels", strings.Repeat("whoknows", 10),
},
},
},
{
id: "sb2",
keyValues: [][3]string{
{
"", // is not sub bucket
"sandboxer", "default",
},
{
"labels", "hello", "panic",
},
},
},
}

migrationTests := []struct {
name string
init func(*bolt.Tx) error
Expand Down Expand Up @@ -282,7 +318,6 @@ func TestMigrations(t *testing.T) {
return nil
},
},

{
name: "NoOp",
init: func(tx *bolt.Tx) error {
Expand All @@ -292,6 +327,65 @@ func TestMigrations(t *testing.T) {
return nil
},
},
{
name: "MigrateSandboxes",
init: func(tx *bolt.Tx) error {
allsbbkt, err := createBucketIfNotExists(tx, []byte("kubernetes"), bucketKeyObjectSandboxes)
if err != nil {
return err
}

for _, sbDef := range testSandboxes {
sbbkt, err := allsbbkt.CreateBucket([]byte(sbDef.id))
if err != nil {
return err
}

for _, keyValues := range sbDef.keyValues {
bkt := sbbkt
if keyValues[0] != "" {
bkt, err = sbbkt.CreateBucketIfNotExists([]byte(keyValues[0]))
if err != nil {
return err
}
}

if err = bkt.Put([]byte(keyValues[1]), []byte(keyValues[2])); err != nil {
return err
}
}
}
return nil
},
check: func(tx *bolt.Tx) error {
allsbbkt := getSandboxBucket(tx, "kubernetes")

for _, sbDef := range testSandboxes {
sbbkt := allsbbkt.Bucket([]byte(sbDef.id))

for _, keyValues := range sbDef.keyValues {
bkt := sbbkt
if keyValues[0] != "" {
bkt = sbbkt.Bucket([]byte(keyValues[0]))
}

key := []byte(keyValues[1])
expected := keyValues[2]

value := string(bkt.Get(key))
if value != expected {
return fmt.Errorf("expected %s, but got %s in sandbox %s", expected, value, sbDef.id)
}
}
}

allsbbkt = getBucket(tx, []byte("kubernetes"), bucketKeyObjectSandboxes)
if allsbbkt != nil {
return errors.New("old sandboxes bucket still exists")
}
return nil
},
},
}

if len(migrationTests) != len(migrations) {
Expand Down
93 changes: 92 additions & 1 deletion core/metadata/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

package metadata

import bolt "go.etcd.io/bbolt"
import (
"bytes"
"fmt"

bolt "go.etcd.io/bbolt"
)

type migration struct {
schema string
Expand Down Expand Up @@ -50,6 +55,11 @@ var migrations = []migration{
version: 3,
migrate: noOpMigration,
},
{
schema: "v1",
version: 4,
migrate: migrateSandboxes,
},
}

// addChildLinks Adds children key to the snapshotters to enforce snapshot
Expand Down Expand Up @@ -160,6 +170,87 @@ func migrateIngests(tx *bolt.Tx) error {
return nil
}

// migrateSandboxes moves sandboxes from root bucket into v1 bucket.
func migrateSandboxes(tx *bolt.Tx) error {
v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
if err != nil {
return err
}

deletingBuckets := [][]byte{}

if merr := tx.ForEach(func(ns []byte, nsbkt *bolt.Bucket) error {
// Skip v1 bucket, even if users created sandboxes in v1 namespace.
if bytes.Equal(bucketKeyVersion, ns) {
return nil
}

deletingBuckets = append(deletingBuckets, ns)

allsbbkt := nsbkt.Bucket(bucketKeyObjectSandboxes)
if allsbbkt == nil {
return nil
}

tnsbkt, err := v1bkt.CreateBucketIfNotExists(ns)
if err != nil {
return fmt.Errorf("failed to create namespace %s in bucket %s: %w",
ns, bucketKeyVersion, err)
}

tallsbbkt, err := tnsbkt.CreateBucketIfNotExists(bucketKeyObjectSandboxes)
if err != nil {
return fmt.Errorf("failed to create bucket sandboxes in namespace %s: %w", ns, err)
}

return allsbbkt.ForEachBucket(func(sb []byte) error {
sbbkt := allsbbkt.Bucket(sb) // single sandbox bucket

tsbbkt, err := tallsbbkt.CreateBucketIfNotExists(sb)
if err != nil {
return fmt.Errorf("failed to create sandbox object %s in namespace %s: %w",
sb, ns, err)
}

// copy single
if cerr := sbbkt.ForEach(func(key, value []byte) error {
if value == nil {
return nil
}

return tsbbkt.Put(key, value)
}); cerr != nil {
return cerr
}

return sbbkt.ForEachBucket(func(subbkt []byte) error {
tsubbkt, err := tsbbkt.CreateBucketIfNotExists(subbkt)
if err != nil {
return fmt.Errorf("failed to create subbucket %s in sandbox %s (namespace %s): %w",
subbkt, sb, ns, err)
}

return sbbkt.Bucket(subbkt).ForEach(func(key, value []byte) error {
if value == nil {
return fmt.Errorf("unexpected bucket %s", key)
}
return tsubbkt.Put(key, value)
})
})
})
}); merr != nil {
return fmt.Errorf("failed to copy sandboxes into v1 bucket: %w", err)
}

for _, ns := range deletingBuckets {
derr := tx.DeleteBucket(ns)
if derr != nil {
return fmt.Errorf("failed to cleanup bucket %s in root: %w", ns, err)
}
}
return nil
}

// noOpMigration was for a database change from boltdb/bolt which is no
// longer being supported, to go.etcd.io/bbolt which is the currently
// maintained repo for boltdb.
Expand Down
2 changes: 1 addition & 1 deletion integration/container_volume_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ version = 3
require.NoError(t, err)

t.Logf("Starting containerd")
currentProc := newCtrdProc(t, "containerd", workDir)
currentProc := newCtrdProc(t, "containerd", workDir, nil)
require.NoError(t, currentProc.isReady())
t.Cleanup(func() {
t.Log("Cleanup all the pods")
Expand Down
117 changes: 117 additions & 0 deletions integration/issue10467_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"fmt"
"path/filepath"
"syscall"
"testing"
"time"

"github.com/containerd/continuity/fs"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)

func TestIssue10467(t *testing.T) {
latestVersion := "v1.7.20"

releaseBinDir := t.TempDir()

downloadReleaseBinary(t, releaseBinDir, latestVersion)

t.Logf("Install config for release %s", latestVersion)
workDir := t.TempDir()
previousReleaseCtrdConfig(t, releaseBinDir, workDir)

t.Log("Starting the previous release's containerd")
previousCtrdBinPath := filepath.Join(releaseBinDir, "bin", "containerd")
previousProc := newCtrdProc(t, previousCtrdBinPath, workDir, []string{"ENABLE_CRI_SANDBOXES=yes"})

boltdbPath := filepath.Join(workDir, "root", "io.containerd.metadata.v1.bolt", "meta.db")

ctrdLogPath := previousProc.logPath()
t.Cleanup(func() {
if t.Failed() {
dumpFileContent(t, ctrdLogPath)
}
})

require.NoError(t, previousProc.isReady())

needToCleanup := true
t.Cleanup(func() {
if t.Failed() && needToCleanup {
t.Logf("Try to cleanup leaky pods")
cleanupPods(t, previousProc.criRuntimeService(t))
}
})

t.Log("Prepare pods for current release")
upgradeCaseFunc, hookFunc := shouldManipulateContainersInPodAfterUpgrade(t, previousProc.criRuntimeService(t), previousProc.criImageService(t))
needToCleanup = false
require.Nil(t, hookFunc)

t.Log("Gracefully stop previous release's containerd process")
require.NoError(t, previousProc.kill(syscall.SIGTERM))
require.NoError(t, previousProc.wait(5*time.Minute))

t.Logf("%s should have bucket k8s.io in root", boltdbPath)
db, err := bbolt.Open(boltdbPath, 0600, &bbolt.Options{ReadOnly: true})
require.NoError(t, err)
require.NoError(t, db.View(func(tx *bbolt.Tx) error {
if tx.Bucket([]byte("k8s.io")) == nil {
return fmt.Errorf("expected k8s.io bucket")
}
return nil
}))
require.NoError(t, db.Close())

t.Log("Install default config for current release")
currentReleaseCtrdDefaultConfig(t, workDir)

t.Log("Starting the current release's containerd")
currentProc := newCtrdProc(t, "containerd", workDir, nil)
require.NoError(t, currentProc.isReady())

t.Cleanup(func() {
t.Log("Cleanup all the pods")
cleanupPods(t, currentProc.criRuntimeService(t))

t.Log("Stopping current release's containerd process")
require.NoError(t, currentProc.kill(syscall.SIGTERM))
require.NoError(t, currentProc.wait(5*time.Minute))
})

t.Logf("%s should not have bucket k8s.io in root after restart", boltdbPath)
copiedBoltdbPath := filepath.Join(t.TempDir(), "meta.db.new")
require.NoError(t, fs.CopyFile(copiedBoltdbPath, boltdbPath))

db, err = bbolt.Open(copiedBoltdbPath, 0600, &bbolt.Options{ReadOnly: true})
require.NoError(t, err)
require.NoError(t, db.View(func(tx *bbolt.Tx) error {
if tx.Bucket([]byte("k8s.io")) != nil {
return fmt.Errorf("unexpected k8s.io bucket")
}
return nil
}))
require.NoError(t, db.Close())

t.Log("Verifing")
upgradeCaseFunc(t, currentProc.criRuntimeService(t), currentProc.criImageService(t))
}
Loading

0 comments on commit 4cfeb7b

Please sign in to comment.