Skip to content

Commit

Permalink
feat: implement watcher for oci bundles
Browse files Browse the repository at this point in the history
implements an oci bundle watcher. this allows k0s to load new bundles
without requiring a restart of the process. the watcher acts upon create
or write operations happening in the oci bundles directory.

events are debounced with a timeout of 10 seconds.

Signed-off-by: Ricardo Maraschini <[email protected]>
  • Loading branch information
ricardomaraschini committed Apr 22, 2024
1 parent b313016 commit 10e7f8f
Showing 1 changed file with 103 additions and 15 deletions.
118 changes: 103 additions & 15 deletions pkg/component/worker/ocibundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@ import (
"github.com/avast/retry-go"
"github.com/containerd/containerd"
"github.com/containerd/containerd/platforms"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"

"github.com/k0sproject/k0s/internal/pkg/dir"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/component/prober"
"github.com/k0sproject/k0s/pkg/config"
"github.com/k0sproject/k0s/pkg/constant"
"github.com/sirupsen/logrus"
"github.com/k0sproject/k0s/pkg/debounce"
)

// OCIBundleReconciler tries to import OCI bundle into the running containerd instance
type OCIBundleReconciler struct {
k0sVars *config.CfgVars
log *logrus.Entry
loaded map[string]time.Time
*prober.EventEmitter
}

Expand All @@ -49,53 +53,137 @@ func NewOCIBundleReconciler(vars *config.CfgVars) *OCIBundleReconciler {
k0sVars: vars,
log: logrus.WithField("component", "OCIBundleReconciler"),
EventEmitter: prober.NewEventEmitter(),
loaded: make(map[string]time.Time),
}
}

func (a *OCIBundleReconciler) Init(_ context.Context) error {
return dir.Init(a.k0sVars.OCIBundleDir, constant.ManifestsDirMode)
}

func (a *OCIBundleReconciler) Start(ctx context.Context) error {
// load loads all OCI bundle files into containerd. Read all files from the oci bundle
// directory and loads them only once. If the file is already loaded and hasn't changed
// it will skip it. Errors are logged but not returned, upon failure in one file this
// function logs the error and moves to the next file.
func (a *OCIBundleReconciler) load(ctx context.Context) {
files, err := os.ReadDir(a.k0sVars.OCIBundleDir)
if err != nil {
a.log.WithError(err).Errorf("can't read bundles directory")
a.Emit("can't read bundles directory")
return fmt.Errorf("can't read bundles directory")
return
}

a.EmitWithPayload("importing OCI bundles", files)
if len(files) == 0 {
return nil
return
}

var client *containerd.Client
sock := filepath.Join(a.k0sVars.RunDir, "containerd.sock")
err = retry.Do(func() error {
client, err = containerd.New(sock, containerd.WithDefaultNamespace("k8s.io"), containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec())))
if err := retry.Do(func() error {
client, err = containerd.New(
sock,
containerd.WithDefaultNamespace("k8s.io"),
containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec())),
)
if err != nil {
a.log.WithError(err).Errorf("can't connect to containerd socket %s", sock)
return err
}
_, err := client.ListImages(ctx)
if err != nil {

if _, err := client.ListImages(ctx); err != nil {
a.log.WithError(err).Errorf("can't use containerd client")
return err
}
return nil
}, retry.Context(ctx), retry.Delay(time.Second*5))
if err != nil {
a.EmitWithPayload("can't connect to containerd socket", map[string]interface{}{"socket": sock, "error": err})
return fmt.Errorf("can't connect to containerd socket %s: %w", sock, err)
}, retry.Context(ctx), retry.Delay(time.Second*5)); err != nil {
payload := map[string]interface{}{"socket": sock, "error": err}
a.EmitWithPayload("can't connect to containerd socket", payload)
return
}
defer client.Close()

for _, file := range files {
if err := a.unpackBundle(ctx, client, a.k0sVars.OCIBundleDir+"/"+file.Name()); err != nil {
a.EmitWithPayload("unpacking OCI bundle error", map[string]interface{}{"file": file.Name(), "error": err})
payload := map[string]interface{}{"file": file.Name(), "error": err}
fpath := filepath.Join(a.k0sVars.OCIBundleDir, file.Name())
finfo, err := os.Stat(fpath)
if err != nil {
a.log.WithError(err).Errorf("can't stat file %s", fpath)
a.EmitWithPayload("can't stat file", payload)
continue
}

if when, ok := a.loaded[file.Name()]; ok && when.Equal(finfo.ModTime()) {
continue
}

if err := a.unpackBundle(ctx, client, fpath); err != nil {
a.EmitWithPayload("unpacking OCI bundle error", payload)
a.log.WithError(err).Errorf("can't unpack bundle %s", file.Name())
return fmt.Errorf("can't unpack bundle %s: %w", file.Name(), err)
continue
}

// if succeed in loading the bundle, remember the time.
a.loaded[file.Name()] = finfo.ModTime()
a.EmitWithPayload("unpacked OCI bundle", file.Name())
}
a.Emit("finished importing OCI bundle")
}

// watch creates a fs watched on the oci bundle directory. This function calls load() anytime
// a new file is created on the directory or a write operation took place . Events are debounced
// with a timeout of 10 seconds. This function is blocking.
func (a *OCIBundleReconciler) watch(ctx context.Context) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
a.log.WithError(err).Error("failed to create watcher for OCI bundles")
return
}
defer watcher.Close()

if err := watcher.Add(a.k0sVars.OCIBundleDir); err != nil {
a.log.WithError(err).Error("failed to watch for OCI bundles")
return
}

debouncer := debounce.Debouncer[fsnotify.Event]{
Input: watcher.Events,
Timeout: 10 * time.Second,
Filter: func(item fsnotify.Event) bool {
switch item.Op {
case fsnotify.Create, fsnotify.Write:
return true
default:
return false
}
},
Callback: func(fsnotify.Event) {
a.log.Info("OCI bundle directory changed, reconciling")
a.load(ctx)
},
}

go func() {
for {
err, ok := <-watcher.Errors
if !ok {
return
}
a.log.WithError(err).Error("error while watching oci bundle directory")
}
}()

a.log.Infof("started to watch events on %s", a.k0sVars.OCIBundleDir)
if err := debouncer.Run(ctx); err != nil {
a.log.WithError(err).Warn("oci bundle watch bouncer exited with error")
}
}

// Starts initiate the OCI bundle loader. It does an initial load of the directory and
// once it is done, it starts a watcher on its own goroutine.
func (a *OCIBundleReconciler) Start(ctx context.Context) error {
a.load(ctx)
go a.watch(ctx)
return nil
}

Expand Down

0 comments on commit 10e7f8f

Please sign in to comment.