Skip to content

Commit

Permalink
internal/worker/server: return an error on depsolve timeout HMS-2989
Browse files Browse the repository at this point in the history
Fixes the special case that if no worker is available and we
generate an internal timeout and cancel the depsolve including all
followup jobs, no error was propagated.
  • Loading branch information
schuellerf committed Oct 4, 2024
1 parent a54ac30 commit 82a6126
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 31 deletions.
98 changes: 67 additions & 31 deletions internal/cloudapi/v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v2
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -258,7 +259,7 @@ func (s *Server) enqueueCompose(irs []imageRequest, channel string) (uuid.UUID,

s.goroutinesGroup.Add(1)
go func() {
serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, ir.manifestSeed)
serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, id, ir.manifestSeed)
defer s.goroutinesGroup.Done()
}()

Expand Down Expand Up @@ -414,7 +415,7 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas
// copy the image request while passing it into the goroutine to prevent data races
s.goroutinesGroup.Add(1)
go func(ir imageRequest) {
serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, ir.manifestSeed)
serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, buildID, ir.manifestSeed)
defer s.goroutinesGroup.Done()
}(ir)
}
Expand All @@ -435,23 +436,81 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas
return id, nil
}

func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, seed int64) {
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, osbuildJobID uuid.UUID, seed int64) {
// prepared to become a config variable
const depsolveTimeout = 5
ctx, cancel := context.WithTimeout(ctx, time.Minute*depsolveTimeout)
defer cancel()

// wait until job is in a pending state
var token uuid.UUID
jobResult := &worker.ManifestJobByIDResult{
Manifest: nil,
ManifestInfo: worker.ManifestInfo{
OSBuildComposerVersion: common.BuildVersion(),
},
}

var dynArgs []json.RawMessage
var err error
token := uuid.Nil
logWithId := logrus.WithField("jobId", manifestJobID)

defer func() {
// token == uuid.Nil indicates that no worker even started processing
if token == uuid.Nil {
if jobResult.JobError != nil {
// set all jobs to "failed"
jobs := map[string]uuid.UUID{
"depsolve": depsolveJobID,
"containerResolve": containerResolveJobID,
"ostreeResolve": ostreeResolveJobID,
"manifest": manifestJobID,
"osbuild": osbuildJobID,
}

for jobName, jobID := range jobs {
if jobID != uuid.Nil {
err := workers.SetFailed(jobID, jobResult.JobError)
if err != nil {
logWithId.Errorf("Error failing %s job: %v", jobName, err)
}
}
}

} else {
logWithId.Errorf("Internal error, no worker started depsolve but we didn't get a reason.")
}
} else {
result, err := json.Marshal(jobResult)
if err != nil {
logWithId.Errorf("Error marshalling manifest job results: %v", err)
}
err = workers.FinishJob(token, result)
if err != nil {
logWithId.Errorf("Error finishing manifest job: %v", err)
}
if jobResult.JobError != nil {
logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
}
}
}()


// wait until job is in a pending state
for {
_, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID)
if err == jobqueue.ErrNotPending {
if errors.Is(err, jobqueue.ErrNotPending) {
logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish")
time.Sleep(time.Millisecond * 50)
select {
case <-ctx.Done():
logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
logWithId.Warning(fmt.Sprintf("Manifest job dependencies took longer than %d minutes to finish," +
" or the server is shutting down, returning to avoid dangling routines", depsolveTimeout))

jobResult.JobError = clienterrors.New(clienterrors.ErrorDepsolveTimeout,
"Timeout while waiting for package dependency resolution",
"There may be a temporary issue with compute resources. " +
"We’re looking into it, please try again later.",
)
break
default:
continue
Expand All @@ -464,13 +523,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w
break
}

jobResult := &worker.ManifestJobByIDResult{
Manifest: nil,
ManifestInfo: worker.ManifestInfo{
OSBuildComposerVersion: common.BuildVersion(),
},
}

// add osbuild/images dependency info to job result
osbuildImagesDep, err := common.GetDepModuleInfoByPath(common.OSBuildImagesModulePath)
if err != nil {
Expand All @@ -482,22 +534,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w
jobResult.ManifestInfo.OSBuildComposerDeps = append(jobResult.ManifestInfo.OSBuildComposerDeps, osbuildImagesDepModule)
}

defer func() {
if jobResult.JobError != nil {
logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
}

result, err := json.Marshal(jobResult)
if err != nil {
logWithId.Errorf("Error marshalling manifest job results: %v", err)
}

err = workers.FinishJob(token, result)
if err != nil {
logWithId.Errorf("Error finishing manifest job: %v", err)
}
}()

if len(dynArgs) == 0 {
reason := "No dynamic arguments"
jobResult.JobError = clienterrors.New(clienterrors.ErrorNoDynamicArgs, reason, nil)
Expand Down
34 changes: 34 additions & 0 deletions internal/jobqueue/fsjobqueue/fsjobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,40 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
return nil
}

func (q *fsJobQueue) FailJob(id uuid.UUID, result interface{}) error {
q.mu.Lock()
defer q.mu.Unlock()

j, err := q.readJob(id)
if err != nil {
return err
}

if !j.FinishedAt.IsZero() {
return jobqueue.ErrFinished
}

if !j.StartedAt.IsZero() {
return jobqueue.ErrRunning
}

j.Result, err = json.Marshal(result)
if err != nil {
return err
}

j.StartedAt = time.Now()
j.FinishedAt = time.Now()
j.Token = uuid.New()

err = q.db.Write(id.String(), j)
if err != nil {
return fmt.Errorf("error writing job %s: %v", id, err)
}

return nil
}

func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
j, err := q.readJob(id)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/worker/clienterrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
ErrorJobPanicked ClientErrorCode = 37
ErrorGeneratingSignedURL ClientErrorCode = 38
ErrorInvalidRepositoryURL ClientErrorCode = 39
ErrorDepsolveTimeout ClientErrorCode = 40
)

type ClientErrorCode int
Expand Down
21 changes: 21 additions & 0 deletions internal/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,27 @@ func (s *Server) Cancel(id uuid.UUID) error {
return s.jobs.CancelJob(id)
}

// SetFailed sets the given job id to "failed" with the given error
func (s *Server) SetFailed(id uuid.UUID, error *clienterrors.Error) error {
/* create a separate metrics?
jobInfo, err := s.jobInfo(id, nil)
if err != nil {
logrus.Errorf("error getting job status: %v", err)
} else {
prometheus.CancelJobMetrics(jobInfo.JobStatus.Started, jobInfo.JobType, jobInfo.Channel)
}*/
FailedJobErrorResult := JobResult{
JobError: error,
}

res, err := json.Marshal(FailedJobErrorResult)
if err != nil {
logrus.Errorf("error marshalling the error: %v", err)
return nil
}
return s.jobs.FailJob(id, res)
}

// Provides access to artifacts of a job. Returns an io.Reader for the artifact
// and the artifact's size.
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
Expand Down
32 changes: 32 additions & 0 deletions pkg/jobqueue/dbjobqueue/dbjobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ const (
WHERE id = $1 AND finished_at IS NULL
RETURNING type, started_at`

sqlFailJob = `
UPDATE jobs
SET token = $2, started_at = now(), finished_at = now(), result = $3
WHERE id = $1 AND finished_at IS NULL AND started_at IS NULL AND token IS NULL
RETURNING id, type`

sqlInsertHeartbeat = `
INSERT INTO heartbeats(token, id, heartbeat)
VALUES ($1, $2, now())`
Expand Down Expand Up @@ -591,6 +597,32 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
return nil
}

func (q *DBJobQueue) FailJob(id uuid.UUID, result interface{}) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return fmt.Errorf("error connecting to database: %w", err)
}
defer conn.Release()

var jobType string
var resultId uuid.UUID
dummyToken := uuid.New()
err = conn.QueryRow(context.Background(), sqlFailJob, id, dummyToken, result).Scan(&resultId, &jobType)
if errors.Is(err, pgx.ErrNoRows) {
return jobqueue.ErrNotRunning
}
if err != nil {
return fmt.Errorf("error failing job %s: %w", id, err)
}
if id != resultId {
return fmt.Errorf("that should never happen, I wanted to set %s to failed but got %s back from DB", id, resultId)
}

q.logger.Info("Job set to failed", "job_type", jobType, "job_id", id.String())

return nil
}

func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/jobqueue/jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type JobQueue interface {
// Cancel a job. Does nothing if the job has already finished.
CancelJob(id uuid.UUID) error

// Fail a job that didn't even start (e.g. no worker available)
FailJob(id uuid.UUID, result interface{}) error

// If the job has finished, returns the result as raw JSON.
//
// Returns the current status of the job, in the form of three times:
Expand Down Expand Up @@ -114,6 +117,8 @@ var (
ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled")
ErrActiveJobs = errors.New("worker has active jobs associated with it")
ErrWorkerNotExist = errors.New("worker does not exist")
ErrRunning = errors.New("job is running, but wasn't expected to be")
ErrFinished = errors.New("job is finished, but wasn't expected to be")
)

type Worker struct {
Expand Down

0 comments on commit 82a6126

Please sign in to comment.