fix: handle lease failure better
When your laptop goes to sleep, we were seeing lots of runner restarts being attempted with an already-cancelled lease context.

It is hard to reproduce reliably, but this should work around it for now.

I could not reproduce the issue by manually cancelling the lease; it only happens when the computer hibernates.

fixes: #3177
stuartwdouglas committed Oct 30, 2024
1 parent 95c8b6d commit de1af3f
Showing 3 changed files with 28 additions and 17 deletions.
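
Before the diffs, a note on the failure mode the commit message describes: a context tied to a lease is cancelled the moment the lease is released or lost (for example after the machine sleeps), so any runner started with that context fails straight away and gets restarted again. A minimal, self-contained sketch of that behaviour (not the FTL code; startWork is a hypothetical stand-in for starting a runner):

package main

import (
    "context"
    "fmt"
)

// startWork refuses to do anything if its context is already cancelled,
// the same guard the first hunk below adds to startRunner.
func startWork(ctx context.Context) error {
    select {
    case <-ctx.Done():
        // The lease is gone; starting work now would only fail and
        // trigger another pointless restart.
        return nil
    default:
    }
    fmt.Println("starting work")
    return nil
}

func main() {
    leaseCtx, release := context.WithCancel(context.Background())
    release()               // simulates the lease being released, e.g. after hibernation
    _ = startWork(leaseCtx) // prints nothing: the cancelled context is caught first
}
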
15 changes: 12 additions & 3 deletions backend/controller/scaling/localscaling/local_scaling.go
@@ -158,6 +158,13 @@ func (l *localScaling) reconcileRunners(ctx context.Context, deploymentRunners *
 }

 func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, info *deploymentInfo) error {
+    select {
+    case <-ctx.Done():
+        // In some cases this gets called with an expired context, generally after the lease is released
+        // We don't want to start a runner in that case
+        return nil
+    default:
+    }
     controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]
     logger := log.FromContext(ctx)

@@ -210,12 +217,14 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
     go func() {
         logger.Debugf("Starting runner: %s", config.Key)
         err := runner.Start(runnerCtx, config)
+        l.lock.Lock()
+        defer l.lock.Unlock()
         if err != nil && !errors.Is(err, context.Canceled) {
             logger.Errorf(err, "Runner failed: %s", err)
+        } else {
+            // Don't count context.Canceled as a restart error
+            info.exits++
         }
-        l.lock.Lock()
-        defer l.lock.Unlock()
-        info.exits++
         if info.exits >= maxExits {
             logger.Errorf(fmt.Errorf("too many restarts"), "Runner failed too many times, not restarting")
         }
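
The hunk above also takes l.lock before the error check so that info.exits is only ever touched while the mutex is held. As a standalone reminder of why a counter shared by several goroutines needs that, here is a small sketch with illustrative names (deployment and recordExit are not FTL types):

package main

import (
    "fmt"
    "sync"
)

type deployment struct {
    mu    sync.Mutex
    exits int
}

// recordExit serialises updates to the shared counter, as startRunner now
// does by locking before it inspects the runner's exit.
func (d *deployment) recordExit() {
    d.mu.Lock()
    defer d.mu.Unlock()
    d.exits++
}

func main() {
    d := &deployment{}
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            d.recordExit()
        }()
    }
    wg.Wait()
    fmt.Println(d.exits) // always 10; without the mutex this is a data race
}
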
16 changes: 8 additions & 8 deletions backend/controller/scaling/scaling.go
@@ -28,13 +28,6 @@ func BeginGrpcScaling(ctx context.Context, url url.URL, leaser leases.Leaser, ha
         // Grab a lease to take control of runner scaling
         lease, leaseContext, err := leaser.AcquireLease(ctx, leases.SystemKey("ftl-scaling", "runner-creation"), leaseTimeout, optional.None[any]())
         if err == nil {
-            defer func(lease leases.Lease) {
-                err := lease.Release()
-                if err != nil {
-                    logger := log.FromContext(ctx)
-                    logger.Errorf(err, "Failed to release lease")
-                }
-            }(lease)
             // If we get it then we take over runner scaling
             runGrpcScaling(leaseContext, url, handler)
         } else if !errors.Is(err, leases.ErrConflict) {
@@ -43,8 +36,15 @@
         }
         select {
         case <-ctx.Done():
+            if lease != nil {
+                err := lease.Release()
+                if err != nil {
+                    logger := log.FromContext(ctx)
+                    logger.Errorf(err, "Failed to release lease")
+                }
+            }
             return
-        case <-time.After(leaseTimeout):
+        case <-leaseContext.Done():
         }
     }
 }
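
With this change the scaling loop releases the lease only when the parent context is done, and it wakes as soon as the lease's own context is cancelled instead of after a fixed timeout. A rough sketch of that shape, where acquire and run are hypothetical stand-ins for leaser.AcquireLease and runGrpcScaling (the real signatures differ), and the back-off duration is illustrative:

package scaling

import (
    "context"
    "log"
    "time"
)

// scaleLoop is a sketch only: acquire returns a release function plus a
// context that is cancelled when the lease is lost, and run works until
// its context ends.
func scaleLoop(ctx context.Context,
    acquire func(context.Context) (release func() error, leaseCtx context.Context, err error),
    run func(context.Context)) {
    for {
        release, leaseCtx, err := acquire(ctx)
        if err != nil {
            // Someone else holds the lease; back off and try again.
            select {
            case <-ctx.Done():
                return
            case <-time.After(10 * time.Second):
            }
            continue
        }
        run(leaseCtx) // returns once the lease context is cancelled
        select {
        case <-ctx.Done():
            // Shutting down: the one place the lease is released.
            if err := release(); err != nil {
                log.Printf("failed to release lease: %v", err)
            }
            return
        case <-leaseCtx.Done():
            // Lease lost (e.g. the machine slept); loop and re-acquire.
        }
    }
}
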
14 changes: 8 additions & 6 deletions internal/rpc/rpc.go
@@ -280,12 +280,6 @@ func RetryStreamingServerStream[Req, Resp any](
     for {
         stream, err := rpc(ctx, connect.NewRequest(req))
         if err == nil {
-            defer func(stream *connect.ServerStreamForClient[Resp]) {
-                err := stream.Close()
-                if err != nil {
-                    logger.Debugf("Failed to close stream: %s", err)
-                }
-            }(stream)
             for {
                 if stream.Receive() {
                     resp := stream.Msg()
@@ -300,6 +294,10 @@
                 }
                 select {
                 case <-ctx.Done():
+                    err := stream.Close()
+                    if err != nil {
+                        logger.Debugf("Failed to close stream: %s", err)
+                    }
                     return
                 default:
                 }
@@ -312,6 +310,10 @@
                     break
                 }
             }
+            err := stream.Close()
+            if err != nil {
+                logger.Debugf("Failed to close stream: %s", err)
+            }
         }

         errored = true
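
One Go detail behind this file's change: a defer registered inside a retry loop runs only when the whole function returns, not at the end of each iteration, so the removed deferred Close kept every retry's stream open until RetryStreamingServerStream finally exited. The patch closes the stream explicitly at each exit point instead. A generic illustration of the difference, using io.ReadCloser rather than the connect stream:

package main

import (
    "fmt"
    "io"
    "strings"
)

// leaky defers Close inside the loop: none of the defers run until the
// function returns, so every iteration's reader stays open in the meantime.
func leaky(open func() io.ReadCloser) {
    for i := 0; i < 3; i++ {
        r := open()
        defer r.Close() // runs only when leaky returns, not per iteration
        _, _ = io.ReadAll(r)
    }
    fmt.Println("leaky: all three readers are still open here")
}

// tidy closes each reader before the next attempt, the shape the patched
// retry loop now follows with its stream.
func tidy(open func() io.ReadCloser) {
    for i := 0; i < 3; i++ {
        r := open()
        _, _ = io.ReadAll(r)
        if err := r.Close(); err != nil {
            fmt.Println("failed to close:", err)
        }
    }
}

func main() {
    open := func() io.ReadCloser { return io.NopCloser(strings.NewReader("data")) }
    leaky(open)
    tidy(open)
}
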
