diff --git a/pkg/component/worker/ocibundle.go b/pkg/component/worker/ocibundle.go index e33f195f23d8..0d4ab095d667 100644 --- a/pkg/component/worker/ocibundle.go +++ b/pkg/component/worker/ocibundle.go @@ -26,18 +26,22 @@ import ( "github.com/avast/retry-go" "github.com/containerd/containerd" "github.com/containerd/containerd/platforms" + "github.com/fsnotify/fsnotify" + "github.com/sirupsen/logrus" + "github.com/k0sproject/k0s/internal/pkg/dir" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/component/prober" "github.com/k0sproject/k0s/pkg/config" "github.com/k0sproject/k0s/pkg/constant" - "github.com/sirupsen/logrus" + "github.com/k0sproject/k0s/pkg/debounce" ) // OCIBundleReconciler tries to import OCI bundle into the running containerd instance type OCIBundleReconciler struct { k0sVars *config.CfgVars log *logrus.Entry + loaded map[string]time.Time *prober.EventEmitter } @@ -49,6 +53,7 @@ func NewOCIBundleReconciler(vars *config.CfgVars) *OCIBundleReconciler { k0sVars: vars, log: logrus.WithField("component", "OCIBundleReconciler"), EventEmitter: prober.NewEventEmitter(), + loaded: make(map[string]time.Time), } } @@ -56,46 +61,129 @@ func (a *OCIBundleReconciler) Init(_ context.Context) error { return dir.Init(a.k0sVars.OCIBundleDir, constant.ManifestsDirMode) } -func (a *OCIBundleReconciler) Start(ctx context.Context) error { +// load loads all OCI bundle files into containerd. Read all files from the oci bundle +// directory and loads them only once. If the file is already loaded and hasn't changed +// it will skip it. Errors are logged but not returned, upon failure in one file this +// function logs the error and moves to the next file. +func (a *OCIBundleReconciler) load(ctx context.Context) { files, err := os.ReadDir(a.k0sVars.OCIBundleDir) if err != nil { + a.log.WithError(err).Errorf("can't read bundles directory") a.Emit("can't read bundles directory") - return fmt.Errorf("can't read bundles directory") + return } + a.EmitWithPayload("importing OCI bundles", files) if len(files) == 0 { - return nil + return } + var client *containerd.Client sock := filepath.Join(a.k0sVars.RunDir, "containerd.sock") - err = retry.Do(func() error { - client, err = containerd.New(sock, containerd.WithDefaultNamespace("k8s.io"), containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec()))) + if err := retry.Do(func() error { + client, err = containerd.New( + sock, + containerd.WithDefaultNamespace("k8s.io"), + containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec())), + ) if err != nil { a.log.WithError(err).Errorf("can't connect to containerd socket %s", sock) return err } - _, err := client.ListImages(ctx) - if err != nil { + + if _, err := client.ListImages(ctx); err != nil { a.log.WithError(err).Errorf("can't use containerd client") return err } return nil - }, retry.Context(ctx), retry.Delay(time.Second*5)) - if err != nil { - a.EmitWithPayload("can't connect to containerd socket", map[string]interface{}{"socket": sock, "error": err}) - return fmt.Errorf("can't connect to containerd socket %s: %w", sock, err) + }, retry.Context(ctx), retry.Delay(time.Second*5)); err != nil { + payload := map[string]interface{}{"socket": sock, "error": err} + a.EmitWithPayload("can't connect to containerd socket", payload) + return } defer client.Close() for _, file := range files { - if err := a.unpackBundle(ctx, client, a.k0sVars.OCIBundleDir+"/"+file.Name()); err != nil { - a.EmitWithPayload("unpacking OCI bundle error", map[string]interface{}{"file": file.Name(), "error": err}) + payload := map[string]interface{}{"file": file.Name(), "error": err} + fpath := filepath.Join(a.k0sVars.OCIBundleDir, file.Name()) + finfo, err := os.Stat(fpath) + if err != nil { + a.log.WithError(err).Errorf("can't stat file %s", fpath) + a.EmitWithPayload("can't stat file", payload) + continue + } + + if when, ok := a.loaded[file.Name()]; ok && when.Equal(finfo.ModTime()) { + continue + } + + if err := a.unpackBundle(ctx, client, fpath); err != nil { + a.EmitWithPayload("unpacking OCI bundle error", payload) a.log.WithError(err).Errorf("can't unpack bundle %s", file.Name()) - return fmt.Errorf("can't unpack bundle %s: %w", file.Name(), err) + continue } + + // if succeed in loading the bundle, remember the time. + a.loaded[file.Name()] = finfo.ModTime() a.EmitWithPayload("unpacked OCI bundle", file.Name()) } a.Emit("finished importing OCI bundle") +} + +// watch creates a fs watched on the oci bundle directory. This function calls load() anytime +// a new file is created on the directory or a write operation took place . Events are debounced +// with a timeout of 10 seconds. This function is blocking. +func (a *OCIBundleReconciler) watch(ctx context.Context) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + a.log.WithError(err).Error("failed to create watcher for OCI bundles") + return + } + defer watcher.Close() + + if err := watcher.Add(a.k0sVars.OCIBundleDir); err != nil { + a.log.WithError(err).Error("failed to watch for OCI bundles") + return + } + + debouncer := debounce.Debouncer[fsnotify.Event]{ + Input: watcher.Events, + Timeout: 10 * time.Second, + Filter: func(item fsnotify.Event) bool { + switch item.Op { + case fsnotify.Create, fsnotify.Write: + return true + default: + return false + } + }, + Callback: func(fsnotify.Event) { + a.log.Info("OCI bundle directory changed, reconciling") + a.load(ctx) + }, + } + + go func() { + for { + err, ok := <-watcher.Errors + if !ok { + return + } + a.log.WithError(err).Error("error while watching oci bundle directory") + } + }() + + a.log.Infof("started to watch events on %s", a.k0sVars.OCIBundleDir) + if err := debouncer.Run(ctx); err != nil { + a.log.WithError(err).Warn("oci bundle watch bouncer exited with error") + } +} + +// Starts initiate the OCI bundle loader. It does an initial load of the directory and +// once it is done, it starts a watcher on its own goroutine. +func (a *OCIBundleReconciler) Start(ctx context.Context) error { + a.load(ctx) + go a.watch(ctx) return nil }