Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune ref IDs and cache key digests #63

Merged
merged 1 commit into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ type SolverOpt struct {
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
ResultSource ResultSource
RefIDStore *RefIDStore
CommitRefFunc CommitRefFunc
IsRunOnceFunc IsRunOnceFunc
}
Expand All @@ -279,7 +278,6 @@ func NewSolver(opts SolverOpt) *Solver {
opts.ResolveOpFunc,
opts.CommitRefFunc,
solver,
opts.RefIDStore,
opts.ResultSource,
opts.IsRunOnceFunc,
)
Expand Down
5 changes: 2 additions & 3 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ func New(opt Opt) (*Solver, error) {
}
s.sysSampler = sampler

refIDStore, err := solver.NewRefIDStore(opt.RootDir)
workerSource, err := worker.NewWorkerResultSource(opt.WorkerController, opt.RootDir)
if err != nil {
return nil, err
}

sources := worker.NewCombinedResultSource(
worker.NewWorkerResultSource(opt.WorkerController, refIDStore),
workerSource,
remoteSource,
)

Expand All @@ -148,7 +148,6 @@ func New(opt Opt) (*Solver, error) {
DefaultCache: opt.CacheManager,
ResultSource: sources,
CommitRefFunc: worker.FinalizeRef,
RefIDStore: refIDStore,
})
return s, nil
}
Expand Down
124 changes: 25 additions & 99 deletions solver/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"hash"
"io"
"path/filepath"
"sync"
"time"

Expand All @@ -16,7 +15,6 @@ import (
"github.com/moby/buildkit/util/tracing"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)

const (
Expand All @@ -35,6 +33,7 @@ type IsRunOnceFunc func(Vertex, Builder) (bool, error)
// Result using a cache key digest.
type ResultSource interface {
Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error)
Link(ctx context.Context, cacheKey digest.Digest, refID string) error
}

// runOnceCtrl is a simple wrapper around an LRU cache. It's used to ensure that
Expand Down Expand Up @@ -67,35 +66,30 @@ func (s *runOnceCtrl) hasRun(d digest.Digest, sessionID string) bool {
}

type simpleSolver struct {
resolveOpFunc ResolveOpFunc
isRunOnceFunc IsRunOnceFunc
commitRefFunc CommitRefFunc
solver *Solver
parallelGuard *parallelGuard
refIDStore *RefIDStore
resultSource ResultSource
cacheKeyManager *cacheKeyManager
runOnceCtrl *runOnceCtrl
resolveOpFunc ResolveOpFunc
isRunOnceFunc IsRunOnceFunc
commitRefFunc CommitRefFunc
solver *Solver
parallelGuard *parallelGuard
resultSource ResultSource
runOnceCtrl *runOnceCtrl
}

func newSimpleSolver(
resolveOpFunc ResolveOpFunc,
commitRefFunc CommitRefFunc,
solver *Solver,
refIDStore *RefIDStore,
resultSource ResultSource,
isRunOnceFunc IsRunOnceFunc,
) *simpleSolver {
return &simpleSolver{
cacheKeyManager: newCacheKeyManager(),
parallelGuard: newParallelGuard(parallelGuardWait),
resolveOpFunc: resolveOpFunc,
commitRefFunc: commitRefFunc,
solver: solver,
refIDStore: refIDStore,
resultSource: resultSource,
isRunOnceFunc: isRunOnceFunc,
runOnceCtrl: newRunOnceCtrl(),
parallelGuard: newParallelGuard(parallelGuardWait),
resolveOpFunc: resolveOpFunc,
commitRefFunc: commitRefFunc,
solver: solver,
resultSource: resultSource,
isRunOnceFunc: isRunOnceFunc,
runOnceCtrl: newRunOnceCtrl(),
}
}

Expand All @@ -107,13 +101,15 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
var ret Result
var expKeys []ExportableCacheKey

runCacheMan := newCacheKeyManager()

for _, d := range digests {
vertex, ok := vertices[d]
if !ok {
return nil, errors.Errorf("digest %s not found", d)
}

res, cacheKey, err := s.buildOne(ctx, d, vertex, job, e)
res, cacheKey, err := s.buildOne(ctx, runCacheMan, d, vertex, job, e)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -146,7 +142,7 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
return NewCachedResult(ret, expKeys), nil
}

func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) {
func (s *simpleSolver) buildOne(ctx context.Context, runCacheMan *cacheKeyManager, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) {
st := s.state(vertex, job)

// Add cache opts to context as they will be accessed by cache retrieval.
Expand All @@ -158,13 +154,13 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver
return nil, "", err
}

inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, job)
inputs, err := s.preprocessInputs(ctx, runCacheMan, st, vertex, cm.CacheMap, job)
if err != nil {
notifyError(ctx, st, false, err)
return nil, "", err
}

cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d)
cacheKey, err := runCacheMan.cacheKey(ctx, d)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -214,7 +210,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver
}
}

err = s.refIDStore.Set(ctx, cacheKey, res.ID())
err = s.resultSource.Link(ctx, cacheKey, res.ID())
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -306,7 +302,7 @@ func (s *simpleSolver) exploreVertices(e Edge) ([]digest.Digest, map[digest.Dige
return ret, vertices
}

func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) {
func (s *simpleSolver) preprocessInputs(ctx context.Context, runCacheMan *cacheKeyManager, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) {
// This struct is used to reconstruct a cache key from an LLB digest & all
// parents using consistent digests that depend on the full dependency chain.
scm := simpleCacheMap{
Expand All @@ -330,7 +326,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V
d := in.Vertex.Digest()

// Compute a cache key given the LLB digest value.
cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d)
cacheKey, err := runCacheMan.cacheKey(ctx, d)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -390,7 +386,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V
inputs = append(inputs, res)
}

s.cacheKeyManager.add(vertex.Digest(), &scm)
runCacheMan.add(vertex.Digest(), &scm)

return inputs, nil
}
Expand Down Expand Up @@ -523,76 +519,6 @@ func (f *parallelGuard) acquire(ctx context.Context, d digest.Digest) (<-chan st
return ch, closer
}

// RefIDStore uses a BoltDB database to store links from computed cache keys to
// worker ref IDs.
type RefIDStore struct {
	db         *bolt.DB // opened by init(); backing file is <rootDir>/ids.db
	bucketName string   // name of the single bucket holding cacheKey -> refID pairs
	rootDir    string   // directory in which the BoltDB file is created
}

// NewRefIDStore creates and returns a new store and initializes a BoltDB
// instance in the specified root directory.
func NewRefIDStore(rootDir string) (*RefIDStore, error) {
	store := &RefIDStore{
		bucketName: "ids",
		rootDir:    rootDir,
	}
	if err := store.init(); err != nil {
		return nil, err
	}
	return store, nil
}

// init opens (creating if necessary) the BoltDB file "ids.db" under the
// configured root directory and ensures the bucket named by r.bucketName
// exists. It must succeed before Set/Get/delete are used.
func (r *RefIDStore) init() error {
	// NOTE(review): 0755 sets execute bits on a plain data file; BoltDB
	// examples use 0600 — confirm no other process needs to read this file
	// before tightening the mode.
	db, err := bolt.Open(filepath.Join(r.rootDir, "ids.db"), 0755, nil)
	if err != nil {
		return err
	}
	// Create the bucket under r.bucketName (rather than a duplicated "ids"
	// literal) so the bucket created here always matches the one that
	// Set/Get/delete address.
	err = db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte(r.bucketName))
		return err
	})
	if err != nil {
		// Don't leak the open file handle when bucket creation fails.
		db.Close()
		return err
	}
	r.db = db
	return nil
}

// Set a cache key digest to the value of the worker ref ID.
func (r *RefIDStore) Set(ctx context.Context, key digest.Digest, id string) error {
	put := func(tx *bolt.Tx) error {
		return tx.Bucket([]byte(r.bucketName)).Put([]byte(key), []byte(id))
	}
	return r.db.Update(put)
}

// Get a worker ref ID given a cache key digest. The boolean result reports
// whether a non-empty ID was found.
func (r *RefIDStore) Get(ctx context.Context, cacheKey digest.Digest) (string, bool, error) {
	var id string
	view := func(tx *bolt.Tx) error {
		// string() copies the value, which is only valid for the life of
		// the transaction.
		id = string(tx.Bucket([]byte(r.bucketName)).Get([]byte(cacheKey)))
		return nil
	}
	if err := r.db.View(view); err != nil {
		return "", false, err
	}
	if id == "" {
		// A missing key and a stored empty value are both reported as
		// not found.
		return "", false, nil
	}
	return id, true, nil
}

// delete removes the entry stored under key, if any; deleting a missing key
// is a no-op.
func (r *RefIDStore) delete(_ context.Context, key string) error {
	return r.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(r.bucketName))
		return bucket.Delete([]byte(key))
	})
}

// newDigest wraps an already-computed SHA-256 hex encoding in a
// digest.Digest value.
func newDigest(encoded string) digest.Digest {
	return digest.NewDigestFromEncoded(digest.SHA256, encoded)
}
Loading
Loading