Skip to content

Commit

Permalink
Lock ops based on computed key rather than LLB digest
Browse files Browse the repository at this point in the history
  • Loading branch information
mikejholly committed May 22, 2024
1 parent d4e630c commit 6e287a4
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 14 deletions.
4 changes: 3 additions & 1 deletion solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

// ResolveOpFunc finds an Op implementation for a Vertex.
// It is handed the Vertex together with a Builder and returns either the
// resolved Op or an error.
type ResolveOpFunc func(Vertex, Builder) (Op, error)

type Builder interface {
Expand Down Expand Up @@ -261,6 +261,7 @@ type SolverOpt struct {
ResultSource ResultSource
RefIDStore *RefIDStore
CommitRefFunc CommitRefFunc
IsRunOnceFunc IsRunOnceFunc
}

func NewSolver(opts SolverOpt) *Solver {
Expand All @@ -280,6 +281,7 @@ func NewSolver(opts SolverOpt) *Solver {
solver,
opts.RefIDStore,
opts.ResultSource,
opts.IsRunOnceFunc,
)
solver.simple = simple

Expand Down
29 changes: 29 additions & 0 deletions solver/llbsolver/simple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package llbsolver

import (
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver/ops"
)

// isRunOnceOp builds the solver.IsRunOnceFunc for this Solver. The returned
// function resolves the Vertex to its Op through the worker and reports true
// only when that Op is a source operation (*ops.SourceOp). An error from
// resolving the worker or the Op is passed back with a false result.
func (s *Solver) isRunOnceOp() solver.IsRunOnceFunc {
	check := func(v solver.Vertex, b solver.Builder) (bool, error) {
		wkr, err := s.resolveWorker()
		if err != nil {
			return false, err
		}

		resolved, err := wkr.ResolveOp(v, s.Bridge(b), s.sm)
		if err != nil {
			return false, err
		}

		// Only source operations are treated as run-once.
		_, isSource := resolved.(*ops.SourceOp)
		return isSource, nil
	}
	return check
}
1 change: 1 addition & 0 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func New(opt Opt) (*Solver, error) {

s.solver = solver.NewSolver(solver.SolverOpt{
ResolveOpFunc: s.resolver(),
IsRunOnceFunc: s.isRunOnceOp(),
DefaultCache: opt.CacheManager,
ResultSource: sources,
CommitRefFunc: worker.FinalizeRef,
Expand Down
93 changes: 80 additions & 13 deletions solver/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/hashicorp/golang-lru/simplelru"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/tracing"
Expand All @@ -21,21 +22,52 @@ import (
// CommitRefFunc can be used to finalize a Result's ImmutableRef.
// It receives the context and the Result to finalize and reports failure
// through its error return.
type CommitRefFunc func(ctx context.Context, result Result) error

// IsRunOnceFunc determines if the vertex represents an operation that needs to
// be run at least once.
// The Builder is supplied alongside the Vertex; the boolean answer should only
// be used when the returned error is nil.
type IsRunOnceFunc func(Vertex, Builder) (bool, error)

// ResultSource can be any source (local or remote) that allows one to load a
// Result using a cache key digest.
type ResultSource interface {
// Load looks up the Result stored under cacheKey.
// NOTE(review): the bool presumably reports whether a Result was found; the
// caller in this file only uses the Result when the bool is true and the
// Result is non-nil. Confirm against the implementations.
Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error)
}

// runOnceCtrl remembers which session/digest pairs have already been passed to
// hasRun, backed by an LRU.
type runOnceCtrl struct {
// lru holds the "sessionID:digest" keys that hasRun adds.
lru *simplelru.LRU
// mu is held by hasRun while it reads and updates lru.
mu sync.Mutex
}

// newRunOnceCtrl returns a runOnceCtrl whose LRU is created with a size of
// 1000 entries and no eviction callback.
func newRunOnceCtrl() *runOnceCtrl {
	const maxKeys = 1e3

	// The constructor error is discarded: it is impossible for a positive size.
	cache, _ := simplelru.NewLRU(maxKeys, nil)

	ctrl := new(runOnceCtrl)
	ctrl.lru = cache
	return ctrl
}

// hasRun reports whether the digest was already recorded for the session, and
// records it before returning, so a later call with the same pair sees it.
//
// An LRU cache is used to track whether the source operation for this job
// still needs to be executed. Jobs may be re-run if the LRU size is exceeded,
// but this shouldn't have a big impact on the build. The trade-off is
// worthwhile given the memory-friendliness of LRUs.
func (c *runOnceCtrl) hasRun(d digest.Digest, sessionID string) bool {
	// The key does not depend on shared state, so build it before locking.
	entry := fmt.Sprintf("%s:%s", sessionID, d)

	c.mu.Lock()
	defer c.mu.Unlock()

	seen := c.lru.Contains(entry)
	c.lru.Add(entry, struct{}{})
	return seen
}

// simpleSolver bundles the callbacks and stores that its build, buildOne and
// preprocessInputs methods work with.
type simpleSolver struct {
resolveOpFunc ResolveOpFunc
// isRunOnceFunc is called by buildOne with the vertex and job; its answer
// decides whether a cached result may be loaded before executing the op.
isRunOnceFunc IsRunOnceFunc
// commitRefFunc is called to finalize results.
commitRefFunc CommitRefFunc
solver *Solver
// NOTE(review): parallelGuard presumably serializes work on the same key;
// its use is not visible in this view. Confirm.
parallelGuard *parallelGuard
// refIDStore receives the cache key to result ID mapping after an op runs.
refIDStore *RefIDStore
// resultSource is queried by cache key for a previously stored Result.
resultSource ResultSource
cacheKeyManager *cacheKeyManager
mu sync.Mutex
// runOnceCtrl is consulted via hasRun with the cache key and the job's
// session ID for vertices that isRunOnceFunc reports as run-once.
runOnceCtrl *runOnceCtrl
}

func newSimpleSolver(
Expand All @@ -44,6 +76,7 @@ func newSimpleSolver(
solver *Solver,
refIDStore *RefIDStore,
resultSource ResultSource,
isRunOnceFunc IsRunOnceFunc,
) *simpleSolver {
return &simpleSolver{
cacheKeyManager: newCacheKeyManager(),
Expand All @@ -53,6 +86,8 @@ func newSimpleSolver(
solver: solver,
refIDStore: refIDStore,
resultSource: resultSource,
isRunOnceFunc: isRunOnceFunc,
runOnceCtrl: newRunOnceCtrl(),
}
}

Expand All @@ -75,6 +110,14 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
return nil, err
}

// Release previous result as this is not the final return value.
if ret != nil {
err := ret.Release(ctx)
if err != nil {
return nil, err
}
}

ret = res

// Hijack the CacheKey type in order to export a reference from the new cache key to the ref ID.
Expand All @@ -87,6 +130,11 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
})
}

err := s.commitRefFunc(ctx, ret)
if err != nil {
return nil, err
}

return NewCachedResult(ret, expKeys), nil
}

Expand Down Expand Up @@ -121,32 +169,43 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver
defer done()
<-wait

v, ok, err := s.resultSource.Load(ctx, cacheKey)
isRunOnce, err := s.isRunOnceFunc(vertex, job)
if err != nil {
return nil, "", err
}

if ok && v != nil {
notifyError(ctx, st, true, nil)
return v, cacheKey, nil
// Special case for source operations. They need to be run once per build or
// content changes will not be reliably detected.
mayLoadCache := !isRunOnce || isRunOnce && s.runOnceCtrl.hasRun(cacheKey, job.SessionID)

if mayLoadCache {
v, ok, err := s.resultSource.Load(ctx, cacheKey)
if err != nil {
return nil, "", err
}

if ok && v != nil {
notifyError(ctx, st, true, nil)
return v, cacheKey, nil
}
}

results, _, err := st.op.Exec(ctx, inputs)
if err != nil {
return nil, "", err
}

// Ensure all results are finalized (committed to cache). It may be better
// to background these calls at some point.
for _, res := range results {
err = s.commitRefFunc(ctx, res)
if err != nil {
return nil, "", err
res := results[int(e.Index)]

for i := range results {
if i != int(e.Index) {
err = results[i].Release(ctx)
if err != nil {
return nil, "", err
}
}
}

res := results[int(e.Index)]

err = s.refIDStore.Set(ctx, cacheKey, res.ID())
if err != nil {
return nil, "", err
Expand Down Expand Up @@ -306,6 +365,14 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V
}
}

// The result can be released now that the preprocess & slow cache
// digest functions have been run. This is crucial as failing to do so
// will lead to full file copying from mutable snapshots.
err = res.Release(ctx)
if err != nil {
return nil, err
}

// Add input references to the struct as to link dependencies.
scm.inputs[i] = in.Vertex.Digest()

Expand Down

0 comments on commit 6e287a4

Please sign in to comment.