diff --git a/core/metadata/buckets.go b/core/metadata/buckets.go index c075439581c8..bdfdafb4c93c 100644 --- a/core/metadata/buckets.go +++ b/core/metadata/buckets.go @@ -301,6 +301,7 @@ func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket { func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) { return createBucketIfNotExists( tx, + bucketKeyVersion, []byte(namespace), bucketKeyObjectSandboxes, ) @@ -309,6 +310,7 @@ func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) { func getSandboxBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { return getBucket( tx, + bucketKeyVersion, []byte(namespace), bucketKeyObjectSandboxes, ) diff --git a/core/metadata/db.go b/core/metadata/db.go index 25752397ab2a..6d3a7404d4c4 100644 --- a/core/metadata/db.go +++ b/core/metadata/db.go @@ -50,7 +50,7 @@ const ( // dbVersion represents updates to the schema // version which are additions and compatible with // prior version of the same schema. - dbVersion = 3 + dbVersion = 4 ) // DBOpt configures how we set up the DB diff --git a/core/metadata/db_test.go b/core/metadata/db_test.go index a952ae214f59..e79ab3f69a51 100644 --- a/core/metadata/db_test.go +++ b/core/metadata/db_test.go @@ -140,6 +140,42 @@ func TestMigrations(t *testing.T) { bref: "", }, } + + testSandboxes := []struct { + id string + keyValues [][3]string // {bucket, key, value} + }{ + { + id: "sb1", + keyValues: [][3]string{ + { + "", // is not sub bucket + "created", "2dayago", + }, + { + "", // is not sub bucket + "updated", "1dayago", + }, + { + "extension", + "labels", strings.Repeat("whoknows", 10), + }, + }, + }, + { + id: "sb2", + keyValues: [][3]string{ + { + "", // is not sub bucket + "sandboxer", "default", + }, + { + "labels", "hello", "panic", + }, + }, + }, + } + migrationTests := []struct { name string init func(*bolt.Tx) error @@ -282,7 +318,6 @@ func TestMigrations(t *testing.T) { return nil }, }, - { name: "NoOp", init: func(tx *bolt.Tx) error { @@ -292,6 +327,65 @@ func TestMigrations(t *testing.T) { return nil }, }, + { + name: "MigrateSandboxes", + init: func(tx *bolt.Tx) error { + allsbbkt, err := createBucketIfNotExists(tx, []byte("kubernetes"), bucketKeyObjectSandboxes) + if err != nil { + return err + } + + for _, sbDef := range testSandboxes { + sbbkt, err := allsbbkt.CreateBucket([]byte(sbDef.id)) + if err != nil { + return err + } + + for _, keyValues := range sbDef.keyValues { + bkt := sbbkt + if keyValues[0] != "" { + bkt, err = sbbkt.CreateBucketIfNotExists([]byte(keyValues[0])) + if err != nil { + return err + } + } + + if err = bkt.Put([]byte(keyValues[1]), []byte(keyValues[2])); err != nil { + return err + } + } + } + return nil + }, + check: func(tx *bolt.Tx) error { + allsbbkt := getSandboxBucket(tx, "kubernetes") + + for _, sbDef := range testSandboxes { + sbbkt := allsbbkt.Bucket([]byte(sbDef.id)) + + for _, keyValues := range sbDef.keyValues { + bkt := sbbkt + if keyValues[0] != "" { + bkt = sbbkt.Bucket([]byte(keyValues[0])) + } + + key := []byte(keyValues[1]) + expected := keyValues[2] + + value := string(bkt.Get(key)) + if value != expected { + return fmt.Errorf("expected %s, but got %s in sandbox %s", expected, value, sbDef.id) + } + } + } + + allsbbkt = getBucket(tx, []byte("kubernetes"), bucketKeyObjectSandboxes) + if allsbbkt != nil { + return errors.New("old sandboxes bucket still exists") + } + return nil + }, + }, } if len(migrationTests) != len(migrations) { diff --git a/core/metadata/migrations.go b/core/metadata/migrations.go index 34febdd15965..c5078751054c 100644 --- a/core/metadata/migrations.go +++ b/core/metadata/migrations.go @@ -16,7 +16,12 @@ package metadata -import bolt "go.etcd.io/bbolt" +import ( + "bytes" + "fmt" + + bolt "go.etcd.io/bbolt" +) type migration struct { schema string @@ -50,6 +55,11 @@ var migrations = []migration{ version: 3, migrate: noOpMigration, }, + { + schema: "v1", + version: 4, + migrate: migrateSandboxes, + }, } // addChildLinks Adds children key to the snapshotters to enforce snapshot @@ -160,6 +170,87 @@ func migrateIngests(tx *bolt.Tx) error { return nil } +// migrateSandboxes moves sandboxes from root bucket into v1 bucket. +func migrateSandboxes(tx *bolt.Tx) error { + v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) + if err != nil { + return err + } + + deletingBuckets := [][]byte{} + + if merr := tx.ForEach(func(ns []byte, nsbkt *bolt.Bucket) error { + // Skip v1 bucket, even if users created sandboxes in v1 namespace. + if bytes.Equal(bucketKeyVersion, ns) { + return nil + } + + deletingBuckets = append(deletingBuckets, ns) + + allsbbkt := nsbkt.Bucket(bucketKeyObjectSandboxes) + if allsbbkt == nil { + return nil + } + + tnsbkt, err := v1bkt.CreateBucketIfNotExists(ns) + if err != nil { + return fmt.Errorf("failed to create namespace %s in bucket %s: %w", + ns, bucketKeyVersion, err) + } + + tallsbbkt, err := tnsbkt.CreateBucketIfNotExists(bucketKeyObjectSandboxes) + if err != nil { + return fmt.Errorf("failed to create bucket sandboxes in namespace %s: %w", ns, err) + } + + return allsbbkt.ForEachBucket(func(sb []byte) error { + sbbkt := allsbbkt.Bucket(sb) // single sandbox bucket + + tsbbkt, err := tallsbbkt.CreateBucketIfNotExists(sb) + if err != nil { + return fmt.Errorf("failed to create sandbox object %s in namespace %s: %w", + sb, ns, err) + } + + // copy single + if cerr := sbbkt.ForEach(func(key, value []byte) error { + if value == nil { + return nil + } + + return tsbbkt.Put(key, value) + }); cerr != nil { + return cerr + } + + return sbbkt.ForEachBucket(func(subbkt []byte) error { + tsubbkt, err := tsbbkt.CreateBucketIfNotExists(subbkt) + if err != nil { + return fmt.Errorf("failed to create subbucket %s in sandbox %s (namespace %s): %w", + subbkt, sb, ns, err) + } + + return sbbkt.Bucket(subbkt).ForEach(func(key, value []byte) error { + if value == nil { + return fmt.Errorf("unexpected bucket %s", key) + } + return tsubbkt.Put(key, value) + }) + }) + }) + }); merr != nil { + return fmt.Errorf("failed to copy sandboxes into v1 bucket: %w", err) + } + + for _, ns := range deletingBuckets { + derr := tx.DeleteBucket(ns) + if derr != nil { + return fmt.Errorf("failed to cleanup bucket %s in root: %w", ns, err) + } + } + return nil +} + // noOpMigration was for a database change from boltdb/bolt which is no // longer being supported, to go.etcd.io/bbolt which is the currently // maintained repo for boltdb. diff --git a/integration/container_volume_linux_test.go b/integration/container_volume_linux_test.go index 708559018f5a..9f5cbec16aae 100644 --- a/integration/container_volume_linux_test.go +++ b/integration/container_volume_linux_test.go @@ -56,7 +56,7 @@ version = 3 require.NoError(t, err) t.Logf("Starting containerd") - currentProc := newCtrdProc(t, "containerd", workDir) + currentProc := newCtrdProc(t, "containerd", workDir, nil) require.NoError(t, currentProc.isReady()) t.Cleanup(func() { t.Log("Cleanup all the pods") diff --git a/integration/issue10467_linux_test.go b/integration/issue10467_linux_test.go new file mode 100644 index 000000000000..96a8d4ed511b --- /dev/null +++ b/integration/issue10467_linux_test.go @@ -0,0 +1,117 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package integration + +import ( + "fmt" + "path/filepath" + "syscall" + "testing" + "time" + + "github.com/containerd/continuity/fs" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func TestIssue10467(t *testing.T) { + latestVersion := "v1.7.20" + + releaseBinDir := t.TempDir() + + downloadReleaseBinary(t, releaseBinDir, latestVersion) + + t.Logf("Install config for release %s", latestVersion) + workDir := t.TempDir() + previousReleaseCtrdConfig(t, releaseBinDir, workDir) + + t.Log("Starting the previous release's containerd") + previousCtrdBinPath := filepath.Join(releaseBinDir, "bin", "containerd") + previousProc := newCtrdProc(t, previousCtrdBinPath, workDir, []string{"ENABLE_CRI_SANDBOXES=yes"}) + + boltdbPath := filepath.Join(workDir, "root", "io.containerd.metadata.v1.bolt", "meta.db") + + ctrdLogPath := previousProc.logPath() + t.Cleanup(func() { + if t.Failed() { + dumpFileContent(t, ctrdLogPath) + } + }) + + require.NoError(t, previousProc.isReady()) + + needToCleanup := true + t.Cleanup(func() { + if t.Failed() && needToCleanup { + t.Logf("Try to cleanup leaky pods") + cleanupPods(t, previousProc.criRuntimeService(t)) + } + }) + + t.Log("Prepare pods for current release") + upgradeCaseFunc, hookFunc := shouldManipulateContainersInPodAfterUpgrade(t, previousProc.criRuntimeService(t), previousProc.criImageService(t)) + needToCleanup = false + require.Nil(t, hookFunc) + + t.Log("Gracefully stop previous release's containerd process") + require.NoError(t, previousProc.kill(syscall.SIGTERM)) + require.NoError(t, previousProc.wait(5*time.Minute)) + + t.Logf("%s should have bucket k8s.io in root", boltdbPath) + db, err := bbolt.Open(boltdbPath, 0600, &bbolt.Options{ReadOnly: true}) + require.NoError(t, err) + require.NoError(t, db.View(func(tx *bbolt.Tx) error { + if tx.Bucket([]byte("k8s.io")) == nil { + return fmt.Errorf("expected k8s.io bucket") + } + return nil + })) + require.NoError(t, db.Close()) + + t.Log("Install default config for current release") + currentReleaseCtrdDefaultConfig(t, workDir) + + t.Log("Starting the current release's containerd") + currentProc := newCtrdProc(t, "containerd", workDir, nil) + require.NoError(t, currentProc.isReady()) + + t.Cleanup(func() { + t.Log("Cleanup all the pods") + cleanupPods(t, currentProc.criRuntimeService(t)) + + t.Log("Stopping current release's containerd process") + require.NoError(t, currentProc.kill(syscall.SIGTERM)) + require.NoError(t, currentProc.wait(5*time.Minute)) + }) + + t.Logf("%s should not have bucket k8s.io in root after restart", boltdbPath) + copiedBoltdbPath := filepath.Join(t.TempDir(), "meta.db.new") + require.NoError(t, fs.CopyFile(copiedBoltdbPath, boltdbPath)) + + db, err = bbolt.Open(copiedBoltdbPath, 0600, &bbolt.Options{ReadOnly: true}) + require.NoError(t, err) + require.NoError(t, db.View(func(tx *bbolt.Tx) error { + if tx.Bucket([]byte("k8s.io")) != nil { + return fmt.Errorf("unexpected k8s.io bucket") + } + return nil + })) + require.NoError(t, db.Close()) + + t.Log("Verifing") + upgradeCaseFunc(t, currentProc.criRuntimeService(t), currentProc.criImageService(t)) +} diff --git a/integration/release_upgrade_linux_test.go b/integration/release_upgrade_linux_test.go index 5b2ad4cafc76..7938e6a2f07a 100644 --- a/integration/release_upgrade_linux_test.go +++ b/integration/release_upgrade_linux_test.go @@ -50,7 +50,7 @@ type beforeUpgradeHookFunc func(*testing.T) // TODO: Support Windows func TestUpgrade(t *testing.T) { previousReleaseBinDir := t.TempDir() - downloadPreviousReleaseBinary(t, previousReleaseBinDir) + downloadPreviousLatestReleaseBinary(t, previousReleaseBinDir) t.Run("recover", runUpgradeTestCase(previousReleaseBinDir, shouldRecoverAllThePodsAfterUpgrade)) t.Run("exec", runUpgradeTestCase(previousReleaseBinDir, execToExistingContainer)) @@ -73,7 +73,7 @@ func runUpgradeTestCase( t.Log("Starting the previous release's containerd") previousCtrdBinPath := filepath.Join(previousReleaseBinDir, "bin", "containerd") - previousProc := newCtrdProc(t, previousCtrdBinPath, workDir) + previousProc := newCtrdProc(t, previousCtrdBinPath, workDir, nil) ctrdLogPath := previousProc.logPath() t.Cleanup(func() { @@ -107,7 +107,7 @@ func runUpgradeTestCase( currentReleaseCtrdDefaultConfig(t, workDir) t.Log("Starting the current release's containerd") - currentProc := newCtrdProc(t, "containerd", workDir) + currentProc := newCtrdProc(t, "containerd", workDir, nil) require.NoError(t, currentProc.isReady()) t.Cleanup(func() { t.Log("Cleanup all the pods") @@ -658,7 +658,7 @@ func (p *ctrdProc) criImageService(t *testing.T) cri.ImageManagerService { } // newCtrdProc is to start containerd process. -func newCtrdProc(t *testing.T, ctrdBin string, ctrdWorkDir string) *ctrdProc { +func newCtrdProc(t *testing.T, ctrdBin string, ctrdWorkDir string, envs []string) *ctrdProc { p := &ctrdProc{workDir: ctrdWorkDir} var args []string @@ -673,6 +673,7 @@ func newCtrdProc(t *testing.T, ctrdBin string, ctrdWorkDir string) *ctrdProc { t.Cleanup(func() { f.Close() }) cmd := exec.Command(ctrdBin, args...) + cmd.Env = append(os.Environ(), envs...) cmd.Stdout = f cmd.Stderr = f cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL} diff --git a/integration/release_upgrade_utils_linux_test.go b/integration/release_upgrade_utils_linux_test.go index e788b09ce507..014bac705759 100644 --- a/integration/release_upgrade_utils_linux_test.go +++ b/integration/release_upgrade_utils_linux_test.go @@ -33,11 +33,16 @@ import ( "github.com/containerd/containerd/v2/version" ) -// downloadPreviousReleaseBinary downloads the latest version of previous release -// into the target dir. -func downloadPreviousReleaseBinary(t *testing.T, targetDir string) { +// downloadPreviousLatestReleaseBinary downloads the latest version of previous +// release into the target dir. +func downloadPreviousLatestReleaseBinary(t *testing.T, targetDir string) { ver := previousReleaseVersion(t) + downloadReleaseBinary(t, targetDir, ver) +} + +// downloadReleaseBinary downloads containerd binary with a given release. +func downloadReleaseBinary(t *testing.T, targetDir string, ver string) { targetURL := fmt.Sprintf("https://github.com/containerd/containerd/releases/download/%s/containerd-%s-linux-%s.tar.gz", ver, strings.TrimPrefix(ver, "v"), runtime.GOARCH, )