Skip to content

Commit

Permalink
Release & cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mikejholly committed May 22, 2024
1 parent dccc5a2 commit 9ed8778
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 46 deletions.
14 changes: 7 additions & 7 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ type Job struct {
}

type SolverOpt struct {
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
ResultSource ResultSource
RefIDStore *RefIDStore
CommitRefFunc CommitRefFunc
IsSourceOpFunc IsSourceOpFunc
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
ResultSource ResultSource
RefIDStore *RefIDStore
CommitRefFunc CommitRefFunc
IsRunOnceFunc IsRunOnceFunc
}

func NewSolver(opts SolverOpt) *Solver {
Expand All @@ -281,7 +281,7 @@ func NewSolver(opts SolverOpt) *Solver {
solver,
opts.RefIDStore,
opts.ResultSource,
opts.IsSourceOpFunc,
opts.IsRunOnceFunc,
)
solver.simple = simple

Expand Down
6 changes: 3 additions & 3 deletions solver/llbsolver/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"github.com/moby/buildkit/solver/llbsolver/ops"
)

// isSourceOp returns a function that can be called to determine if a Vertex
// contains an *ops.SourceOp.
func (s *Solver) isSourceOp() solver.IsSourceOpFunc {
// isRunOnceOp returns a function that can be called to determine if a Vertex
// contains an operation that must be run at least once per build.
func (s *Solver) isRunOnceOp() solver.IsRunOnceFunc {
return func(v solver.Vertex, b solver.Builder) (bool, error) {
w, err := s.resolveWorker()
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ func New(opt Opt) (*Solver, error) {
)

s.solver = solver.NewSolver(solver.SolverOpt{
ResolveOpFunc: s.resolver(),
IsSourceOpFunc: s.isSourceOp(),
DefaultCache: opt.CacheManager,
ResultSource: sources,
CommitRefFunc: worker.FinalizeRef,
RefIDStore: refIDStore,
ResolveOpFunc: s.resolver(),
IsRunOnceFunc: s.isRunOnceOp(),
DefaultCache: opt.CacheManager,
ResultSource: sources,
CommitRefFunc: worker.FinalizeRef,
RefIDStore: refIDStore,
})
return s, nil
}
Expand Down
80 changes: 50 additions & 30 deletions solver/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,52 @@ import (
// CommitRefFunc can be used to finalize a Result's ImmutableRef.
type CommitRefFunc func(ctx context.Context, result Result) error

// IsSourceOpFunc determines if the vertex represents a source op.
type IsSourceOpFunc func(Vertex, Builder) (bool, error)
// IsRunOnceFunc determines if the vertex represents an operation that needs to
// be run at least once.
type IsRunOnceFunc func(Vertex, Builder) (bool, error)

// ResultSource can be any source (local or remote) that allows one to load a
// Result using a cache key digest.
type ResultSource interface {
Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error)
}

type sourceOpCtrl struct {
type runOnceCtrl struct {
lru *simplelru.LRU
mu sync.Mutex
}

func newSourceOpCtrl() *sourceOpCtrl {
func newRunOnceCtrl() *runOnceCtrl {
lru, _ := simplelru.NewLRU(1e3, nil) // Error impossible on positive first argument.
return &sourceOpCtrl{lru: lru}
return &runOnceCtrl{lru: lru}
}

func (s *sourceOpCtrl) processed(d digest.Digest, sessionID string) bool {
// hasRun reports whether the operation identified by digest d has already been
// recorded for the session sessionID, and records it if it has not.
//
// The first call for a given (sessionID, d) pair returns false, which tells the
// caller that the operation still has to be executed; subsequent calls return
// true for as long as the entry remains in the LRU. An LRU cache is used to
// keep memory bounded: operations may be re-run if the LRU size is exceeded,
// but this shouldn't have a big impact on the build. The trade-off is
// worthwhile given the memory-friendliness of LRUs.
func (s *runOnceCtrl) hasRun(d digest.Digest, sessionID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Entries are scoped to a session so that each build sees the operation
	// as not-yet-run the first time it asks.
	key := fmt.Sprintf("%s:%s", sessionID, d)
	seen := s.lru.Contains(key)

	// Record the key whether or not it was already present.
	s.lru.Add(key, struct{}{})

	return seen
}

type simpleSolver struct {
resolveOpFunc ResolveOpFunc
isSourceOpFunc IsSourceOpFunc
isRunOnceFunc IsRunOnceFunc
commitRefFunc CommitRefFunc
solver *Solver
parallelGuard *parallelGuard
refIDStore *RefIDStore
resultSource ResultSource
cacheKeyManager *cacheKeyManager
sourceOpCtrl *sourceOpCtrl
runOnceCtrl *runOnceCtrl
}

func newSimpleSolver(
Expand All @@ -73,7 +76,7 @@ func newSimpleSolver(
solver *Solver,
refIDStore *RefIDStore,
resultSource ResultSource,
isSourceOpFunc IsSourceOpFunc,
isRunOnceFunc IsRunOnceFunc,
) *simpleSolver {
return &simpleSolver{
cacheKeyManager: newCacheKeyManager(),
Expand All @@ -83,8 +86,8 @@ func newSimpleSolver(
solver: solver,
refIDStore: refIDStore,
resultSource: resultSource,
isSourceOpFunc: isSourceOpFunc,
sourceOpCtrl: newSourceOpCtrl(),
isRunOnceFunc: isRunOnceFunc,
runOnceCtrl: newRunOnceCtrl(),
}
}

Expand All @@ -107,6 +110,14 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
return nil, err
}

// Release the previous result as this is not the final return value.
if ret != nil {
err := ret.Release(ctx)
if err != nil {
return nil, err
}
}

ret = res

// Hijack the CacheKey type in order to export a reference from the new cache key to the ref ID.
Expand All @@ -119,6 +130,11 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
})
}

err := s.commitRefFunc(ctx, ret)
if err != nil {
return nil, err
}

return NewCachedResult(ret, expKeys), nil
}

Expand Down Expand Up @@ -153,19 +169,16 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver
defer done()
<-wait

isSourceOp, err := s.isSourceOpFunc(vertex, job)
isRunOnce, err := s.isRunOnceFunc(vertex, job)
if err != nil {
return nil, "", err
}

// It essential to execute source operations in order to detect content
// change, but these operations should only be run once per build. Here, we
// use an LRU cache to whether we need to execute the source operation for
// this job. The jobs may be re-run if the LRU size is exceeded, but this
// shouldn't have a big impact on the build.
canLoadCache := !isSourceOp || isSourceOp && s.sourceOpCtrl.processed(cacheKey, job.SessionID)
// Special case for source operations. They need to be run once per build or
// content changes will not be reliably detected.
mayLoadCache := !isRunOnce || isRunOnce && s.runOnceCtrl.hasRun(cacheKey, job.SessionID)

if canLoadCache {
if mayLoadCache {
v, ok, err := s.resultSource.Load(ctx, cacheKey)
if err != nil {
return nil, "", err
Expand All @@ -182,17 +195,17 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver
return nil, "", err
}

// Ensure all results are finalized (committed to cache). It may be better
// to background these calls at some point.
for _, res := range results {
err = s.commitRefFunc(ctx, res)
if err != nil {
return nil, "", err
res := results[int(e.Index)]

for i := range results {
if i != int(e.Index) {
err = results[i].Release(ctx)
if err != nil {
return nil, "", err
}
}
}

res := results[int(e.Index)]

err = s.refIDStore.Set(ctx, cacheKey, res.ID())
if err != nil {
return nil, "", err
Expand Down Expand Up @@ -344,7 +357,6 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V
// operation.
if dep.ComputeDigestFunc != nil {
compDigest, err := dep.ComputeDigestFunc(ctx, res, st)
fmt.Println("slow cache digest", compDigest)
if err != nil {
bklog.G(ctx).Warnf("failed to compute digest: %v", err)
return nil, err
Expand All @@ -353,6 +365,14 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V
}
}

// The result can be released now that the preprocess & slow cache
// digest functions have been run. This is crucial as failing to do so
// will lead to full file copying from mutable snapshots.
err = res.Release(ctx)
if err != nil {
return nil, err
}

// Add input references to the struct as to link dependencies.
scm.inputs[i] = in.Vertex.Digest()

Expand Down

0 comments on commit 9ed8778

Please sign in to comment.