diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 9496341d4..502f80498 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -990,7 +990,13 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m } // recheck the tenant queue - ec.checkTenantQueue(ctx, metadata.TenantId, "", false, true) + sr, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId) + + if err != nil { + return fmt.Errorf("could not get step run: %w", err) + } + + ec.checkTenantQueue(ctx, metadata.TenantId, sr.SRQueue, false, true) return nil } @@ -1030,7 +1036,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun } // check the queue on failure - defer ec.checkTenantQueue(ctx, tenantId, "", false, true) + defer ec.checkTenantQueue(ctx, tenantId, oldStepRun.SRQueue, false, true) // determine if step run should be retried or not shouldRetry := oldStepRun.SRRetryCount < oldStepRun.StepRetries diff --git a/internal/services/controllers/jobs/queue.go b/internal/services/controllers/jobs/queue.go index 41d5ee2de..2e785fd7e 100644 --- a/internal/services/controllers/jobs/queue.go +++ b/internal/services/controllers/jobs/queue.go @@ -239,6 +239,10 @@ func (q *queue) handleCheckQueue(ctx context.Context, task *msgqueue.Message) er case payload.IsStepQueued && payload.QueueName != "": q.scheduler.Queue(ctx, metadata.TenantId, payload.QueueName) case payload.IsSlotReleased: + if payload.QueueName != "" { + q.scheduler.Queue(ctx, metadata.TenantId, payload.QueueName) + } + q.scheduler.Replenish(ctx, metadata.TenantId) q.updateStepRunV2Operations.RunOrContinue(metadata.TenantId) default: diff --git a/pkg/repository/prisma/dbsqlc/queue.sql b/pkg/repository/prisma/dbsqlc/queue.sql index fd46486c0..2de4f368f 100644 --- a/pkg/repository/prisma/dbsqlc/queue.sql +++ b/pkg/repository/prisma/dbsqlc/queue.sql @@ -150,9 +150,12 @@ FOR UPDATE SKIP LOCKED; -- name: ListQueueItemsForQueue :many SELECT - * + sqlc.embed(qi), + sr."status" FROM "QueueItem" qi +JOIN + "StepRun" sr ON qi."stepRunId" = sr."id" WHERE qi."isQueued" = true AND qi."tenantId" = @tenantId::uuid diff --git a/pkg/repository/prisma/dbsqlc/queue.sql.go b/pkg/repository/prisma/dbsqlc/queue.sql.go index 564b18ee7..5fb77418d 100644 --- a/pkg/repository/prisma/dbsqlc/queue.sql.go +++ b/pkg/repository/prisma/dbsqlc/queue.sql.go @@ -732,9 +732,12 @@ func (q *Queries) ListInternalQueueItems(ctx context.Context, db DBTX, arg ListI const listQueueItemsForQueue = `-- name: ListQueueItemsForQueue :many SELECT - id, "stepRunId", "stepId", "actionId", "scheduleTimeoutAt", "stepTimeout", priority, "isQueued", "tenantId", queue, sticky, "desiredWorkerId" + qi.id, qi."stepRunId", qi."stepId", qi."actionId", qi."scheduleTimeoutAt", qi."stepTimeout", qi.priority, qi."isQueued", qi."tenantId", qi.queue, qi.sticky, qi."desiredWorkerId", + sr."status" FROM "QueueItem" qi +JOIN + "StepRun" sr ON qi."stepRunId" = sr."id" WHERE qi."isQueued" = true AND qi."tenantId" = $1::uuid @@ -759,7 +762,12 @@ type ListQueueItemsForQueueParams struct { Limit pgtype.Int4 `json:"limit"` } -func (q *Queries) ListQueueItemsForQueue(ctx context.Context, db DBTX, arg ListQueueItemsForQueueParams) ([]*QueueItem, error) { +type ListQueueItemsForQueueRow struct { + QueueItem QueueItem `json:"queue_item"` + Status StepRunStatus `json:"status"` +} + +func (q *Queries) ListQueueItemsForQueue(ctx context.Context, db DBTX, arg ListQueueItemsForQueueParams) ([]*ListQueueItemsForQueueRow, error) { rows, err := db.Query(ctx, listQueueItemsForQueue, arg.Tenantid, arg.Queue, @@ -770,22 +778,23 @@ func (q *Queries) ListQueueItemsForQueue(ctx context.Context, db DBTX, arg ListQ return nil, err } defer rows.Close() - var items []*QueueItem + var items []*ListQueueItemsForQueueRow for rows.Next() { - var i QueueItem + var i ListQueueItemsForQueueRow if err := rows.Scan( - &i.ID, - &i.StepRunId, - &i.StepId, - &i.ActionId, - &i.ScheduleTimeoutAt, - &i.StepTimeout, - &i.Priority, - &i.IsQueued, - &i.TenantId, - &i.Queue, - &i.Sticky, - &i.DesiredWorkerId, + &i.QueueItem.ID, + &i.QueueItem.StepRunId, + &i.QueueItem.StepId, + &i.QueueItem.ActionId, + &i.QueueItem.ScheduleTimeoutAt, + &i.QueueItem.StepTimeout, + &i.QueueItem.Priority, + &i.QueueItem.IsQueued, + &i.QueueItem.TenantId, + &i.QueueItem.Queue, + &i.QueueItem.Sticky, + &i.QueueItem.DesiredWorkerId, + &i.Status, ); err != nil { return nil, err } diff --git a/pkg/repository/prisma/dbsqlc/workflows.sql b/pkg/repository/prisma/dbsqlc/workflows.sql index 9630a948d..0836ad115 100644 --- a/pkg/repository/prisma/dbsqlc/workflows.sql +++ b/pkg/repository/prisma/dbsqlc/workflows.sql @@ -354,29 +354,30 @@ INSERT INTO "WorkflowTriggerScheduledRef" ( ) RETURNING *; -- name: ListWorkflowsForEvent :many -SELECT DISTINCT ON ("WorkflowVersion"."workflowId") "WorkflowVersion".id -FROM "WorkflowVersion" -LEFT JOIN "Workflow" AS j1 ON j1.id = "WorkflowVersion"."workflowId" -LEFT JOIN "WorkflowTriggers" AS j2 ON j2."workflowVersionId" = "WorkflowVersion"."id" +-- Get all of the latest workflow versions for the tenant +WITH latest_versions AS ( + SELECT DISTINCT ON("workflowId") + workflowVersions."id" AS "workflowVersionId" + FROM + "WorkflowVersion" as workflowVersions + JOIN + "Workflow" as workflow ON workflow."id" = workflowVersions."workflowId" + WHERE + workflow."tenantId" = @tenantId::uuid + AND workflowVersions."deletedAt" IS NULL + ORDER BY "workflowId", "order" DESC +) +-- select the workflow versions that have the event trigger +SELECT + latest_versions."workflowVersionId" +FROM + latest_versions +JOIN + "WorkflowTriggers" as triggers ON triggers."workflowVersionId" = latest_versions."workflowVersionId" +JOIN + "WorkflowTriggerEventRef" as eventRef ON eventRef."parentId" = triggers."id" WHERE - (j1."tenantId"::uuid = @tenantId AND j1.id IS NOT NULL) - AND j1."deletedAt" IS NULL - AND "WorkflowVersion"."deletedAt" IS NULL - AND - (j2.id IN ( - SELECT t3."parentId" - FROM "WorkflowTriggerEventRef" AS t3 - WHERE t3."eventKey" = @eventKey AND t3."parentId" IS NOT NULL - ) AND j2.id IS NOT NULL) - AND "WorkflowVersion".id = ( - -- confirm that the workflow version is the latest - SELECT wv2.id - FROM "WorkflowVersion" wv2 - WHERE wv2."workflowId" = "WorkflowVersion"."workflowId" - ORDER BY wv2."order" DESC - LIMIT 1 - ) -ORDER BY "WorkflowVersion"."workflowId", "WorkflowVersion"."order" DESC; + eventRef."eventKey" = @eventKey::text; -- name: GetWorkflowVersionForEngine :many SELECT diff --git a/pkg/repository/prisma/dbsqlc/workflows.sql.go b/pkg/repository/prisma/dbsqlc/workflows.sql.go index 6b4177041..fb608f209 100644 --- a/pkg/repository/prisma/dbsqlc/workflows.sql.go +++ b/pkg/repository/prisma/dbsqlc/workflows.sql.go @@ -1438,49 +1438,50 @@ func (q *Queries) ListWorkflows(ctx context.Context, db DBTX, arg ListWorkflowsP } const listWorkflowsForEvent = `-- name: ListWorkflowsForEvent :many -SELECT DISTINCT ON ("WorkflowVersion"."workflowId") "WorkflowVersion".id -FROM "WorkflowVersion" -LEFT JOIN "Workflow" AS j1 ON j1.id = "WorkflowVersion"."workflowId" -LEFT JOIN "WorkflowTriggers" AS j2 ON j2."workflowVersionId" = "WorkflowVersion"."id" +WITH latest_versions AS ( + SELECT DISTINCT ON("workflowId") + workflowVersions."id" AS "workflowVersionId" + FROM + "WorkflowVersion" as workflowVersions + JOIN + "Workflow" as workflow ON workflow."id" = workflowVersions."workflowId" + WHERE + workflow."tenantId" = $2::uuid + AND workflowVersions."deletedAt" IS NULL + ORDER BY "workflowId", "order" DESC +) +SELECT + latest_versions."workflowVersionId" +FROM + latest_versions +JOIN + "WorkflowTriggers" as triggers ON triggers."workflowVersionId" = latest_versions."workflowVersionId" +JOIN + "WorkflowTriggerEventRef" as eventRef ON eventRef."parentId" = triggers."id" WHERE - (j1."tenantId"::uuid = $1 AND j1.id IS NOT NULL) - AND j1."deletedAt" IS NULL - AND "WorkflowVersion"."deletedAt" IS NULL - AND - (j2.id IN ( - SELECT t3."parentId" - FROM "WorkflowTriggerEventRef" AS t3 - WHERE t3."eventKey" = $2 AND t3."parentId" IS NOT NULL - ) AND j2.id IS NOT NULL) - AND "WorkflowVersion".id = ( - -- confirm that the workflow version is the latest - SELECT wv2.id - FROM "WorkflowVersion" wv2 - WHERE wv2."workflowId" = "WorkflowVersion"."workflowId" - ORDER BY wv2."order" DESC - LIMIT 1 - ) -ORDER BY "WorkflowVersion"."workflowId", "WorkflowVersion"."order" DESC + eventRef."eventKey" = $1::text ` type ListWorkflowsForEventParams struct { - Tenantid pgtype.UUID `json:"tenantid"` Eventkey string `json:"eventkey"` + Tenantid pgtype.UUID `json:"tenantid"` } +// Get all of the latest workflow versions for the tenant +// select the workflow versions that have the event trigger func (q *Queries) ListWorkflowsForEvent(ctx context.Context, db DBTX, arg ListWorkflowsForEventParams) ([]pgtype.UUID, error) { - rows, err := db.Query(ctx, listWorkflowsForEvent, arg.Tenantid, arg.Eventkey) + rows, err := db.Query(ctx, listWorkflowsForEvent, arg.Eventkey, arg.Tenantid) if err != nil { return nil, err } defer rows.Close() var items []pgtype.UUID for rows.Next() { - var id pgtype.UUID - if err := rows.Scan(&id); err != nil { + var workflowVersionId pgtype.UUID + if err := rows.Scan(&workflowVersionId); err != nil { return nil, err } - items = append(items, id) + items = append(items, workflowVersionId) } if err := rows.Err(); err != nil { return nil, err diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index da753494c..ced126b52 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -267,6 +267,7 @@ type stepRunEngineRepository struct { cf *server.ConfigFileRuntime cachedMinQueuedIds sync.Map cachedStepIdHasRateLimit *cache.Cache + cachedDesiredLabels *cache.Cache callbacks []repository.Callback[*dbsqlc.ResolveWorkflowRunStatusRow] bulkStatusBuffer *buffer.TenantBufferManager[*updateStepRunQueueData, pgtype.UUID] @@ -301,6 +302,7 @@ func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *ze queries: queries, cf: cf, cachedStepIdHasRateLimit: rlCache, + cachedDesiredLabels: rlCache, updateConcurrentFactor: cf.UpdateConcurrentFactor, maxHashFactor: cf.UpdateHashFactor, bulkEventBuffer: eventBuffer, @@ -1306,23 +1308,36 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, qlp *zerolo desiredLabels := make(map[string][]*dbsqlc.GetDesiredLabelsRow) hasDesired := false - // GET DESIRED LABELS - // OPTIMIZATION: CACHEABLE - stepIds := make([]pgtype.UUID, 0, len(stepIdSet)) + // GET KNOWN LABELS FROM CACHE + cacheMissStepIds := make([]pgtype.UUID, 0, len(stepIdSet)) for stepId := range stepIdSet { - stepIds = append(stepIds, sqlchelpers.UUIDFromStr(stepId)) + if knownLabels, ok := s.cachedDesiredLabels.Get(stepId); ok { + knownLabelsArr := knownLabels.([]*dbsqlc.GetDesiredLabelsRow) + + for _, label := range knownLabelsArr { + stepIdStr := sqlchelpers.UUIDToStr(label.StepId) + desiredLabels[stepIdStr] = knownLabelsArr + s.cachedDesiredLabels.Set(stepIdStr, knownLabelsArr) + } + } else { + cacheMissStepIds = append(cacheMissStepIds, sqlchelpers.UUIDFromStr(stepId)) + } } - labels, err := s.queries.GetDesiredLabels(ctx, tx, stepIds) + // GET CACHE MISS DESIRED LABELS + if len(cacheMissStepIds) > 0 { - if err != nil { - return emptyRes, fmt.Errorf("could not get desired labels: %w", err) - } + labels, err := s.queries.GetDesiredLabels(ctx, tx, cacheMissStepIds) - for _, label := range labels { - stepId := sqlchelpers.UUIDToStr(label.StepId) - desiredLabels[stepId] = labels - hasDesired = true + if err != nil { + return emptyRes, fmt.Errorf("could not get desired labels: %w", err) + } + + for _, label := range labels { + stepId := sqlchelpers.UUIDToStr(label.StepId) + desiredLabels[stepId] = labels + hasDesired = true + } } var workerLabels = make(map[string][]*dbsqlc.GetWorkerLabelsRow) diff --git a/pkg/scheduling/v2/queuer.go b/pkg/scheduling/v2/queuer.go index 8f808462c..39c80ebd1 100644 --- a/pkg/scheduling/v2/queuer.go +++ b/pkg/scheduling/v2/queuer.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog" @@ -21,7 +20,7 @@ import ( ) type queuerRepo interface { - ListQueueItems(ctx context.Context) ([]*dbsqlc.QueueItem, error) + ListQueueItems(ctx context.Context, limit int) ([]*dbsqlc.QueueItem, error) MarkQueueItemsProcessed(ctx context.Context, r *assignResults) (succeeded []*AssignedQueueItem, failed []*AssignedQueueItem, err error) GetStepRunRateLimits(ctx context.Context, queueItems []*dbsqlc.QueueItem) (map[string]map[string]int32, error) GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*dbsqlc.GetDesiredLabelsRow, error) @@ -35,7 +34,6 @@ type queuerDbQueries struct { pool *pgxpool.Pool l *zerolog.Logger - limit pgtype.Int4 gtId pgtype.Int8 gtIdMu sync.RWMutex @@ -43,20 +41,16 @@ type queuerDbQueries struct { cachedStepIdHasRateLimit *cache.Cache } -func newQueueItemDbQueries(cf *sharedConfig, tenantId pgtype.UUID, eventBuffer *buffer.BulkEventWriter, queueName string, limit int32, +func newQueueItemDbQueries(cf *sharedConfig, tenantId pgtype.UUID, eventBuffer *buffer.BulkEventWriter, queueName string, ) (*queuerDbQueries, func()) { c := cache.New(5 * time.Minute) return &queuerDbQueries{ - tenantId: tenantId, - queueName: queueName, - queries: cf.queries, - pool: cf.pool, - l: cf.l, - eventBuffer: eventBuffer, - limit: pgtype.Int4{ - Int32: limit, - Valid: true, - }, + tenantId: tenantId, + queueName: queueName, + queries: cf.queries, + pool: cf.pool, + l: cf.l, + eventBuffer: eventBuffer, cachedStepIdHasRateLimit: c, }, c.Stop } @@ -80,53 +74,60 @@ func (d *queuerDbQueries) getMinId() pgtype.Int8 { return val } -func (d *queuerDbQueries) ListQueueItems(ctx context.Context) ([]*dbsqlc.QueueItem, error) { +func (d *queuerDbQueries) ListQueueItems(ctx context.Context, limit int) ([]*dbsqlc.QueueItem, error) { ctx, span := telemetry.NewSpan(ctx, "list-queue-items") defer span.End() - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, d.pool, d.l, 5000) - - if err != nil { - return nil, err - } - - defer rollback() + start := time.Now() + checkpoint := start - qis, err := d.queries.ListQueueItemsForQueue(ctx, tx, dbsqlc.ListQueueItemsForQueueParams{ + qis, err := d.queries.ListQueueItemsForQueue(ctx, d.pool, dbsqlc.ListQueueItemsForQueueParams{ Tenantid: d.tenantId, Queue: d.queueName, GtId: d.getMinId(), - Limit: d.limit, + Limit: pgtype.Int4{ + Int32: int32(limit), // nolint: gosec + Valid: true, + }, }) if err != nil { return nil, err } - qis, err = d.removeInvalidStepRuns(ctx, tx, qis) + if len(qis) == 0 { + return nil, nil + } + + listTime := time.Since(checkpoint) + checkpoint = time.Now() + + resQis, err := d.removeInvalidStepRuns(ctx, qis) if err != nil { return nil, err } - if err := commit(ctx); err != nil { - return nil, err + removeInvalidTime := time.Since(checkpoint) + + if sinceStart := time.Since(start); sinceStart > 100*time.Millisecond { + d.l.Warn().Dur( + "list", listTime, + ).Dur( + "remove_invalid", removeInvalidTime, + ).Msgf( + "listing %d queue items for queue %s took longer than 100ms (%s)", len(resQis), d.queueName, sinceStart.String(), + ) } - return qis, nil + return resQis, nil } // removeInvalidStepRuns removes all duplicate step runs and step runs which are in a finalized state from // the queue. It returns the remaining queue items and an error if one occurred. -func (s *queuerDbQueries) removeInvalidStepRuns(ctx context.Context, tx pgx.Tx, qis []*dbsqlc.QueueItem) ([]*dbsqlc.QueueItem, error) { +func (s *queuerDbQueries) removeInvalidStepRuns(ctx context.Context, qis []*dbsqlc.ListQueueItemsForQueueRow) ([]*dbsqlc.QueueItem, error) { if len(qis) == 0 { - return qis, nil - } - - currStepRunIds := make([]pgtype.UUID, len(qis)) - - for i, qi := range qis { - currStepRunIds[i] = qi.StepRunId + return nil, nil } // remove duplicates @@ -135,28 +136,25 @@ func (s *queuerDbQueries) removeInvalidStepRuns(ctx context.Context, tx pgx.Tx, cancelled := make([]int64, 0, len(qis)) for _, v := range qis { - stepRunId := sqlchelpers.UUIDToStr(v.StepRunId) + stepRunId := sqlchelpers.UUIDToStr(v.QueueItem.StepRunId) if encountered[stepRunId] { - cancelled = append(cancelled, v.ID) + cancelled = append(cancelled, v.QueueItem.ID) continue } encountered[stepRunId] = true - remaining1 = append(remaining1, v) + remaining1 = append(remaining1, &v.QueueItem) } - finalizedStepRuns, err := s.queries.GetFinalizedStepRuns(ctx, tx, currStepRunIds) + finalizedStepRunsMap := make(map[string]bool) - if err != nil { - return nil, err - } - - finalizedStepRunsMap := make(map[string]bool, len(finalizedStepRuns)) - - for _, sr := range finalizedStepRuns { - s.l.Warn().Msgf("step run %s is in state %s, skipping queueing", sqlchelpers.UUIDToStr(sr.ID), string(sr.Status)) - finalizedStepRunsMap[sqlchelpers.UUIDToStr(sr.ID)] = true + for _, v := range qis { + if v.Status == dbsqlc.StepRunStatusCANCELLED || v.Status == dbsqlc.StepRunStatusSUCCEEDED || v.Status == dbsqlc.StepRunStatusFAILED || v.Status == dbsqlc.StepRunStatusCANCELLING { + stepRunId := sqlchelpers.UUIDToStr(v.QueueItem.StepRunId) + s.l.Warn().Msgf("step run %s is in state %s, skipping queueing", stepRunId, string(v.Status)) + finalizedStepRunsMap[stepRunId] = true + } } // remove cancelled step runs from the queue items @@ -171,12 +169,28 @@ func (s *queuerDbQueries) removeInvalidStepRuns(ctx context.Context, tx pgx.Tx, remaining2 = append(remaining2, qi) } - if len(cancelled) > 0 { - err = s.queries.BulkQueueItems(ctx, tx, cancelled) + if len(cancelled) == 0 { + return remaining2, nil + } - if err != nil { - return nil, err - } + // If we've reached this point, we have queue items to cancel. We prepare a transaction in order + // to set a statement timeout. + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, s.pool, s.l, 5000) + + if err != nil { + return nil, err + } + + defer rollback() + + err = s.queries.BulkQueueItems(ctx, tx, cancelled) + + if err != nil { + return nil, err + } + + if err := commit(ctx); err != nil { + return nil, err } return remaining2, nil @@ -268,7 +282,7 @@ func (s *queuerDbQueries) bulkStepRunsUnassigned( func (s *queuerDbQueries) bulkStepRunsRateLimited( tenantId string, - rateLimits []*rateLimitResult, + rateLimits []*scheduleRateLimitResult, ) { for _, rlResult := range rateLimits { message := fmt.Sprintf( @@ -401,12 +415,14 @@ func (d *queuerDbQueries) MarkQueueItemsProcessed(ctx context.Context, r *assign return nil, nil, err } - // if we committed, we can update the min id - go d.updateMinId() + go func() { + // if we committed, we can update the min id + d.updateMinId() - d.bulkStepRunsAssigned(sqlchelpers.UUIDToStr(d.tenantId), time.Now().UTC(), stepRunIds, workerIds) - d.bulkStepRunsUnassigned(sqlchelpers.UUIDToStr(d.tenantId), unassignedStepRunIds) - d.bulkStepRunsRateLimited(sqlchelpers.UUIDToStr(d.tenantId), r.rateLimited) + d.bulkStepRunsAssigned(sqlchelpers.UUIDToStr(d.tenantId), time.Now().UTC(), stepRunIds, workerIds) + d.bulkStepRunsUnassigned(sqlchelpers.UUIDToStr(d.tenantId), unassignedStepRunIds) + d.bulkStepRunsRateLimited(sqlchelpers.UUIDToStr(d.tenantId), r.rateLimited) + }() workerIdToDispatcherId := make(map[string]pgtype.UUID, len(dispatcherIdWorkerIds)) @@ -730,6 +746,9 @@ type Queuer struct { cleanup func() isCleanedUp bool + + unackedMu rwMutex + unacked map[int64]struct{} } func newQueuer(conf *sharedConfig, tenantId pgtype.UUID, queueName string, s *Scheduler, eventBuffer *buffer.BulkEventWriter, resultsCh chan<- *QueueResults) *Queuer { @@ -739,7 +758,7 @@ func newQueuer(conf *sharedConfig, tenantId pgtype.UUID, queueName string, s *Sc defaultLimit = conf.singleQueueLimit } - repo, cleanupRepo := newQueueItemDbQueries(conf, tenantId, eventBuffer, queueName, int32(defaultLimit)) // nolint: gosec + repo, cleanupRepo := newQueueItemDbQueries(conf, tenantId, eventBuffer, queueName) notifyQueueCh := make(chan struct{}, 1) @@ -749,10 +768,12 @@ func newQueuer(conf *sharedConfig, tenantId pgtype.UUID, queueName string, s *Sc queueName: queueName, l: conf.l, s: s, - limit: 100, + limit: defaultLimit, resultsCh: resultsCh, notifyQueueCh: notifyQueueCh, queueMu: newMu(conf.l), + unackedMu: newRWMu(conf.l), + unacked: make(map[int64]struct{}), } ctx, cancel := context.WithCancel(context.Background()) @@ -804,14 +825,18 @@ func (q *Queuer) loopQueue(ctx context.Context) { } start := time.Now() - - qis, err := q.refillQueue(ctx, qis) + checkpoint := start + var err error + qis, err = q.refillQueue(ctx, qis) if err != nil { q.l.Error().Err(err).Msg("error refilling queue") continue } + refillTime := time.Since(checkpoint) + checkpoint = time.Now() + rls, err := q.repo.GetStepRunRateLimits(ctx, qis) if err != nil { @@ -819,6 +844,9 @@ func (q *Queuer) loopQueue(ctx context.Context) { continue } + rateLimitTime := time.Since(checkpoint) + checkpoint = time.Now() + stepIds := make([]pgtype.UUID, 0, len(qis)) for _, qi := range qis { @@ -832,17 +860,20 @@ func (q *Queuer) loopQueue(ctx context.Context) { continue } - timeToRefill := time.Since(start) + desiredLabelsTime := time.Since(checkpoint) + checkpoint = time.Now() assignCh := q.s.tryAssign(ctx, qis, labels, rls) count := 0 countMu := sync.Mutex{} - wg := sync.WaitGroup{} for r := range assignCh { + // synchronously add to unackedMu + q.addToUnacked(r) wg.Add(1) + // asynchronously flush to database go func() { defer wg.Done() @@ -860,45 +891,77 @@ func (q *Queuer) loopQueue(ctx context.Context) { }() } - wg.Wait() - + assignTime := time.Since(checkpoint) elapsed := time.Since(start) if elapsed > 100*time.Millisecond { - q.l.Warn().Msgf("queue %s took longer than 100ms (%s) to process %d items (time to refill %s)", q.queueName, elapsed, len(qis), timeToRefill) + q.l.Warn().Dur( + "refill_time", refillTime, + ).Dur( + "rate_limit_time", rateLimitTime, + ).Dur( + "desired_labels_time", desiredLabelsTime, + ).Dur( + "assign_time", assignTime, + ).Msgf("queue %s took longer than 100ms (%s) to process %d items", q.queueName, elapsed, len(qis)) } // if we processed all queue items, queue again - if len(qis) > 0 && count == len(qis) { - go q.queue() - } + go func() { + wg.Wait() + + countMu.Lock() + if len(qis) > 0 && count == len(qis) { + q.queue() + } + countMu.Unlock() + }() } } func (q *Queuer) refillQueue(ctx context.Context, curr []*dbsqlc.QueueItem) ([]*dbsqlc.QueueItem, error) { + q.unackedMu.RLock() + defer q.unackedMu.RUnlock() + // determine whether we need to replenish with the following cases: // - we last replenished more than 1 second ago // - if we are at less than 50% of the limit, we always attempt to replenish replenish := false now := time.Now() - if len(curr) < q.limit/2 { + if len(curr) < q.limit { replenish = true - } - - if q.lastReplenished != nil { + } else if q.lastReplenished != nil { if time.Since(*q.lastReplenished) > 990*time.Millisecond { replenish = true } } - if !replenish { - return curr, nil + if replenish { + q.lastReplenished = &now + limit := q.limit + len(q.unacked) + + if limit < 2*q.limit { + limit = 2 * q.limit + } + + var err error + curr, err = q.repo.ListQueueItems(ctx, limit) + + if err != nil { + return nil, err + } } - q.lastReplenished = &now + newCurr := make([]*dbsqlc.QueueItem, 0, len(curr)) + + for _, qi := range curr { + if _, ok := q.unacked[qi.ID]; !ok { + newCurr = append(newCurr, qi) + } + } - return q.repo.ListQueueItems(ctx) + return newCurr, nil } type QueueResults struct { @@ -910,7 +973,54 @@ type QueueResults struct { SchedulingTimedOut []string } +func (q *Queuer) addToUnacked(r *assignResults) { + q.unackedMu.Lock() + + for _, assignedItem := range r.assigned { + q.unacked[assignedItem.QueueItem.ID] = struct{}{} + } + + for _, unassignedItem := range r.unassigned { + q.unacked[unassignedItem.ID] = struct{}{} + } + + for _, schedulingTimedOutItem := range r.schedulingTimedOut { + q.unacked[schedulingTimedOutItem.ID] = struct{}{} + } + + for _, rateLimitedItem := range r.rateLimited { + q.unacked[rateLimitedItem.qi.ID] = struct{}{} + } + + q.unackedMu.Unlock() +} + +func (q *Queuer) ack(r *assignResults) { + q.unackedMu.Lock() + + for _, assignedItem := range r.assigned { + delete(q.unacked, assignedItem.QueueItem.ID) + } + + for _, unassignedItem := range r.unassigned { + delete(q.unacked, unassignedItem.ID) + } + + for _, schedulingTimedOutItem := range r.schedulingTimedOut { + delete(q.unacked, schedulingTimedOutItem.ID) + } + + for _, rateLimitedItem := range r.rateLimited { + delete(q.unacked, rateLimitedItem.qi.ID) + } + + q.unackedMu.Unlock() +} + func (q *Queuer) flushToDatabase(ctx context.Context, r *assignResults) int { + // no matter what, we always ack the items in the queuer + defer q.ack(r) + succeeded, failed, err := q.repo.MarkQueueItemsProcessed(ctx, r) if err != nil { diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index e34612515..a28b14646 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -368,6 +368,12 @@ func (s *Scheduler) start(ctx context.Context) { go s.loopReplenish(ctx) } +type scheduleRateLimitResult struct { + *rateLimitResult + + qi *dbsqlc.QueueItem +} + type assignSingleResult struct { workerId pgtype.UUID ackId int @@ -375,7 +381,7 @@ type assignSingleResult struct { noSlots bool succeeded bool - rateLimitResult *rateLimitResult + rateLimitResult *scheduleRateLimitResult } type tryAssignTiming struct { @@ -420,7 +426,10 @@ func (s *Scheduler) tryAssignSingleton( rlResult := s.rl.use(ctx, sqlchelpers.UUIDToStr(qi.StepRunId), rls) if !rlResult.succeeded { - res.rateLimitResult = &rlResult + res.rateLimitResult = &scheduleRateLimitResult{ + rateLimitResult: &rlResult, + qi: qi, + } return res, timing, nil } @@ -511,17 +520,11 @@ type AssignedQueueItem struct { DispatcherId *pgtype.UUID } -type erroredQueueItem struct { - QueueItem *dbsqlc.QueueItem - Err error -} - type assignResults struct { assigned []*AssignedQueueItem - errored []*erroredQueueItem unassigned []*dbsqlc.QueueItem schedulingTimedOut []*dbsqlc.QueueItem - rateLimited []*rateLimitResult + rateLimited []*scheduleRateLimitResult } func (s *Scheduler) tryAssign( @@ -560,10 +563,9 @@ func (s *Scheduler) tryAssign( defer wg.Done() start := time.Now() assigned := make([]*AssignedQueueItem, 0, len(qis)) - errored := make([]*erroredQueueItem, 0, len(qis)) unassigned := make([]*dbsqlc.QueueItem, 0, len(qis)) schedulingTimedOut := make([]*dbsqlc.QueueItem, 0, len(qis)) - rateLimited := make([]*rateLimitResult, 0, len(qis)) + rateLimited := make([]*scheduleRateLimitResult, 0, len(qis)) startAssignment := time.Now() @@ -644,7 +646,6 @@ func (s *Scheduler) tryAssign( resultsCh <- &assignResults{ assigned: assigned, - errored: errored, unassigned: unassigned, schedulingTimedOut: schedulingTimedOut, rateLimited: rateLimited, diff --git a/sql/constraints.sql b/sql/constraints.sql index 89908d40d..a6d39e8a4 100644 --- a/sql/constraints.sql +++ b/sql/constraints.sql @@ -16,3 +16,19 @@ CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_parentStepRunId" ON "Workfl -- Additional indexes on workflow run CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflowrun_concurrency ON "WorkflowRun" ("concurrencyGroupId", "createdAt"); CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflowrun_main ON "WorkflowRun" ("tenantId", "deletedAt", "status", "workflowVersionId", "createdAt"); + +-- Additional indexes on workflow +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_version_workflow_id_order +ON "WorkflowVersion" ("workflowId", "order" DESC) +WHERE "deletedAt" IS NULL; + +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_tenant_id +ON "Workflow" ("tenantId"); + +-- Additional indexes on WorkflowTriggers +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_triggers_workflow_version_id +ON "WorkflowTriggers" ("workflowVersionId"); + +-- Additional indexes on WorkflowTriggerEventRef +CREATE INDEX idx_workflow_trigger_event_ref_event_key_parent_id +ON "WorkflowTriggerEventRef" ("eventKey", "parentId"); diff --git a/sql/migrations/20241018142125_v0.50.1.sql b/sql/migrations/20241018142125_v0.50.1.sql new file mode 100644 index 000000000..91647558d --- /dev/null +++ b/sql/migrations/20241018142125_v0.50.1.sql @@ -0,0 +1,17 @@ +-- atlas:txmode none + +-- Additional indexes on workflow +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_version_workflow_id_order +ON "WorkflowVersion" ("workflowId", "order" DESC) +WHERE "deletedAt" IS NULL; + +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_tenant_id +ON "Workflow" ("tenantId"); + +-- Additional indexes on WorkflowTriggers +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_triggers_workflow_version_id +ON "WorkflowTriggers" ("workflowVersionId"); + +-- Additional indexes on WorkflowTriggerEventRef +CREATE INDEX idx_workflow_trigger_event_ref_event_key_parent_id +ON "WorkflowTriggerEventRef" ("eventKey", "parentId"); diff --git a/sql/migrations/atlas.sum b/sql/migrations/atlas.sum index 5a2434e74..096401497 100644 --- a/sql/migrations/atlas.sum +++ b/sql/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:FbbgXGJaNMxjq7xyF5tU7HkYU8hpFguvvSIKjY2lZpg= +h1:MZ3TtBTIFTmICM2AR0oo8HUsyM3U7B8B1P36oJMGXfE= 20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k= 20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo= 20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs= @@ -67,3 +67,4 @@ h1:FbbgXGJaNMxjq7xyF5tU7HkYU8hpFguvvSIKjY2lZpg= 20241008124038_v0.49.2.sql h1:YT40sN8Wtqh21emrzDMZIcvcOkipw+4fdwIoBpF+Dek= 20241011205314_0.49.3.sql h1:54yb4/20ab+eYc+5IKr+6tDrT0GEtwDkSfI5BJeHpjA= 20241014194326_v0.50.0.sql h1:kOoBOfouMgtyrRiN6xK2HNju4RMyEpk7xGN6XXZNlWE= +20241018142125_v0.50.1.sql h1:j0fNH72m40gU3v5e8XolQIALBm4Te0/I8uvezcZ5EbU= diff --git a/sql/schema/schema.sql b/sql/schema/schema.sql index 45a1a13ec..fa92636fa 100644 --- a/sql/schema/schema.sql +++ b/sql/schema/schema.sql @@ -1628,3 +1628,19 @@ CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_parentStepRunId" ON "Workfl -- Additional indexes on workflow run CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflowrun_concurrency ON "WorkflowRun" ("concurrencyGroupId", "createdAt"); CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflowrun_main ON "WorkflowRun" ("tenantId", "deletedAt", "status", "workflowVersionId", "createdAt"); + +-- Additional indexes on workflow +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_version_workflow_id_order +ON "WorkflowVersion" ("workflowId", "order" DESC) +WHERE "deletedAt" IS NULL; + +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_tenant_id +ON "Workflow" ("tenantId"); + +-- Additional indexes on WorkflowTriggers +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_triggers_workflow_version_id +ON "WorkflowTriggers" ("workflowVersionId"); + +-- Additional indexes on WorkflowTriggerEventRef +CREATE INDEX idx_workflow_trigger_event_ref_event_key_parent_id +ON "WorkflowTriggerEventRef" ("eventKey", "parentId");