From 24c41483877ebca66242e43255e6f84635d07a0e Mon Sep 17 00:00:00 2001 From: Mike Date: Wed, 3 Apr 2024 14:02:49 -0700 Subject: [PATCH 01/12] Cache IDs & fix BK result caching --- solver/jobs.go | 12 ++- solver/llbsolver/provenance.go | 3 + solver/llbsolver/solver.go | 6 +- solver/simple.go | 144 ++++++++++++++++++++++++++++++--- worker/base/worker.go | 4 + worker/simple.go | 57 +++++++++++++ 6 files changed, 208 insertions(+), 18 deletions(-) create mode 100644 worker/simple.go diff --git a/solver/jobs.go b/solver/jobs.go index 665ebebb3..e39c593b1 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -256,8 +256,10 @@ type Job struct { } type SolverOpt struct { - ResolveOpFunc ResolveOpFunc - DefaultCache CacheManager + ResolveOpFunc ResolveOpFunc + DefaultCache CacheManager + WorkerResultGetter workerResultGetter + CommitRefFunc CommitRefFunc } func NewSolver(opts SolverOpt) *Solver { @@ -274,7 +276,11 @@ func NewSolver(opts SolverOpt) *Solver { // TODO: This should be hoisted up a few layers as not to be bound to the // original solver. For now, we just need a convenient place to initialize // it once. - simple := newSimpleSolver(opts.ResolveOpFunc, jl) + c, err := newDiskCache(opts.WorkerResultGetter) + if err != nil { + panic(err) // TODO: Handle error appropriately once the new solver code is moved. + } + simple := newSimpleSolver(opts.ResolveOpFunc, opts.CommitRefFunc, jl, c) jl.simple = simple jl.s = newScheduler(jl) diff --git a/solver/llbsolver/provenance.go b/solver/llbsolver/provenance.go index eca2ac14d..ae794ee02 100644 --- a/solver/llbsolver/provenance.go +++ b/solver/llbsolver/provenance.go @@ -269,6 +269,9 @@ func captureProvenance(ctx context.Context, res solver.CachedResultWithProvenanc switch op := pp.(type) { case *ops.SourceOp: id, pin := op.Pin() + if pin == "" { // Hack: latest cache opt changes led to an empty value here. Investigate. + return nil + } err := id.Capture(c, pin) if err != nil { return err diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index ccc3504c0..e7528e8f5 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -119,8 +119,10 @@ func New(opt Opt) (*Solver, error) { s.sysSampler = sampler s.solver = solver.NewSolver(solver.SolverOpt{ - ResolveOpFunc: s.resolver(), - DefaultCache: opt.CacheManager, + ResolveOpFunc: s.resolver(), + DefaultCache: opt.CacheManager, + WorkerResultGetter: worker.NewWorkerResultGetter(opt.WorkerController), + CommitRefFunc: worker.FinalizeRef, }) return s, nil } diff --git a/solver/simple.go b/solver/simple.go index ba5719505..1d7cf8260 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -9,28 +9,35 @@ import ( "sync" "time" + "github.com/docker/docker/errdefs" + "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/tracing" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" ) +type CommitRefFunc func(ctx context.Context, result Result) error + type simpleSolver struct { resolveOpFunc ResolveOpFunc + commitRefFunc CommitRefFunc solver *Solver job *Job parallelGuard *parallelGuard - resultCache *resultCache + resultCache resultCache cacheKeyManager *cacheKeyManager mu sync.Mutex } -func newSimpleSolver(resolveOpFunc ResolveOpFunc, solver *Solver) *simpleSolver { +func newSimpleSolver(resolveOpFunc ResolveOpFunc, commitRefFunc CommitRefFunc, solver *Solver, cache resultCache) *simpleSolver { return &simpleSolver{ cacheKeyManager: newCacheKeyManager(), - resultCache: newResultCache(), + resultCache: cache, parallelGuard: newParallelGuard(time.Millisecond * 100), resolveOpFunc: resolveOpFunc, + commitRefFunc: commitRefFunc, solver: solver, } } @@ -72,6 +79,9 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver // Required to access cache map results on state. st.op = op + // Add cache opts to context as they will be accessed by cache retrieval. + ctx = withAncestorCacheOpts(ctx, st) + // CacheMap populates required fields in SourceOp. cm, err := op.CacheMap(ctx, int(e.Index)) if err != nil { @@ -88,7 +98,12 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, err } - if v, ok := s.resultCache.get(cacheKey); ok && v != nil { + v, ok, err := s.resultCache.get(ctx, cacheKey) + if err != nil { + return nil, err + } + + if ok && v != nil { ctx = progress.WithProgress(ctx, st.mpw) notifyCompleted := notifyStarted(ctx, &st.clientVertex, true) notifyCompleted(nil, true) @@ -100,9 +115,21 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, err } + // Ensure all results are finalized (committed to cache). It may be better + // to background these calls at some point. + for _, res := range results { + err = s.commitRefFunc(ctx, res) + if err != nil { + return nil, err + } + } + res := results[int(e.Index)] - s.resultCache.set(cacheKey, res) + err = s.resultCache.set(ctx, cacheKey, res) + if err != nil { + return nil, err + } return res, nil } @@ -134,6 +161,14 @@ func (s *simpleSolver) createState(vertex Vertex, job *Job) *state { st.mpw.Add(job.pw) + // Hack: this is used in combination with withAncestorCacheOpts to pass + // necessary dependency information to a few caching components. We'll need + // to expire these keys somehow. We should also move away from using the + // actives map, but it's still being used by withAncestorCacheOpts for now. + s.solver.mu.Lock() + s.solver.actives[vertex.Digest()] = st + s.solver.mu.Unlock() + return st } @@ -167,7 +202,6 @@ func (s *simpleSolver) exploreVertices(e Edge) ([]digest.Digest, map[digest.Dige func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap) ([]Result, error) { // This struct is used to reconstruct a cache key from an LLB digest & all // parents using consistent digests that depend on the full dependency chain. - // TODO: handle cm.Opts (CacheOpts)? scm := simpleCacheMap{ digest: cm.Digest.String(), deps: make([]cacheMapDep, len(cm.Deps)), @@ -184,7 +218,11 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V } // Lookup the result for that cache key. - res, ok := s.resultCache.get(cacheKey) + res, ok, err := s.resultCache.get(ctx, cacheKey) + if err != nil { + return nil, err + } + if !ok { return nil, errors.Errorf("cache key not found: %s", cacheKey) } @@ -351,24 +389,104 @@ func (f *parallelGuard) acquire(ctx context.Context, d string) (<-chan struct{}, return ch, closer } -type resultCache struct { +type resultCache interface { + set(ctx context.Context, key string, r Result) error + get(ctx context.Context, key string) (Result, bool, error) +} + +type inMemCache struct { cache map[string]Result mu sync.Mutex } -func newResultCache() *resultCache { - return &resultCache{cache: map[string]Result{}} +func newInMemCache() *inMemCache { + return &inMemCache{cache: map[string]Result{}} } -func (c *resultCache) set(key string, r Result) { +func (c *inMemCache) set(ctx context.Context, key string, r Result) error { c.mu.Lock() c.cache[key] = r c.mu.Unlock() + return nil } -func (c *resultCache) get(key string) (Result, bool) { +func (c *inMemCache) get(ctx context.Context, key string) (Result, bool, error) { c.mu.Lock() r, ok := c.cache[key] c.mu.Unlock() - return r, ok + return r, ok, nil +} + +var _ resultCache = &inMemCache{} + +type diskCache struct { + resultGetter workerResultGetter + db *bolt.DB + bucketName string } + +type workerResultGetter interface { + Get(ctx context.Context, id string) (Result, error) +} + +func newDiskCache(resultGetter workerResultGetter) (*diskCache, error) { + c := &diskCache{ + bucketName: "ids", + resultGetter: resultGetter, + } + err := c.init() + if err != nil { + return nil, err + } + return c, nil +} + +func (c *diskCache) init() error { + // TODO: pass in root config directory. + db, err := bolt.Open("/tmp/earthly/buildkit/simple.db", 0600, nil) + if err != nil { + return err + } + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte("ids")) + return err + }) + if err != nil { + return err + } + c.db = db + return nil +} + +func (c *diskCache) set(ctx context.Context, key string, r Result) error { + return c.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(c.bucketName)) + return b.Put([]byte(key), []byte(r.ID())) + }) +} + +func (c *diskCache) get(ctx context.Context, key string) (Result, bool, error) { + var id string + err := c.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(c.bucketName)) + id = string(b.Get([]byte(key))) + return nil + }) + if err != nil { + return nil, false, err + } + if id == "" { + return nil, false, nil + } + res, err := c.resultGetter.Get(ctx, id) + if err != nil { + if errdefs.IsNotFound(err) { + bklog.G(ctx).Warnf("failed to get cached result from worker: %v", err) + return nil, false, nil + } + return nil, false, err + } + return res, true, nil +} + +var _ resultCache = &diskCache{} diff --git a/worker/base/worker.go b/worker/base/worker.go index e98f76214..d44521988 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -13,6 +13,7 @@ import ( "github.com/containerd/containerd/images" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes/docker" + "github.com/davecgh/go-spew/spew" "github.com/docker/docker/pkg/idtools" "github.com/hashicorp/go-multierror" "github.com/moby/buildkit/cache" @@ -297,6 +298,8 @@ func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.Imm ref, err := w.CacheMgr.Get(ctx, id, pg, opts...) var needsRemoteProviders cache.NeedsRemoteProviderError if errors.As(err, &needsRemoteProviders) { + fmt.Println("Trying again with cache opts") + if optGetter := solver.CacheOptGetterOf(ctx); optGetter != nil { var keys []interface{} for _, dgst := range needsRemoteProviders { @@ -310,6 +313,7 @@ func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.Imm } } } + spew.Dump(descHandlers) opts = append(opts, descHandlers) ref, err = w.CacheMgr.Get(ctx, id, pg, opts...) } diff --git a/worker/simple.go b/worker/simple.go new file mode 100644 index 000000000..7565d7b74 --- /dev/null +++ b/worker/simple.go @@ -0,0 +1,57 @@ +package worker + +import ( + "context" + + "github.com/containerd/nydus-snapshotter/pkg/errdefs" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/solver" +) + +// WorkerResultGetter abstracts the work involved in loading a Result from a +// worker using a ref ID. +type WorkerResultGetter struct { + wc *Controller +} + +// NewWorkerResultGetter creates and returns a new *WorkerResultGetter. +func NewWorkerResultGetter(wc *Controller) *WorkerResultGetter { + return &WorkerResultGetter{wc: wc} +} + +// Get a cached results from a worker. +func (w *WorkerResultGetter) Get(ctx context.Context, id string) (solver.Result, error) { + workerID, refID, err := parseWorkerRef(id) + if err != nil { + return nil, err + } + + worker, err := w.wc.Get(workerID) + if err != nil { + return nil, err + } + + ref, err := worker.LoadRef(ctx, refID, false) + if err != nil { + if cache.IsNotFound(err) { + return nil, errdefs.ErrNotFound + } + return nil, err + } + + return NewWorkerRefResult(ref, worker), nil +} + +// FinalizeRef is a convenience function that calls Finalize on a Result's +// ImmutableRef. The 'worker' package cannot be imported by 'solver' due to an +// import cycle, so this function is passed in with solver.SolverOpt. +func FinalizeRef(ctx context.Context, res solver.Result) error { + sys := res.Sys() + if w, ok := sys.(*WorkerRef); ok { + err := w.ImmutableRef.Finalize(ctx) + if err != nil { + return err + } + } + return nil +} From 9b06a9798f96bea2e6bb58974dfd9ebd54f78eb8 Mon Sep 17 00:00:00 2001 From: Mike Date: Wed, 3 Apr 2024 14:05:05 -0700 Subject: [PATCH 02/12] Remove debug statements --- worker/base/worker.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/worker/base/worker.go b/worker/base/worker.go index d44521988..e98f76214 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -13,7 +13,6 @@ import ( "github.com/containerd/containerd/images" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes/docker" - "github.com/davecgh/go-spew/spew" "github.com/docker/docker/pkg/idtools" "github.com/hashicorp/go-multierror" "github.com/moby/buildkit/cache" @@ -298,8 +297,6 @@ func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.Imm ref, err := w.CacheMgr.Get(ctx, id, pg, opts...) var needsRemoteProviders cache.NeedsRemoteProviderError if errors.As(err, &needsRemoteProviders) { - fmt.Println("Trying again with cache opts") - if optGetter := solver.CacheOptGetterOf(ctx); optGetter != nil { var keys []interface{} for _, dgst := range needsRemoteProviders { @@ -313,7 +310,6 @@ func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.Imm } } } - spew.Dump(descHandlers) opts = append(opts, descHandlers) ref, err = w.CacheMgr.Get(ctx, id, pg, opts...) } From dfe4db5d16a0ab3282bc77e067486649fd06491c Mon Sep 17 00:00:00 2001 From: Mike Date: Wed, 3 Apr 2024 14:06:38 -0700 Subject: [PATCH 03/12] Added comment --- solver/simple.go | 1 + 1 file changed, 1 insertion(+) diff --git a/solver/simple.go b/solver/simple.go index 1d7cf8260..e85d5fd62 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -18,6 +18,7 @@ import ( bolt "go.etcd.io/bbolt" ) +// CommitRefFunc can be used to finalize a Result's ImmutableRef. type CommitRefFunc func(ctx context.Context, result Result) error type simpleSolver struct { From 0e34c18492033f95ff1939227cbd35a361bbc638 Mon Sep 17 00:00:00 2001 From: Mike Date: Wed, 3 Apr 2024 16:37:09 -0700 Subject: [PATCH 04/12] Fix error check --- solver/simple.go | 18 ++++++++++++++---- worker/simple.go | 3 +-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/solver/simple.go b/solver/simple.go index e85d5fd62..aab6e12b0 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/docker/docker/errdefs" "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/tracing" @@ -18,6 +17,8 @@ import ( bolt "go.etcd.io/bbolt" ) +var ErrRefNotFound = errors.New("ref not found") + // CommitRefFunc can be used to finalize a Result's ImmutableRef. type CommitRefFunc func(ctx context.Context, result Result) error @@ -444,7 +445,7 @@ func newDiskCache(resultGetter workerResultGetter) (*diskCache, error) { func (c *diskCache) init() error { // TODO: pass in root config directory. - db, err := bolt.Open("/tmp/earthly/buildkit/simple.db", 0600, nil) + db, err := bolt.Open("/tmp/earthly/buildkit/simple.db", 0755, nil) if err != nil { return err } @@ -481,8 +482,10 @@ func (c *diskCache) get(ctx context.Context, key string) (Result, bool, error) { } res, err := c.resultGetter.Get(ctx, id) if err != nil { - if errdefs.IsNotFound(err) { - bklog.G(ctx).Warnf("failed to get cached result from worker: %v", err) + if errors.Is(err, ErrRefNotFound) { + if err := c.delete(ctx, key); err != nil { + bklog.G(ctx).Warnf("failed to delete cache key: %v", err) + } return nil, false, nil } return nil, false, err @@ -490,4 +493,11 @@ func (c *diskCache) get(ctx context.Context, key string) (Result, bool, error) { return res, true, nil } +func (c *diskCache) delete(_ context.Context, key string) error { + return c.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(c.bucketName)) + return b.Delete([]byte(key)) + }) +} + var _ resultCache = &diskCache{} diff --git a/worker/simple.go b/worker/simple.go index 7565d7b74..c2ebb6fda 100644 --- a/worker/simple.go +++ b/worker/simple.go @@ -3,7 +3,6 @@ package worker import ( "context" - "github.com/containerd/nydus-snapshotter/pkg/errdefs" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/solver" ) @@ -34,7 +33,7 @@ func (w *WorkerResultGetter) Get(ctx context.Context, id string) (solver.Result, ref, err := worker.LoadRef(ctx, refID, false) if err != nil { if cache.IsNotFound(err) { - return nil, errdefs.ErrNotFound + return nil, solver.ErrRefNotFound } return nil, err } From cf6f1e9371d2cd75aaca3a665f4a36fce2168432 Mon Sep 17 00:00:00 2001 From: Mike Date: Wed, 3 Apr 2024 17:30:01 -0700 Subject: [PATCH 05/12] Pass in root dir --- cmd/buildkitd/main.go | 1 + control/control.go | 2 ++ solver/jobs.go | 3 ++- solver/llbsolver/solver.go | 2 ++ solver/simple.go | 8 +++++--- 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index fd316b9fc..0b0d61498 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -843,6 +843,7 @@ func newController(c *cli.Context, cfg *config.Config, shutdownCh chan struct{}) LeaseManager: w.LeaseManager(), ContentStore: w.ContentStore(), HistoryConfig: cfg.History, + RootDir: cfg.Root, }) } diff --git a/control/control.go b/control/control.go index a9816692d..7b7a158e8 100644 --- a/control/control.go +++ b/control/control.go @@ -68,6 +68,7 @@ type Opt struct { LeaseManager *leaseutil.Manager ContentStore *containerdsnapshot.Store HistoryConfig *config.HistoryConfig + RootDir string } type Controller struct { // TODO: ControlService @@ -105,6 +106,7 @@ func NewController(opt Opt) (*Controller, error) { SessionManager: opt.SessionManager, Entitlements: opt.Entitlements, HistoryQueue: hq, + RootDir: opt.RootDir, }) if err != nil { return nil, errors.Wrap(err, "failed to create solver") diff --git a/solver/jobs.go b/solver/jobs.go index e39c593b1..5116644d1 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -260,6 +260,7 @@ type SolverOpt struct { DefaultCache CacheManager WorkerResultGetter workerResultGetter CommitRefFunc CommitRefFunc + RootDir string } func NewSolver(opts SolverOpt) *Solver { @@ -276,7 +277,7 @@ func NewSolver(opts SolverOpt) *Solver { // TODO: This should be hoisted up a few layers as not to be bound to the // original solver. For now, we just need a convenient place to initialize // it once. - c, err := newDiskCache(opts.WorkerResultGetter) + c, err := newDiskCache(opts.WorkerResultGetter, opts.RootDir) if err != nil { panic(err) // TODO: Handle error appropriately once the new solver code is moved. } diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index e7528e8f5..5cc90aa5f 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -79,6 +79,7 @@ type Opt struct { WorkerController *worker.Controller HistoryQueue *HistoryQueue ResourceMonitor *resources.Monitor + RootDir string } type Solver struct { @@ -123,6 +124,7 @@ func New(opt Opt) (*Solver, error) { DefaultCache: opt.CacheManager, WorkerResultGetter: worker.NewWorkerResultGetter(opt.WorkerController), CommitRefFunc: worker.FinalizeRef, + RootDir: opt.RootDir, }) return s, nil } diff --git a/solver/simple.go b/solver/simple.go index aab6e12b0..b68e9cd0f 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -6,6 +6,7 @@ import ( "fmt" "hash" "io" + "path/filepath" "sync" "time" @@ -425,16 +426,18 @@ type diskCache struct { resultGetter workerResultGetter db *bolt.DB bucketName string + rootDir string } type workerResultGetter interface { Get(ctx context.Context, id string) (Result, error) } -func newDiskCache(resultGetter workerResultGetter) (*diskCache, error) { +func newDiskCache(resultGetter workerResultGetter, rootDir string) (*diskCache, error) { c := &diskCache{ bucketName: "ids", resultGetter: resultGetter, + rootDir: rootDir, } err := c.init() if err != nil { @@ -444,8 +447,7 @@ func newDiskCache(resultGetter workerResultGetter) (*diskCache, error) { } func (c *diskCache) init() error { - // TODO: pass in root config directory. - db, err := bolt.Open("/tmp/earthly/buildkit/simple.db", 0755, nil) + db, err := bolt.Open(filepath.Join(c.rootDir, "ids.db"), 0755, nil) if err != nil { return err } From 0e52077c9c37900ec1a3b4e5a0599e5d954d8ec0 Mon Sep 17 00:00:00 2001 From: Mike Date: Wed, 3 Apr 2024 21:36:35 -0700 Subject: [PATCH 06/12] Don't wait on compute digest fail --- solver/simple.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/solver/simple.go b/solver/simple.go index b68e9cd0f..363b34782 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -251,9 +251,10 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V if dep.ComputeDigestFunc != nil { compDigest, err := dep.ComputeDigestFunc(ctx, res, st) if err != nil { - return nil, err + bklog.G(ctx).Warnf("failed to compute digest: %v", err) + } else { + scm.deps[i].computed = compDigest.String() } - scm.deps[i].computed = compDigest.String() } // Add input references to the struct as to link dependencies. From 7f19ba4c02c97df99d9109cde6ff785b298b73cb Mon Sep 17 00:00:00 2001 From: Mike Date: Thu, 4 Apr 2024 12:58:53 -0700 Subject: [PATCH 07/12] Only pass opts to get --- solver/simple.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/solver/simple.go b/solver/simple.go index 363b34782..cf49dea37 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -83,7 +83,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver st.op = op // Add cache opts to context as they will be accessed by cache retrieval. - ctx = withAncestorCacheOpts(ctx, st) + cacheOptsCtx := withAncestorCacheOpts(ctx, st) // CacheMap populates required fields in SourceOp. cm, err := op.CacheMap(ctx, int(e.Index)) @@ -101,7 +101,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, err } - v, ok, err := s.resultCache.get(ctx, cacheKey) + v, ok, err := s.resultCache.get(cacheOptsCtx, cacheKey) if err != nil { return nil, err } From 1aee4b2a70cdc54ba262aca48143a4a6fa333a0c Mon Sep 17 00:00:00 2001 From: Mike Date: Thu, 4 Apr 2024 13:33:27 -0700 Subject: [PATCH 08/12] Revert --- solver/jobs.go | 2 ++ solver/simple.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/solver/jobs.go b/solver/jobs.go index 5116644d1..a156b7b6b 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/davecgh/go-spew/spew" "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" @@ -954,6 +955,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, }() res, err := op.Exec(ctx, s.st, inputs) + spew.Dump(err) complete := true if err != nil { select { diff --git a/solver/simple.go b/solver/simple.go index cf49dea37..363b34782 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -83,7 +83,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver st.op = op // Add cache opts to context as they will be accessed by cache retrieval. - cacheOptsCtx := withAncestorCacheOpts(ctx, st) + ctx = withAncestorCacheOpts(ctx, st) // CacheMap populates required fields in SourceOp. cm, err := op.CacheMap(ctx, int(e.Index)) @@ -101,7 +101,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, err } - v, ok, err := s.resultCache.get(cacheOptsCtx, cacheKey) + v, ok, err := s.resultCache.get(ctx, cacheKey) if err != nil { return nil, err } From 69097fdd10a28c13d19d79c0b0e4032018f07ffd Mon Sep 17 00:00:00 2001 From: Mike Date: Thu, 4 Apr 2024 17:18:44 -0700 Subject: [PATCH 09/12] Fix not found display problem --- solver/jobs.go | 5 +++++ solver/simple.go | 12 +++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/solver/jobs.go b/solver/jobs.go index a156b7b6b..ad9889fa3 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -621,6 +621,11 @@ func (j *Job) CloseProgress() { } func (j *Job) Discard() error { + // TMP: Hack to prevent actives map deletes. + if true { + return nil + } + j.list.mu.Lock() defer j.list.mu.Unlock() diff --git a/solver/simple.go b/solver/simple.go index 363b34782..a93bf15dd 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -93,6 +93,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap) if err != nil { + notifyError(ctx, st, false, err) return nil, err } @@ -107,9 +108,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver } if ok && v != nil { - ctx = progress.WithProgress(ctx, st.mpw) - notifyCompleted := notifyStarted(ctx, &st.clientVertex, true) - notifyCompleted(nil, true) + notifyError(ctx, st, true, nil) return v, nil } @@ -137,6 +136,12 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return res, nil } +func notifyError(ctx context.Context, st *state, cached bool, err error) { + ctx = progress.WithProgress(ctx, st.mpw) + notifyCompleted := notifyStarted(ctx, &st.clientVertex, cached) + notifyCompleted(err, cached) +} + // createState creates a new state struct with required and placeholder values. func (s *simpleSolver) createState(vertex Vertex, job *Job) *state { defaultCache := NewInMemoryCacheManager() @@ -252,6 +257,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V compDigest, err := dep.ComputeDigestFunc(ctx, res, st) if err != nil { bklog.G(ctx).Warnf("failed to compute digest: %v", err) + return nil, err } else { scm.deps[i].computed = compDigest.String() } From 470d3fbecab55458e866e6cca19e160a8fdc92c3 Mon Sep 17 00:00:00 2001 From: Mike Date: Fri, 5 Apr 2024 10:14:30 -0700 Subject: [PATCH 10/12] Remove debug --- solver/jobs.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/solver/jobs.go b/solver/jobs.go index ad9889fa3..9dfb79f1b 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/davecgh/go-spew/spew" "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" @@ -960,7 +959,6 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, }() res, err := op.Exec(ctx, s.st, inputs) - spew.Dump(err) complete := true if err != nil { select { From faff83060c28bab86bc9d00d0f9c54f951606246 Mon Sep 17 00:00:00 2001 From: Mike Date: Fri, 5 Apr 2024 14:31:22 -0700 Subject: [PATCH 11/12] Fixes --no-cache --- solver/simple.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/solver/simple.go b/solver/simple.go index a93bf15dd..425f5c1a8 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -91,13 +91,22 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, err } - inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap) + // By default we generate a cache key that's not salted as the keys need to + // persist across builds. However, when cache is disabled, we scope the keys + // to the current session. This is because some jobs will be duplicated in a + // given build & will need to be cached in a limited way. + salt := "" + if op.IgnoreCache() { + salt = job.SessionID + } + + inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, salt) if err != nil { notifyError(ctx, st, false, err) return nil, err } - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d.String()) + cacheKey, err := s.cacheKeyManager.cacheKey(ctx, salt, d.String()) if err != nil { return nil, err } @@ -207,7 +216,7 @@ func (s *simpleSolver) exploreVertices(e Edge) ([]digest.Digest, map[digest.Dige return ret, vertices } -func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap) ([]Result, error) { +func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap, salt string) ([]Result, error) { // This struct is used to reconstruct a cache key from an LLB digest & all // parents using consistent digests that depend on the full dependency chain. scm := simpleCacheMap{ @@ -220,7 +229,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V for i, in := range vertex.Inputs() { // Compute a cache key given the LLB digest value. - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, in.Vertex.Digest().String()) + cacheKey, err := s.cacheKeyManager.cacheKey(ctx, salt, in.Vertex.Digest().String()) if err != nil { return nil, err } @@ -306,10 +315,12 @@ func (m *cacheKeyManager) add(key string, s *simpleCacheMap) { // cacheKey recursively generates a cache key based on a sequence of ancestor // operations & their cacheable values. -func (m *cacheKeyManager) cacheKey(ctx context.Context, d string) (string, error) { +func (m *cacheKeyManager) cacheKey(ctx context.Context, salt, digest string) (string, error) { h := sha256.New() - err := m.cacheKeyRecurse(ctx, d, h) + io.WriteString(h, salt) + + err := m.cacheKeyRecurse(ctx, digest, h) if err != nil { return "", err } From 636787bfcb14941ef3b1360e057cf64f9051abd0 Mon Sep 17 00:00:00 2001 From: Mike Date: Mon, 8 Apr 2024 17:03:34 -0700 Subject: [PATCH 12/12] Fixes issue with ignore cache breaking cache key --- solver/simple.go | 40 ++++++++++++++++++++++------------------ worker/simple.go | 2 ++ 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/solver/simple.go b/solver/simple.go index 425f5c1a8..3a01a5d55 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -27,7 +27,6 @@ type simpleSolver struct { resolveOpFunc ResolveOpFunc commitRefFunc CommitRefFunc solver *Solver - job *Job parallelGuard *parallelGuard resultCache resultCache cacheKeyManager *cacheKeyManager @@ -91,22 +90,13 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, err } - // By default we generate a cache key that's not salted as the keys need to - // persist across builds. However, when cache is disabled, we scope the keys - // to the current session. This is because some jobs will be duplicated in a - // given build & will need to be cached in a limited way. - salt := "" - if op.IgnoreCache() { - salt = job.SessionID - } - - inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, salt) + inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, job) if err != nil { notifyError(ctx, st, false, err) return nil, err } - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, salt, d.String()) + cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d.String()) if err != nil { return nil, err } @@ -216,7 +206,7 @@ func (s *simpleSolver) exploreVertices(e Edge) ([]digest.Digest, map[digest.Dige return ret, vertices } -func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap, salt string) ([]Result, error) { +func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) { // This struct is used to reconstruct a cache key from an LLB digest & all // parents using consistent digests that depend on the full dependency chain. scm := simpleCacheMap{ @@ -225,11 +215,22 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V inputs: make([]string, len(cm.Deps)), } + // By default we generate a cache key that's not salted as the keys need to + // persist across builds. However, when cache is disabled, we scope the keys + // to the current session. This is because some jobs will be duplicated in a + // given build & will need to be cached in a limited way. + if vertex.Options().IgnoreCache { + scm.salt = job.SessionID + } + var inputs []Result for i, in := range vertex.Inputs() { + + digest := in.Vertex.Digest().String() + // Compute a cache key given the LLB digest value. - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, salt, in.Vertex.Digest().String()) + cacheKey, err := s.cacheKeyManager.cacheKey(ctx, digest) if err != nil { return nil, err } @@ -241,7 +242,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V } if !ok { - return nil, errors.Errorf("cache key not found: %s", cacheKey) + return nil, errors.Errorf("result not found for digest: %s", digest) } dep := cm.Deps[i] @@ -299,6 +300,7 @@ type simpleCacheMap struct { digest string inputs []string deps []cacheMapDep + salt string } func newCacheKeyManager() *cacheKeyManager { @@ -315,11 +317,9 @@ func (m *cacheKeyManager) add(key string, s *simpleCacheMap) { // cacheKey recursively generates a cache key based on a sequence of ancestor // operations & their cacheable values. -func (m *cacheKeyManager) cacheKey(ctx context.Context, salt, digest string) (string, error) { +func (m *cacheKeyManager) cacheKey(ctx context.Context, digest string) (string, error) { h := sha256.New() - io.WriteString(h, salt) - err := m.cacheKeyRecurse(ctx, digest, h) if err != nil { return "", err @@ -336,6 +336,10 @@ func (m *cacheKeyManager) cacheKeyRecurse(ctx context.Context, d string, h hash. return errors.New("missing cache map key") } + if c.salt != "" { + io.WriteString(h, c.salt) + } + for _, in := range c.inputs { err := m.cacheKeyRecurse(ctx, in, h) if err != nil { diff --git a/worker/simple.go b/worker/simple.go index c2ebb6fda..dec48e1d3 100644 --- a/worker/simple.go +++ b/worker/simple.go @@ -5,6 +5,7 @@ import ( "github.com/moby/buildkit/cache" "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/util/bklog" ) // WorkerResultGetter abstracts the work involved in loading a Result from a @@ -33,6 +34,7 @@ func (w *WorkerResultGetter) Get(ctx context.Context, id string) (solver.Result, ref, err := worker.LoadRef(ctx, refID, false) if err != nil { if cache.IsNotFound(err) { + bklog.G(ctx).Warnf("could not load ref from worker: %v", err) return nil, solver.ErrRefNotFound } return nil, err