diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index fd316b9fc..0b0d61498 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -843,6 +843,7 @@ func newController(c *cli.Context, cfg *config.Config, shutdownCh chan struct{}) LeaseManager: w.LeaseManager(), ContentStore: w.ContentStore(), HistoryConfig: cfg.History, + RootDir: cfg.Root, }) } diff --git a/control/control.go b/control/control.go index a9816692d..7b7a158e8 100644 --- a/control/control.go +++ b/control/control.go @@ -68,6 +68,7 @@ type Opt struct { LeaseManager *leaseutil.Manager ContentStore *containerdsnapshot.Store HistoryConfig *config.HistoryConfig + RootDir string } type Controller struct { // TODO: ControlService @@ -105,6 +106,7 @@ func NewController(opt Opt) (*Controller, error) { SessionManager: opt.SessionManager, Entitlements: opt.Entitlements, HistoryQueue: hq, + RootDir: opt.RootDir, }) if err != nil { return nil, errors.Wrap(err, "failed to create solver") diff --git a/solver/jobs.go b/solver/jobs.go index 665ebebb3..9dfb79f1b 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -256,8 +256,11 @@ type Job struct { } type SolverOpt struct { - ResolveOpFunc ResolveOpFunc - DefaultCache CacheManager + ResolveOpFunc ResolveOpFunc + DefaultCache CacheManager + WorkerResultGetter workerResultGetter + CommitRefFunc CommitRefFunc + RootDir string } func NewSolver(opts SolverOpt) *Solver { @@ -274,7 +277,11 @@ func NewSolver(opts SolverOpt) *Solver { // TODO: This should be hoisted up a few layers as not to be bound to the // original solver. For now, we just need a convenient place to initialize // it once. - simple := newSimpleSolver(opts.ResolveOpFunc, jl) + c, err := newDiskCache(opts.WorkerResultGetter, opts.RootDir) + if err != nil { + panic(err) // TODO: Handle error appropriately once the new solver code is moved. 
+ } + simple := newSimpleSolver(opts.ResolveOpFunc, opts.CommitRefFunc, jl, c) jl.simple = simple jl.s = newScheduler(jl) @@ -613,6 +620,11 @@ func (j *Job) CloseProgress() { } func (j *Job) Discard() error { + // TMP: Hack to prevent actives map deletes. + if true { + return nil + } + j.list.mu.Lock() defer j.list.mu.Unlock() diff --git a/solver/llbsolver/provenance.go b/solver/llbsolver/provenance.go index eca2ac14d..ae794ee02 100644 --- a/solver/llbsolver/provenance.go +++ b/solver/llbsolver/provenance.go @@ -269,6 +269,9 @@ func captureProvenance(ctx context.Context, res solver.CachedResultWithProvenanc switch op := pp.(type) { case *ops.SourceOp: id, pin := op.Pin() + if pin == "" { // Hack: latest cache opt changes led to an empty value here. Investigate. + return nil + } err := id.Capture(c, pin) if err != nil { return err diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index ccc3504c0..5cc90aa5f 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -79,6 +79,7 @@ type Opt struct { WorkerController *worker.Controller HistoryQueue *HistoryQueue ResourceMonitor *resources.Monitor + RootDir string } type Solver struct { @@ -119,8 +120,11 @@ func New(opt Opt) (*Solver, error) { s.sysSampler = sampler s.solver = solver.NewSolver(solver.SolverOpt{ - ResolveOpFunc: s.resolver(), - DefaultCache: opt.CacheManager, + ResolveOpFunc: s.resolver(), + DefaultCache: opt.CacheManager, + WorkerResultGetter: worker.NewWorkerResultGetter(opt.WorkerController), + CommitRefFunc: worker.FinalizeRef, + RootDir: opt.RootDir, }) return s, nil } diff --git a/solver/simple.go b/solver/simple.go index ba5719505..3a01a5d55 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -6,31 +6,40 @@ import ( "fmt" "hash" "io" + "path/filepath" "sync" "time" + "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/tracing" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" + bolt 
"go.etcd.io/bbolt" ) +var ErrRefNotFound = errors.New("ref not found") + +// CommitRefFunc can be used to finalize a Result's ImmutableRef. +type CommitRefFunc func(ctx context.Context, result Result) error + type simpleSolver struct { resolveOpFunc ResolveOpFunc + commitRefFunc CommitRefFunc solver *Solver - job *Job parallelGuard *parallelGuard - resultCache *resultCache + resultCache resultCache cacheKeyManager *cacheKeyManager mu sync.Mutex } -func newSimpleSolver(resolveOpFunc ResolveOpFunc, solver *Solver) *simpleSolver { +func newSimpleSolver(resolveOpFunc ResolveOpFunc, commitRefFunc CommitRefFunc, solver *Solver, cache resultCache) *simpleSolver { return &simpleSolver{ cacheKeyManager: newCacheKeyManager(), - resultCache: newResultCache(), + resultCache: cache, parallelGuard: newParallelGuard(time.Millisecond * 100), resolveOpFunc: resolveOpFunc, + commitRefFunc: commitRefFunc, solver: solver, } } @@ -72,14 +81,18 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver // Required to access cache map results on state. st.op = op + // Add cache opts to context as they will be accessed by cache retrieval. + ctx = withAncestorCacheOpts(ctx, st) + // CacheMap populates required fields in SourceOp. 
cm, err := op.CacheMap(ctx, int(e.Index)) if err != nil { return nil, err } - inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap) + inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, job) if err != nil { + notifyError(ctx, st, false, err) return nil, err } @@ -88,10 +101,13 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, err } - if v, ok := s.resultCache.get(cacheKey); ok && v != nil { - ctx = progress.WithProgress(ctx, st.mpw) - notifyCompleted := notifyStarted(ctx, &st.clientVertex, true) - notifyCompleted(nil, true) + v, ok, err := s.resultCache.get(ctx, cacheKey) + if err != nil { + return nil, err + } + + if ok && v != nil { + notifyError(ctx, st, true, nil) return v, nil } @@ -100,13 +116,31 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, err } + // Ensure all results are finalized (committed to cache). It may be better + // to background these calls at some point. + for _, res := range results { + err = s.commitRefFunc(ctx, res) + if err != nil { + return nil, err + } + } + res := results[int(e.Index)] - s.resultCache.set(cacheKey, res) + err = s.resultCache.set(ctx, cacheKey, res) + if err != nil { + return nil, err + } return res, nil } +func notifyError(ctx context.Context, st *state, cached bool, err error) { + ctx = progress.WithProgress(ctx, st.mpw) + notifyCompleted := notifyStarted(ctx, &st.clientVertex, cached) + notifyCompleted(err, cached) +} + // createState creates a new state struct with required and placeholder values. func (s *simpleSolver) createState(vertex Vertex, job *Job) *state { defaultCache := NewInMemoryCacheManager() @@ -134,6 +168,14 @@ func (s *simpleSolver) createState(vertex Vertex, job *Job) *state { st.mpw.Add(job.pw) + // Hack: this is used in combination with withAncestorCacheOpts to pass + // necessary dependency information to a few caching components. We'll need + // to expire these keys somehow. 
We should also move away from using the + // actives map, but it's still being used by withAncestorCacheOpts for now. + s.solver.mu.Lock() + s.solver.actives[vertex.Digest()] = st + s.solver.mu.Unlock() + return st } @@ -164,29 +206,43 @@ func (s *simpleSolver) exploreVertices(e Edge) ([]digest.Digest, map[digest.Dige return ret, vertices } -func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap) ([]Result, error) { +func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) { // This struct is used to reconstruct a cache key from an LLB digest & all // parents using consistent digests that depend on the full dependency chain. - // TODO: handle cm.Opts (CacheOpts)? scm := simpleCacheMap{ digest: cm.Digest.String(), deps: make([]cacheMapDep, len(cm.Deps)), inputs: make([]string, len(cm.Deps)), } + // By default we generate a cache key that's not salted as the keys need to + // persist across builds. However, when cache is disabled, we scope the keys + // to the current session. This is because some jobs will be duplicated in a + // given build & will need to be cached in a limited way. + if vertex.Options().IgnoreCache { + scm.salt = job.SessionID + } + var inputs []Result for i, in := range vertex.Inputs() { + + digest := in.Vertex.Digest().String() + // Compute a cache key given the LLB digest value. - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, in.Vertex.Digest().String()) + cacheKey, err := s.cacheKeyManager.cacheKey(ctx, digest) if err != nil { return nil, err } // Lookup the result for that cache key. 
- res, ok := s.resultCache.get(cacheKey) + res, ok, err := s.resultCache.get(ctx, cacheKey) + if err != nil { + return nil, err + } + if !ok { - return nil, errors.Errorf("cache key not found: %s", cacheKey) + return nil, errors.Errorf("result not found for digest: %s", digest) } dep := cm.Deps[i] @@ -210,9 +266,11 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V if dep.ComputeDigestFunc != nil { compDigest, err := dep.ComputeDigestFunc(ctx, res, st) if err != nil { + bklog.G(ctx).Warnf("failed to compute digest: %v", err) return nil, err + } else { + scm.deps[i].computed = compDigest.String() } - scm.deps[i].computed = compDigest.String() } // Add input references to the struct as to link dependencies. @@ -242,6 +300,7 @@ type simpleCacheMap struct { digest string inputs []string deps []cacheMapDep + salt string } func newCacheKeyManager() *cacheKeyManager { @@ -258,10 +317,10 @@ func (m *cacheKeyManager) add(key string, s *simpleCacheMap) { // cacheKey recursively generates a cache key based on a sequence of ancestor // operations & their cacheable values. -func (m *cacheKeyManager) cacheKey(ctx context.Context, d string) (string, error) { +func (m *cacheKeyManager) cacheKey(ctx context.Context, digest string) (string, error) { h := sha256.New() - err := m.cacheKeyRecurse(ctx, d, h) + err := m.cacheKeyRecurse(ctx, digest, h) if err != nil { return "", err } @@ -277,6 +336,10 @@ func (m *cacheKeyManager) cacheKeyRecurse(ctx context.Context, d string, h hash. 
return errors.New("missing cache map key") } + if c.salt != "" { + io.WriteString(h, c.salt) + } + for _, in := range c.inputs { err := m.cacheKeyRecurse(ctx, in, h) if err != nil { @@ -351,24 +414,114 @@ func (f *parallelGuard) acquire(ctx context.Context, d string) (<-chan struct{}, return ch, closer } -type resultCache struct { +type resultCache interface { + set(ctx context.Context, key string, r Result) error + get(ctx context.Context, key string) (Result, bool, error) +} + +type inMemCache struct { cache map[string]Result mu sync.Mutex } -func newResultCache() *resultCache { - return &resultCache{cache: map[string]Result{}} +func newInMemCache() *inMemCache { + return &inMemCache{cache: map[string]Result{}} } -func (c *resultCache) set(key string, r Result) { +func (c *inMemCache) set(ctx context.Context, key string, r Result) error { c.mu.Lock() c.cache[key] = r c.mu.Unlock() + return nil } -func (c *resultCache) get(key string) (Result, bool) { +func (c *inMemCache) get(ctx context.Context, key string) (Result, bool, error) { c.mu.Lock() r, ok := c.cache[key] c.mu.Unlock() - return r, ok + return r, ok, nil } + +var _ resultCache = &inMemCache{} + +type diskCache struct { + resultGetter workerResultGetter + db *bolt.DB + bucketName string + rootDir string +} + +type workerResultGetter interface { + Get(ctx context.Context, id string) (Result, error) +} + +func newDiskCache(resultGetter workerResultGetter, rootDir string) (*diskCache, error) { + c := &diskCache{ + bucketName: "ids", + resultGetter: resultGetter, + rootDir: rootDir, + } + err := c.init() + if err != nil { + return nil, err + } + return c, nil +} + +func (c *diskCache) init() error { + db, err := bolt.Open(filepath.Join(c.rootDir, "ids.db"), 0755, nil) + if err != nil { + return err + } + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte("ids")) + return err + }) + if err != nil { + return err + } + c.db = db + return nil +} + +func (c *diskCache) set(ctx 
context.Context, key string, r Result) error { + return c.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(c.bucketName)) + return b.Put([]byte(key), []byte(r.ID())) + }) +} + +func (c *diskCache) get(ctx context.Context, key string) (Result, bool, error) { + var id string + err := c.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(c.bucketName)) + id = string(b.Get([]byte(key))) + return nil + }) + if err != nil { + return nil, false, err + } + if id == "" { + return nil, false, nil + } + res, err := c.resultGetter.Get(ctx, id) + if err != nil { + if errors.Is(err, ErrRefNotFound) { + if err := c.delete(ctx, key); err != nil { + bklog.G(ctx).Warnf("failed to delete cache key: %v", err) + } + return nil, false, nil + } + return nil, false, err + } + return res, true, nil +} + +func (c *diskCache) delete(_ context.Context, key string) error { + return c.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(c.bucketName)) + return b.Delete([]byte(key)) + }) +} + +var _ resultCache = &diskCache{} diff --git a/worker/simple.go b/worker/simple.go new file mode 100644 index 000000000..dec48e1d3 --- /dev/null +++ b/worker/simple.go @@ -0,0 +1,58 @@ +package worker + +import ( + "context" + + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/util/bklog" +) + +// WorkerResultGetter abstracts the work involved in loading a Result from a +// worker using a ref ID. +type WorkerResultGetter struct { + wc *Controller +} + +// NewWorkerResultGetter creates and returns a new *WorkerResultGetter. +func NewWorkerResultGetter(wc *Controller) *WorkerResultGetter { + return &WorkerResultGetter{wc: wc} +} + +// Get returns a cached result from a worker. 
+func (w *WorkerResultGetter) Get(ctx context.Context, id string) (solver.Result, error) { + workerID, refID, err := parseWorkerRef(id) + if err != nil { + return nil, err + } + + worker, err := w.wc.Get(workerID) + if err != nil { + return nil, err + } + + ref, err := worker.LoadRef(ctx, refID, false) + if err != nil { + if cache.IsNotFound(err) { + bklog.G(ctx).Warnf("could not load ref from worker: %v", err) + return nil, solver.ErrRefNotFound + } + return nil, err + } + + return NewWorkerRefResult(ref, worker), nil +} + +// FinalizeRef is a convenience function that calls Finalize on a Result's +// ImmutableRef. The 'worker' package cannot be imported by 'solver' due to an +// import cycle, so this function is passed in with solver.SolverOpt. +func FinalizeRef(ctx context.Context, res solver.Result) error { + sys := res.Sys() + if w, ok := sys.(*WorkerRef); ok { + err := w.ImmutableRef.Finalize(ctx) + if err != nil { + return err + } + } + return nil +}