Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat cache labels #976

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,13 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m
}

// recheck the tenant queue
ec.checkTenantQueue(ctx, metadata.TenantId, "", false, true)
sr, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId)

if err != nil {
return fmt.Errorf("could not get step run: %w", err)
}

ec.checkTenantQueue(ctx, metadata.TenantId, sr.SRQueue, false, true)

return nil
}
Expand Down Expand Up @@ -1030,7 +1036,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
}

// check the queue on failure
defer ec.checkTenantQueue(ctx, tenantId, "", false, true)
defer ec.checkTenantQueue(ctx, tenantId, oldStepRun.SRQueue, false, true)

// determine if step run should be retried or not
shouldRetry := oldStepRun.SRRetryCount < oldStepRun.StepRetries
Expand Down
4 changes: 4 additions & 0 deletions internal/services/controllers/jobs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ func (q *queue) handleCheckQueue(ctx context.Context, task *msgqueue.Message) er
case payload.IsStepQueued && payload.QueueName != "":
q.scheduler.Queue(ctx, metadata.TenantId, payload.QueueName)
case payload.IsSlotReleased:
if payload.QueueName != "" {
q.scheduler.Queue(ctx, metadata.TenantId, payload.QueueName)
}

q.scheduler.Replenish(ctx, metadata.TenantId)
q.updateStepRunV2Operations.RunOrContinue(metadata.TenantId)
default:
Expand Down
5 changes: 4 additions & 1 deletion pkg/repository/prisma/dbsqlc/queue.sql
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,12 @@ FOR UPDATE SKIP LOCKED;

-- name: ListQueueItemsForQueue :many
SELECT
*
sqlc.embed(qi),
sr."status"
FROM
"QueueItem" qi
JOIN
"StepRun" sr ON qi."stepRunId" = sr."id"
WHERE
qi."isQueued" = true
AND qi."tenantId" = @tenantId::uuid
Expand Down
41 changes: 25 additions & 16 deletions pkg/repository/prisma/dbsqlc/queue.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 23 additions & 22 deletions pkg/repository/prisma/dbsqlc/workflows.sql
Original file line number Diff line number Diff line change
Expand Up @@ -354,29 +354,30 @@ INSERT INTO "WorkflowTriggerScheduledRef" (
) RETURNING *;

-- name: ListWorkflowsForEvent :many
SELECT DISTINCT ON ("WorkflowVersion"."workflowId") "WorkflowVersion".id
FROM "WorkflowVersion"
LEFT JOIN "Workflow" AS j1 ON j1.id = "WorkflowVersion"."workflowId"
LEFT JOIN "WorkflowTriggers" AS j2 ON j2."workflowVersionId" = "WorkflowVersion"."id"
-- Get all of the latest workflow versions for the tenant
WITH latest_versions AS (
SELECT DISTINCT ON("workflowId")
workflowVersions."id" AS "workflowVersionId"
FROM
"WorkflowVersion" as workflowVersions
JOIN
"Workflow" as workflow ON workflow."id" = workflowVersions."workflowId"
WHERE
workflow."tenantId" = @tenantId::uuid
AND workflowVersions."deletedAt" IS NULL
ORDER BY "workflowId", "order" DESC
)
-- select the workflow versions that have the event trigger
SELECT
latest_versions."workflowVersionId"
FROM
latest_versions
JOIN
"WorkflowTriggers" as triggers ON triggers."workflowVersionId" = latest_versions."workflowVersionId"
JOIN
"WorkflowTriggerEventRef" as eventRef ON eventRef."parentId" = triggers."id"
WHERE
(j1."tenantId"::uuid = @tenantId AND j1.id IS NOT NULL)
AND j1."deletedAt" IS NULL
AND "WorkflowVersion"."deletedAt" IS NULL
AND
(j2.id IN (
SELECT t3."parentId"
FROM "WorkflowTriggerEventRef" AS t3
WHERE t3."eventKey" = @eventKey AND t3."parentId" IS NOT NULL
) AND j2.id IS NOT NULL)
AND "WorkflowVersion".id = (
-- confirm that the workflow version is the latest
SELECT wv2.id
FROM "WorkflowVersion" wv2
WHERE wv2."workflowId" = "WorkflowVersion"."workflowId"
ORDER BY wv2."order" DESC
LIMIT 1
)
ORDER BY "WorkflowVersion"."workflowId", "WorkflowVersion"."order" DESC;
eventRef."eventKey" = @eventKey::text;

-- name: GetWorkflowVersionForEngine :many
SELECT
Expand Down
55 changes: 28 additions & 27 deletions pkg/repository/prisma/dbsqlc/workflows.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 27 additions & 12 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ type stepRunEngineRepository struct {
cf *server.ConfigFileRuntime
cachedMinQueuedIds sync.Map
cachedStepIdHasRateLimit *cache.Cache
cachedDesiredLabels *cache.Cache
callbacks []repository.Callback[*dbsqlc.ResolveWorkflowRunStatusRow]

bulkStatusBuffer *buffer.TenantBufferManager[*updateStepRunQueueData, pgtype.UUID]
Expand Down Expand Up @@ -301,6 +302,7 @@ func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *ze
queries: queries,
cf: cf,
cachedStepIdHasRateLimit: rlCache,
cachedDesiredLabels: rlCache,
updateConcurrentFactor: cf.UpdateConcurrentFactor,
maxHashFactor: cf.UpdateHashFactor,
bulkEventBuffer: eventBuffer,
Expand Down Expand Up @@ -1306,23 +1308,36 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, qlp *zerolo
desiredLabels := make(map[string][]*dbsqlc.GetDesiredLabelsRow)
hasDesired := false

// GET DESIRED LABELS
// OPTIMIZATION: CACHEABLE
stepIds := make([]pgtype.UUID, 0, len(stepIdSet))
// GET KNOWN LABELS FROM CACHE
cacheMissStepIds := make([]pgtype.UUID, 0, len(stepIdSet))
for stepId := range stepIdSet {
stepIds = append(stepIds, sqlchelpers.UUIDFromStr(stepId))
if knownLabels, ok := s.cachedDesiredLabels.Get(stepId); ok {
knownLabelsArr := knownLabels.([]*dbsqlc.GetDesiredLabelsRow)

for _, label := range knownLabelsArr {
stepIdStr := sqlchelpers.UUIDToStr(label.StepId)
desiredLabels[stepIdStr] = knownLabelsArr
s.cachedDesiredLabels.Set(stepIdStr, knownLabelsArr)
}
} else {
cacheMissStepIds = append(cacheMissStepIds, sqlchelpers.UUIDFromStr(stepId))
}
}

labels, err := s.queries.GetDesiredLabels(ctx, tx, stepIds)
// GET CACHE MISS DESIRED LABELS
if len(cacheMissStepIds) > 0 {

if err != nil {
return emptyRes, fmt.Errorf("could not get desired labels: %w", err)
}
labels, err := s.queries.GetDesiredLabels(ctx, tx, cacheMissStepIds)

for _, label := range labels {
stepId := sqlchelpers.UUIDToStr(label.StepId)
desiredLabels[stepId] = labels
hasDesired = true
if err != nil {
return emptyRes, fmt.Errorf("could not get desired labels: %w", err)
}

for _, label := range labels {
stepId := sqlchelpers.UUIDToStr(label.StepId)
desiredLabels[stepId] = labels
hasDesired = true
}
}

var workerLabels = make(map[string][]*dbsqlc.GetWorkerLabelsRow)
Expand Down
Loading
Loading