Enable job discard, exporter stub, context cancel bug #56

Merged · 18 commits · Apr 16, 2024
5 changes: 4 additions & 1 deletion executor/runcexecutor/executor.go
@@ -323,6 +323,8 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}
}

statsCtx, statsCancel := context.WithCancel(ctx)

trace.SpanFromContext(ctx).AddEvent("Container created")
err = w.run(ctx, id, bundle, process, func() {
startedOnce.Do(func() {
@@ -331,7 +333,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
close(started)
}
if process.StatsStream != nil {
go w.monitorContainerStats(ctx, id, w.sampleFrequency, process.StatsStream) // earthly-specific
go w.monitorContainerStats(statsCtx, id, w.sampleFrequency, process.StatsStream) // earthly-specific
}
if rec != nil {
rec.Start()
@@ -340,6 +342,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}, true)

releaseContainer := func(ctx context.Context) error {
statsCancel()
err := w.runc.Delete(ctx, id, &runc.DeleteOpts{})
err1 := namespace.Close()
if err == nil {
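The hunk above shows only fragments of Run and releaseContainer, so here is a minimal, self-contained sketch of the lifecycle pattern it introduces: the stats goroutine runs on a context derived from the run context, and container release cancels that context so the goroutine cannot outlive the container. Names such as monitorStats and releaseContainer are illustrative stand-ins, not the real BuildKit/Earthly APIs.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// monitorStats samples on a fixed interval until its context is cancelled.
func monitorStats(ctx context.Context, id string, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stats monitor stopped:", ctx.Err())
			return
		case <-ticker.C:
			fmt.Println("sampling stats for", id)
		}
	}
}

func main() {
	runCtx := context.Background()

	// Derive a dedicated context so the goroutine's lifetime is tied to
	// container cleanup rather than to the parent run context.
	statsCtx, statsCancel := context.WithCancel(runCtx)
	go monitorStats(statsCtx, "container-1", 100*time.Millisecond)

	time.Sleep(300 * time.Millisecond) // the container "runs" for a while

	releaseContainer := func() error {
		statsCancel() // stop stats collection before deleting the container
		return nil
	}
	_ = releaseContainer()

	time.Sleep(50 * time.Millisecond) // let the goroutine observe cancellation
}
```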
53 changes: 31 additions & 22 deletions executor/runcexecutor/monitor_stats.go
@@ -46,33 +46,42 @@ func writeStatsToStream(w io.Writer, stats *runc.Stats) error {

func (w *runcExecutor) monitorContainerStats(ctx context.Context, id string, sampleFrequency time.Duration, statsWriter io.WriteCloser) {
numFailuresAllowed := 10
for {
// sleep at the top of the loop to give it time to start
time.Sleep(sampleFrequency)

stats, err := w.runc.Stats(ctx, id)
if err != nil {
if errors.Is(err, context.Canceled) {
timer := time.NewTimer(sampleFrequency)
defer timer.Stop()

for {
select {
case <-ctx.Done():
[Inline review comment from a Contributor: nice addition!]

bklog.G(ctx).Infof("stats collection context done: %v", ctx.Err())
return
case <-timer.C: // Initial sleep will give container the chance to start.
stats, err := w.runc.Stats(ctx, id)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
if numFailuresAllowed > 0 {
// Allow the initial calls to runc.Stats to fail, for cases
// where the program didn't start within the initial
// sampleFrequency; this should only occur under heavy
// workloads.
bklog.G(ctx).Warnf("ignoring runc stats collection error: %s", err)
numFailuresAllowed--
continue
}
bklog.G(ctx).Errorf("runc stats collection error: %s", err)
return
}
if numFailuresAllowed > 0 {
// allow the initial calls to runc.Stats to fail, for cases where the program didn't start within the initial
// sampleFrequency; this should only occur under heavy workloads
bklog.G(ctx).Warnf("ignoring runc stats collection error: %s", err)
numFailuresAllowed--
continue
}
bklog.G(ctx).Errorf("runc stats collection error: %s", err)
return
}

// once runc.Stats has succeeded, don't ignore future errors
numFailuresAllowed = 0
// Once runc.Stats has succeeded, don't ignore future errors.
numFailuresAllowed = 0

err = writeStatsToStream(statsWriter, stats)
if err != nil {
bklog.G(ctx).Errorf("failed to send runc stats to client-stream: %s", err)
return
err = writeStatsToStream(statsWriter, stats)
if err != nil {
bklog.G(ctx).Errorf("failed to send runc stats to client-stream: %s", err)
return
}
}
}
}
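For context, a compact sketch of the polling shape used in monitorContainerStats: tolerate a fixed number of early failures (the workload may not have started within the first sample interval), stop ignoring errors after the first success, and exit promptly when the context is cancelled. fetchStats is a stand-in for runc.Stats; a Ticker is used here so no manual timer Reset is needed between samples.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// poll samples via fetch on a fixed interval, allowing a budget of early failures.
func poll(ctx context.Context, every time.Duration, fetch func() (string, error)) {
	failuresAllowed := 10
	ticker := time.NewTicker(every)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			fmt.Println("polling stopped:", ctx.Err())
			return
		case <-ticker.C:
			v, err := fetch()
			if err != nil {
				if failuresAllowed > 0 {
					failuresAllowed--
					continue // the workload may simply not have started yet
				}
				fmt.Println("giving up:", err)
				return
			}
			failuresAllowed = 0 // after one success, further errors are fatal
			fmt.Println("sample:", v)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	calls := 0
	fetchStats := func() (string, error) {
		calls++
		if calls < 3 {
			return "", errors.New("container not started")
		}
		return fmt.Sprintf("sample #%d", calls), nil
	}
	poll(ctx, 100*time.Millisecond, fetchStats)
}
```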
5 changes: 0 additions & 5 deletions solver/jobs.go
@@ -620,11 +620,6 @@ func (j *Job) CloseProgress() {
}

func (j *Job) Discard() error {
// TMP: Hack to prevent actives map deletes.
if true {
return nil
}

j.list.mu.Lock()
defer j.list.mu.Unlock()

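Removing the early return re-enables the discard path that prunes the actives map. As a hedged illustration only (simplified types, not BuildKit's actual Job.Discard body), the general shape is: drop the discarded job's reference from every active state and delete states that no job references anymore.

```go
package main

import (
	"fmt"
	"sync"
)

type job struct{ name string }

type state struct{ jobs map[*job]struct{} }

type jobList struct {
	mu      sync.Mutex
	actives map[string]*state
}

// discard removes the job's reference from every state and prunes
// states that are no longer referenced by any job.
func (l *jobList) discard(j *job) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for d, st := range l.actives {
		delete(st.jobs, j)
		if len(st.jobs) == 0 {
			delete(l.actives, d) // no job references this vertex anymore
		}
	}
}

func main() {
	j1, j2 := &job{"j1"}, &job{"j2"}
	l := &jobList{actives: map[string]*state{
		"v1": {jobs: map[*job]struct{}{j1: {}}},
		"v2": {jobs: map[*job]struct{}{j1: {}, j2: {}}},
	}}

	l.discard(j1)
	fmt.Println(len(l.actives)) // 1: v2 survives because j2 still references it
}
```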
68 changes: 44 additions & 24 deletions solver/simple.go
@@ -49,90 +49,89 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
// Ordered list of vertices to build.
digests, vertices := s.exploreVertices(e)

var ret Result
var ret CachedResult

for _, d := range digests {
vertex, ok := vertices[d]
if !ok {
return nil, errors.Errorf("digest %s not found", d)
}

res, err := s.buildOne(ctx, d, vertex, job, e)
res, expCacheKeys, err := s.buildOne(ctx, d, vertex, job, e)
if err != nil {
return nil, err
}

ret = res
ret = NewCachedResult(res, expCacheKeys)
}

return NewCachedResult(ret, []ExportableCacheKey{}), nil
return ret, nil
}

func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, error) {
func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, []ExportableCacheKey, error) {
// Ensure we don't have multiple threads working on the same digest.
wait, done := s.parallelGuard.acquire(ctx, d.String())
defer done()
<-wait

st := s.createState(vertex, job)

op := newSharedOp(st.opts.ResolveOpFunc, st.opts.DefaultCache, st)

// Required to access cache map results on state.
st.op = op
st := s.state(vertex, job)

// Add cache opts to context as they will be accessed by cache retrieval.
ctx = withAncestorCacheOpts(ctx, st)

// CacheMap populates required fields in SourceOp.
cm, err := op.CacheMap(ctx, int(e.Index))
cm, err := st.op.CacheMap(ctx, int(e.Index))
if err != nil {
return nil, err
return nil, nil, err
}

inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, job)
if err != nil {
notifyError(ctx, st, false, err)
return nil, err
return nil, nil, err
}

cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d.String())
if err != nil {
return nil, err
return nil, nil, err
}

v, ok, err := s.resultCache.get(ctx, cacheKey)
if err != nil {
return nil, err
return nil, nil, err
}

expCacheKeys := []ExportableCacheKey{
{Exporter: &simpleExporter{cacheKey: cacheKey}},
}

if ok && v != nil {
notifyError(ctx, st, true, nil)
return v, nil
return v, expCacheKeys, nil
}

results, _, err := op.Exec(ctx, inputs)
results, _, err := st.op.Exec(ctx, inputs)
if err != nil {
return nil, err
return nil, nil, err
}

// Ensure all results are finalized (committed to cache). It may be better
// to background these calls at some point.
for _, res := range results {
err = s.commitRefFunc(ctx, res)
if err != nil {
return nil, err
return nil, nil, err
}
}

res := results[int(e.Index)]

err = s.resultCache.set(ctx, cacheKey, res)
if err != nil {
return nil, err
return nil, nil, err
}

return res, nil
return res, expCacheKeys, nil
}

func notifyError(ctx context.Context, st *state, cached bool, err error) {
@@ -141,6 +140,16 @@ func notifyError(ctx context.Context, st *state, cached bool, err error) {
notifyCompleted(err, cached)
}

func (s *simpleSolver) state(vertex Vertex, job *Job) *state {
s.solver.mu.Lock()
defer s.solver.mu.Unlock()
if st, ok := s.solver.actives[vertex.Digest()]; ok {
st.jobs[job] = struct{}{}
return st
}
return s.createState(vertex, job)
}

// createState creates a new state struct with required and placeholder values.
func (s *simpleSolver) createState(vertex Vertex, job *Job) *state {
defaultCache := NewInMemoryCacheManager()
@@ -172,9 +181,12 @@ func (s *simpleSolver) createState(vertex Vertex, job *Job) *state {
// necessary dependency information to a few caching components. We'll need
// to expire these keys somehow. We should also move away from using the
// actives map, but it's still being used by withAncestorCacheOpts for now.
s.solver.mu.Lock()
s.solver.actives[vertex.Digest()] = st
s.solver.mu.Unlock()

op := newSharedOp(st.opts.ResolveOpFunc, st.opts.DefaultCache, st)

// Required to access cache map results on state.
st.op = op

return st
}
@@ -525,3 +537,11 @@ func (c *diskCache) delete(_ context.Context, key string) error {
}

var _ resultCache = &diskCache{}

type simpleExporter struct {
cacheKey string
}

func (s *simpleExporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt CacheExportOpt) ([]CacheExporterRecord, error) {
return nil, nil
}
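The new state helper looks up an existing per-vertex state under the solver lock, registers the job on it, and only falls back to createState when no entry exists. A minimal sketch of that lookup-or-create pattern, with simplified stand-ins for BuildKit's solver, state, Job, and Vertex types:

```go
package main

import (
	"fmt"
	"sync"
)

type job struct{ name string }

type state struct {
	digest string
	jobs   map[*job]struct{}
}

type solver struct {
	mu      sync.Mutex
	actives map[string]*state
}

// state returns the existing state for a digest, registering the job on it,
// or creates and registers a new one.
func (s *solver) state(digest string, j *job) *state {
	s.mu.Lock()
	defer s.mu.Unlock()
	if st, ok := s.actives[digest]; ok {
		st.jobs[j] = struct{}{} // an existing vertex gains another interested job
		return st
	}
	st := &state{digest: digest, jobs: map[*job]struct{}{j: {}}}
	s.actives[digest] = st
	return st
}

func main() {
	s := &solver{actives: map[string]*state{}}
	a, b := &job{"a"}, &job{"b"}

	st1 := s.state("sha256:abc", a)
	st2 := s.state("sha256:abc", b) // same vertex, state is reused

	fmt.Println(st1 == st2, len(st1.jobs)) // true 2
}
```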
7 changes: 4 additions & 3 deletions source/local/source.go
@@ -125,9 +125,10 @@ func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, ind
}

// Hack: The encoded session ID here is breaking the simplified caching
// approach in "simple.go". However, a consistent value here is likely
// unreliable with multiple users. Figure out another option.
sessionID = "session-id"
// approach in "simple.go" as it differs for each request. Use the
// SharedKeyHint property which is provided by Earthly and is based off of
// the path & inode names.
sessionID = ls.src.SharedKeyHint

dt, err := json.Marshal(struct {
SessionID string
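The point of switching from the per-request session ID to SharedKeyHint is that the value marshalled into the cache-key payload must be stable across requests for the cache to hit. A small sketch under assumed field names (the real struct marshals SessionID alongside other fields; Name and Path here are illustrative):

```go
package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
)

// localSourceCacheKey hashes a JSON payload that includes the session/shared key.
func localSourceCacheKey(sessionID, name, path string) string {
	dt, err := json.Marshal(struct {
		SessionID string
		Name      string
		Path      string
	}{SessionID: sessionID, Name: name, Path: path})
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("sha256:%x", sha256.Sum256(dt))
}

func main() {
	// A per-request session ID makes the key differ between requests...
	fmt.Println(localSourceCacheKey("sess-1", "ctx", "/app") ==
		localSourceCacheKey("sess-2", "ctx", "/app")) // false

	// ...while a stable shared-key hint keeps it constant for the same source.
	sharedKeyHint := "hint-from-path-and-inode"
	fmt.Println(localSourceCacheKey(sharedKeyHint, "ctx", "/app") ==
		localSourceCacheKey(sharedKeyHint, "ctx", "/app")) // true
}
```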