diff --git a/pkg/component/worker/ocibundle.go b/pkg/component/worker/ocibundle.go index e33f195f23d8..5d2c4d5e04d3 100644 --- a/pkg/component/worker/ocibundle.go +++ b/pkg/component/worker/ocibundle.go @@ -17,21 +17,28 @@ limitations under the License. package worker import ( + "bufio" "context" + "errors" "fmt" + "io/fs" "os" "path/filepath" + "slices" + "sync/atomic" "time" "github.com/avast/retry-go" "github.com/containerd/containerd" "github.com/containerd/containerd/platforms" + "github.com/fsnotify/fsnotify" "github.com/k0sproject/k0s/internal/pkg/dir" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/component/prober" "github.com/k0sproject/k0s/pkg/config" "github.com/k0sproject/k0s/pkg/constant" "github.com/sirupsen/logrus" + "k8s.io/utils/ptr" ) // OCIBundleReconciler tries to import OCI bundle into the running containerd instance @@ -39,6 +46,8 @@ type OCIBundleReconciler struct { k0sVars *config.CfgVars log *logrus.Entry *prober.EventEmitter + + stop atomic.Pointer[func()] } var _ manager.Component = (*OCIBundleReconciler)(nil) @@ -56,18 +65,170 @@ func (a *OCIBundleReconciler) Init(_ context.Context) error { return dir.Init(a.k0sVars.OCIBundleDir, constant.ManifestsDirMode) } +// Run executes the initial apply and watches the stack for updates. func (a *OCIBundleReconciler) Start(ctx context.Context) error { - files, err := os.ReadDir(a.k0sVars.OCIBundleDir) + if ctx.Err() != nil { + return nil // The context is already done. + } + + bundleDir := a.k0sVars.OCIBundleDir + + runCtx, cancel := context.WithCancel(context.Background()) + started, done := make(chan error), make(chan struct{}) + if !a.stop.CompareAndSwap(nil, ptr.To(func() { cancel(); <-done })) { + return errors.New("already started/stopped") + } + + go func() { + defer close(done) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + started <- fmt.Errorf("failed to create watcher: %w", err) + return + } + defer watcher.Close() + + if err = watcher.Add(bundleDir); err != nil { + started <- fmt.Errorf("failed to watch %q: %w", bundleDir, err) + return + } + + importCtx, cancelImport := context.WithCancel(context.Background()) + defer cancelImport() + go func() { + select { + case <-ctx.Done(): + case <-runCtx.Done(): + } + cancelImport() + }() + + err = a.importBundleDir(importCtx) + started <- err + close(started) + if err != nil { + return + } + + t := time.NewTicker(10 * time.Second) + defer t.Stop() + + type queuedItem = struct { + name string + lastEvent time.Time + removed bool + } + + var queue []queuedItem + + for { + select { + case e, ok := <-watcher.Events: + if !ok { + a.log.Error("Events channel closed unexpectedly") + return + } + + queued := queuedItem{ + name: e.Name, + lastEvent: time.Now(), + removed: e.Has(fsnotify.Remove) || e.Has(fsnotify.Rename), + } + + if idx := slices.IndexFunc(queue, func(candidate queuedItem) bool { + return candidate.name == queued.name + }); idx >= 0 { + queue[idx] = queued + } else if !queued.removed { + queue = append(queue, queued) + } + + case err, ok := <-watcher.Errors: + if !ok { + a.log.Error("Errors channel closed unexpectedly") + return + } + a.log.WithError(err).Error("Error while watching bundle dir") + + case threshold := <-t.C: + select { + case <-runCtx.Done(): + return + default: + } + + var bundlePaths []string + var remaining []queuedItem + threshold = threshold.Add(-10 * time.Second) + for _, item := range queue { + switch { + case item.removed: + break + + case item.lastEvent.Before(threshold): + bundlePath := filepath.Join(bundleDir, item.name) + stat, err := os.Stat(bundlePath) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + a.log.WithError(err).Error("Failed to stat path") + } + break + } + if stat.IsDir() { + break + } + + bundlePaths = append(bundlePaths, filepath.Join(bundleDir, item.name)) + + default: + remaining = append(remaining, item) + } + } + + queue = remaining + if err := a.importBundles(ctx, bundlePaths); err != nil { + a.log.WithError(err).Error("Failed to import bundle paths ", bundlePaths) + } + + case <-runCtx.Done(): + return + } + } + }() + + return <-started +} + +func (a *OCIBundleReconciler) importBundleDir(ctx context.Context) error { + bundleDir := a.k0sVars.OCIBundleDir + files, err := os.ReadDir(bundleDir) if err != nil { a.Emit("can't read bundles directory") return fmt.Errorf("can't read bundles directory") } - a.EmitWithPayload("importing OCI bundles", files) - if len(files) == 0 { - return nil + + bundlePaths := make([]string, 0, len(files)) + for _, file := range files { + if file.IsDir() { + continue + } + bundlePaths = append(bundlePaths, filepath.Join(bundleDir, file.Name())) } - var client *containerd.Client + + a.EmitWithPayload("importing OCI bundles", bundlePaths) + + if len(bundlePaths) == 0 { + return a.importBundles(ctx, bundlePaths) + } + + return nil +} + +func (a *OCIBundleReconciler) importBundles(ctx context.Context, bundlePaths []string) (err error) { sock := filepath.Join(a.k0sVars.RunDir, "containerd.sock") + + var client *containerd.Client err = retry.Do(func() error { client, err = containerd.New(sock, containerd.WithDefaultNamespace("k8s.io"), containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec()))) if err != nil { @@ -87,25 +248,26 @@ func (a *OCIBundleReconciler) Start(ctx context.Context) error { } defer client.Close() - for _, file := range files { - if err := a.unpackBundle(ctx, client, a.k0sVars.OCIBundleDir+"/"+file.Name()); err != nil { - a.EmitWithPayload("unpacking OCI bundle error", map[string]interface{}{"file": file.Name(), "error": err}) - a.log.WithError(err).Errorf("can't unpack bundle %s", file.Name()) - return fmt.Errorf("can't unpack bundle %s: %w", file.Name(), err) + for _, bundlePath := range bundlePaths { + if err := a.unpackBundle(ctx, client, bundlePath); err != nil { + a.EmitWithPayload("unpacking OCI bundle error", map[string]interface{}{"file": bundlePath, "error": err}) + a.log.WithError(err).Errorf("can't unpack bundle %s", bundlePath) + return fmt.Errorf("can't unpack bundle %s: %w", bundlePath, err) } - a.EmitWithPayload("unpacked OCI bundle", file.Name()) + a.EmitWithPayload("unpacked OCI bundle", bundlePath) } a.Emit("finished importing OCI bundle") return nil } -func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error { +func (a *OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error { r, err := os.Open(bundlePath) if err != nil { return fmt.Errorf("can't open bundle file %s: %w", bundlePath, err) } defer r.Close() - images, err := client.Import(ctx, r) + + images, err := client.Import(ctx, bufio.NewReader(r)) if err != nil { return fmt.Errorf("can't import bundle: %w", err) } @@ -127,5 +289,10 @@ func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *container } func (a *OCIBundleReconciler) Stop() error { + stop := a.stop.Load() + if stop == nil { + return errors.New("not yet started") + } + (*stop)() return nil } diff --git a/pkg/component/worker/ocibundle_test.go b/pkg/component/worker/ocibundle_test.go new file mode 100644 index 000000000000..165c7ad6e5e0 --- /dev/null +++ b/pkg/component/worker/ocibundle_test.go @@ -0,0 +1,35 @@ +/* +Copyright 2021 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package worker + +import "testing" + +func TestXxx(t *testing.T) { + + x := []int{1, 2, 3, 4, 5} + + y := x[0:3] + + t.Log(x, y) + + z := append(y, -4) + + t.Log(x, y, z) + + t.Fail() + +}