Tasking System

The tasking system lets our app complete actions (tasks) asynchronously from the backend API. For example, creating a repository triggers a “snapshot” task, which runs in parallel with the API server.

For a detailed overview of the tasking system see this video and the accompanying slide deck.

Features

  • Queue and process asynchronous tasks
  • Check the status of queued tasks
  • Requeue a task if its worker times out or exits early, with a backoff timer
  • Schedule dependent tasks
  • Cancel a task
  • Set a task's priority

Directory structure

| Package | Description |
| --- | --- |
| pkg/tasks | each file contains code for handling a particular task type |
| pkg/tasks/queue | queue used by client and worker to schedule tasks |
| pkg/tasks/client | an interface to enqueue a task |
| pkg/tasks/worker | an interface to dequeue and handle tasks |
| pkg/tasks/payloads | workaround to import certain payloads to dao layer, but payloads are not generally defined here |

Concepts

Queue

Queue is an interface used by the client and worker packages for scheduling tasks. It is meant to be used through client or worker, not imported independently.

type Queue interface {
	// Enqueue Enqueues a job
	Enqueue(task *Task) (uuid.UUID, error)
	// Dequeue Dequeues a job of a type in taskTypes, blocking until one is available.
	Dequeue(ctx context.Context, taskTypes []string) (*models.TaskInfo, error)
	// Status returns Status of the given task
	Status(taskId uuid.UUID) (*models.TaskInfo, error)
	// Finish finishes given task, setting status to completed or failed if taskError is not nil
	Finish(taskId uuid.UUID, taskError error) error
	// Requeue requeues the given task
	Requeue(taskId uuid.UUID) error
	// Heartbeats returns the tokens of all tasks older than given duration
	Heartbeats(olderThan time.Duration) []uuid.UUID
	// IdFromToken returns a task's ID given its token
	IdFromToken(token uuid.UUID) (id uuid.UUID, isRunning bool, err error)
	// RefreshHeartbeat refresh heartbeat of task given its token
	RefreshHeartbeat(token uuid.UUID) error
	// UpdatePayload update the payload on a task
	UpdatePayload(task *models.TaskInfo, payload interface{}) (*models.TaskInfo, error)
	// ListenForCancel registers a channel and listens for notification for given task, then calls cancelFunc on receive. Should run as goroutine.
	ListenForCancel(ctx context.Context, taskID uuid.UUID, cancelFunc context.CancelCauseFunc)
	// SendCancelNotification sends notification to cancel given task
	SendCancelNotification(ctx context.Context, taskId uuid.UUID) error
	// RequeueFailedTasks requeues all failed tasks of taskTypes to the queue
	RequeueFailedTasks(taskTypes []string) error
}
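
To make the task lifecycle behind this interface more concrete, here is a minimal in-memory sketch of the enqueue, dequeue, finish cycle. It is not the project's implementation: `memQueue`, the simplified `Task` struct, the integer IDs and the status strings are all assumptions made for illustration. The real interface uses `uuid.UUID` and `models.TaskInfo`, and also filters by task type on dequeue, which this sketch omits.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Task is a simplified, hypothetical stand-in for the real task model.
type Task struct {
	ID       int
	Typename string
	Status   string // "pending", "running", "completed" or "failed"
	Err      string
}

// memQueue is an illustrative in-memory queue, not the project's queue.
type memQueue struct {
	mu     sync.Mutex
	nextID int
	tasks  map[int]*Task
	ready  chan int // IDs of pending tasks, in FIFO order
}

func newMemQueue(capacity int) *memQueue {
	return &memQueue{tasks: map[int]*Task{}, ready: make(chan int, capacity)}
}

// Enqueue stores a pending task and returns its ID, or an error if the queue is full.
func (q *memQueue) Enqueue(typename string) (int, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	id := q.nextID + 1
	select {
	case q.ready <- id:
	default:
		return 0, errors.New("queue is full")
	}
	q.nextID = id
	q.tasks[id] = &Task{ID: id, Typename: typename, Status: "pending"}
	return id, nil
}

// Dequeue blocks until a task is available or ctx is done, then marks the task running.
func (q *memQueue) Dequeue(ctx context.Context) (Task, error) {
	select {
	case id := <-q.ready:
		q.mu.Lock()
		defer q.mu.Unlock()
		q.tasks[id].Status = "running"
		return *q.tasks[id], nil
	case <-ctx.Done():
		return Task{}, ctx.Err()
	}
}

// Finish marks the task completed, or failed if taskErr is not nil.
func (q *memQueue) Finish(id int, taskErr error) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	t, ok := q.tasks[id]
	if !ok {
		return fmt.Errorf("task %d not found", id)
	}
	if taskErr != nil {
		t.Status, t.Err = "failed", taskErr.Error()
	} else {
		t.Status = "completed"
	}
	return nil
}

// Status returns a copy of the task's current state.
func (q *memQueue) Status(id int) (Task, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	t, ok := q.tasks[id]
	if !ok {
		return Task{}, fmt.Errorf("task %d not found", id)
	}
	return *t, nil
}

func main() {
	q := newMemQueue(8)
	id, _ := q.Enqueue("snapshot")
	task, _ := q.Dequeue(context.Background())
	fmt.Println(task.Typename, task.Status) // snapshot running
	_ = q.Finish(id, nil)
	st, _ := q.Status(id)
	fmt.Println(st.Status) // completed
}
```

In the sketch, the client side only ever calls `Enqueue` and `Status`, while `Dequeue` and `Finish` belong to the worker side, which mirrors the split between the client and worker packages.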

Client

TaskClient is an interface for enqueuing or canceling tasks.

type TaskClient interface {
	Enqueue(task queue.Task) (uuid.UUID, error)
	SendCancelNotification(ctx context.Context, taskId string) error
}

Worker Pool

TaskWorkerPool is an interface used by the main application to configure and start the workers and the heartbeat listener.

type TaskWorkerPool interface {
	// StartWorkers Starts workers up to number numWorkers defined in config.
	// Should be run as a go routine.
	StartWorkers(ctx context.Context)
	// Stop Gracefully stops all workers
	Stop()
	// HeartbeatListener requeues tasks of workers whose heartbeats do not refresh within heartbeat duration
	HeartbeatListener()
	// RegisterHandler assigns a function of type TaskHandler to a typename.
	// This function is the action performed to tasks of typename taskType.
	RegisterHandler(taskType string, handler TaskHandler)
}

A worker pool will manage the individual workers. Workers are meant to be used through the TaskWorkerPool interface, not directly.

Each worker is a goroutine that follows the logic loop below:

[Image: worker logic loop diagram]
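
Since the diagram is not reproduced in text, here is a rough sketch of what such a loop could look like, inferred only from the Queue and TaskWorkerPool interfaces: dequeue a task, look up the handler registered for its type, run it, and report the result. The `Task`, `TaskHandler` and `Result` types, the Go channels standing in for the queue, and the `runAll` and `demoHandlers` helpers are hypothetical simplifications. The real loop may include steps this sketch leaves out, such as heartbeats and cancellation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task and TaskHandler are simplified, hypothetical stand-ins for the real types.
type Task struct {
	ID       int
	Typename string
}

type TaskHandler func(ctx context.Context, task Task) error

// Result records how a task finished; the real system would call Queue.Finish instead.
type Result struct {
	TaskID int
	Err    error
}

// worker dequeues tasks until the channel is closed or ctx is done, runs the
// handler registered for each task's type, and reports the outcome.
func worker(ctx context.Context, tasks <-chan Task, handlers map[string]TaskHandler, results chan<- Result) {
	for {
		select {
		case <-ctx.Done():
			return
		case task, ok := <-tasks:
			if !ok {
				return
			}
			handler, found := handlers[task.Typename]
			if !found {
				results <- Result{task.ID, fmt.Errorf("no handler for task type %q", task.Typename)}
				continue
			}
			results <- Result{task.ID, handler(ctx, task)}
		}
	}
}

// runAll feeds the tasks to a single worker and collects the results in order.
func runAll(handlers map[string]TaskHandler, tasks []Task) []Result {
	in := make(chan Task, len(tasks))
	out := make(chan Result, len(tasks))
	for _, t := range tasks {
		in <- t
	}
	close(in)
	worker(context.Background(), in, handlers, out)
	close(out)
	var results []Result
	for r := range out {
		results = append(results, r)
	}
	return results
}

// demoHandlers returns one handler that succeeds and one that fails.
func demoHandlers() map[string]TaskHandler {
	return map[string]TaskHandler{
		"snapshot":   func(ctx context.Context, t Task) error { return nil },
		"introspect": func(ctx context.Context, t Task) error { return errors.New("repository unreachable") },
	}
}

func main() {
	tasks := []Task{{1, "snapshot"}, {2, "introspect"}, {3, "unknown"}}
	for _, r := range runAll(demoHandlers(), tasks) {
		fmt.Printf("task %d: err=%v\n", r.TaskID, r.Err)
	}
}
```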

Task Cancellation

Tasks can be cancelled. While handling a task, each worker listens on a Postgres notification channel named after the task ID. If the client sends a notification on that channel, the worker's current task is cancelled.

[Image: task cancellation diagram]
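
The cancellation flow can be sketched with the standard library alone. In this sketch a buffered Go channel stands in for the Postgres notification channel, and `context.WithCancelCause` supplies the `context.CancelCauseFunc` that `ListenForCancel` expects. The names `ErrTaskCanceled`, `listenForCancel`, `longTask` and `runWithCancel` are hypothetical, and the timings are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ErrTaskCanceled is a hypothetical cause attached when a cancel notification arrives.
var ErrTaskCanceled = errors.New("task canceled by client")

// listenForCancel waits for a notification on notify (a stand-in for a Postgres
// channel named after the task ID) and cancels the task's context when one arrives.
func listenForCancel(ctx context.Context, notify <-chan struct{}, cancel context.CancelCauseFunc) {
	select {
	case <-notify:
		cancel(ErrTaskCanceled)
	case <-ctx.Done():
		// The task finished or was stopped some other way; nothing to do.
	}
}

// longTask simulates a handler that checks its context between 10 ms steps.
func longTask(ctx context.Context, steps int) error {
	for i := 0; i < steps; i++ {
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		case <-time.After(10 * time.Millisecond):
		}
	}
	return nil
}

// runWithCancel runs longTask and, if cancelAfterMs > 0, sends a cancel
// notification after that many milliseconds.
func runWithCancel(steps int, cancelAfterMs int) error {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil) // releases the listener goroutine once the task returns
	notify := make(chan struct{}, 1)
	go listenForCancel(ctx, notify, cancel)
	if cancelAfterMs > 0 {
		delay := time.Duration(cancelAfterMs) * time.Millisecond
		time.AfterFunc(delay, func() { notify <- struct{}{} })
	}
	return longTask(ctx, steps)
}

func main() {
	fmt.Println(runWithCancel(3, 0))    // <nil>
	fmt.Println(runWithCancel(100, 15)) // task canceled by client
}
```

Cancellation here is cooperative: the task stops only because `longTask` checks `ctx.Done()` between steps, so a handler that never looks at its context would run to completion regardless.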

Deployment

The tasking system runs in two different processes, the API and the consumer.

The API is the main API server, where tasks are enqueued from endpoint handlers.

[Image: diagram of the API process]

The consumer runs two sets of goroutines: the workers and the heartbeat listener.

[Image: diagram of the consumer process]
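
As an illustration of the heartbeat listener's job, the sketch below performs one pass: it finds tokens whose last heartbeat is older than a limit and hands them to a requeue callback. It assumes string tokens and an in-memory map, where the real queue uses UUID tokens via `Heartbeats(olderThan)`. The `heartbeats` type, `requeueStale` and `demoPass` are hypothetical names, and a real listener would repeat the pass periodically.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// heartbeats maps a running task's token to the time of its last heartbeat.
// Hypothetical: string tokens and an in-memory map instead of persistent storage.
type heartbeats map[string]time.Time

// stale returns the tokens whose last heartbeat is older than olderThan, relative to now.
func (h heartbeats) stale(now time.Time, olderThan time.Duration) []string {
	var tokens []string
	for token, last := range h {
		if now.Sub(last) > olderThan {
			tokens = append(tokens, token)
		}
	}
	sort.Strings(tokens) // deterministic order for the example
	return tokens
}

// requeueStale removes stale heartbeats and hands each token to requeue.
// This is a single pass of a heartbeat listener.
func requeueStale(h heartbeats, now time.Time, olderThan time.Duration, requeue func(token string)) {
	for _, token := range h.stale(now, olderThan) {
		delete(h, token)
		requeue(token)
	}
}

// demoPass builds three heartbeats aged 5s, 30s and 90s and runs one pass with a 20s limit.
func demoPass() (requeued []string, remaining int) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	h := heartbeats{
		"token-a": now.Add(-5 * time.Second),
		"token-b": now.Add(-30 * time.Second),
		"token-c": now.Add(-90 * time.Second),
	}
	requeueStale(h, now, 20*time.Second, func(token string) {
		requeued = append(requeued, token)
	})
	return requeued, len(h)
}

func main() {
	requeued, remaining := demoPass()
	fmt.Println(requeued, remaining) // [token-b token-c] 1
}
```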

How to add a new task type

To add a new task type, define a handler function. Each handler should end by calling a Run() method that performs the task. Tasks should be idempotent, i.e. re-running them must not cause errors.

Here is the snapshot handler as an example:

func SnapshotHandler(ctx context.Context, task *models.TaskInfo, queue *queue.Queue) error {
	opts := payloads.SnapshotPayload{}
	if err := json.Unmarshal(task.Payload, &opts); err != nil {
		return fmt.Errorf("payload incorrect type for Snapshot")
	}
	logger := LogForTask(task.Id.String(), task.Typename, task.RequestID)
	daoReg := dao.GetDaoRegistry(db.DB)
	domainName, err := daoReg.Domain.FetchOrCreateDomain(ctx, task.OrgId)
	if err != nil {
		return err
	}
	pulpClient := pulp_client.GetPulpClientWithDomain(domainName)
	sr := SnapshotRepository{
		orgId:          task.OrgId,
		domainName:     domainName,
		repositoryUUID: task.RepositoryUUID,
		daoReg:         daoReg,
		pulpClient:     pulpClient,
		task:           task,
		payload:        &opts,
		queue:          queue,
		ctx:            ctx,
		logger:         logger,
	}
	return sr.Run()
}
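
Idempotency matters because a task can be requeued and run again, for example after its worker times out. One common way to get there, shown as a hedged sketch and not as how the snapshot task actually does it, is to check whether the work has already been recorded before doing it. The `resultStore` type and `runOnce` method are hypothetical.

```go
package main

import "fmt"

// resultStore is a hypothetical stand-in for wherever a task records its results.
type resultStore struct {
	results map[string]string // task ID -> recorded result
	runs    int               // how many times the work was actually performed
}

// runOnce is idempotent: calling it again for the same task ID returns the
// recorded result without repeating the work or failing.
func (s *resultStore) runOnce(taskID string) string {
	if res, ok := s.results[taskID]; ok {
		return res
	}
	s.runs++
	res := "result-for-" + taskID
	s.results[taskID] = res
	return res
}

func main() {
	s := &resultStore{results: map[string]string{}}
	first := s.runOnce("task-1")
	second := s.runOnce("task-1") // simulates the task being requeued and re-run
	fmt.Println(first == second, s.runs) // true 1
}
```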

To make a task cancellable, add it to the list of cancellable tasks. If cancellation requires a cleanup action, implement it as a deferred call in the Run() method. See how the snapshot task does cancellation cleanup here:

defer func() {
	if errors.Is(err, context.Canceled) {
		cleanupErr := sr.cleanupOnCancel()
		if cleanupErr != nil {
			sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot")
		}
	}
}()

Once a handler is created, it needs to be registered to the worker pool. We register our tasks here:

wrk := worker.NewTaskWorkerPool(&pgqueue, metrics)
wrk.RegisterHandler(config.IntrospectTask, tasks.IntrospectHandler)
wrk.RegisterHandler(config.RepositorySnapshotTask, tasks.SnapshotHandler)
wrk.RegisterHandler(config.DeleteRepositorySnapshotsTask, tasks.DeleteSnapshotHandler)
wrk.RegisterHandler(config.DeleteTemplatesTask, tasks.DeleteTemplateHandler)
wrk.RegisterHandler(config.UpdateTemplateContentTask, tasks.UpdateTemplateContentHandler)
wrk.RegisterHandler(config.UpdateRepositoryTask, tasks.UpdateRepositoryHandler)

See here for a list of all current task types:

const (
	RepositorySnapshotTask        = "snapshot"                    // Task to create a snapshot for a repository config
	DeleteRepositorySnapshotsTask = "delete-repository-snapshots" // Task to delete all snapshots for a repository config
	IntrospectTask                = "introspect"                  // Task to introspect repository
	DeleteTemplatesTask           = "delete-templates"            // Task to delete all content templates marked for deletion
	UpdateTemplateContentTask     = "update-template-content"     // Task to update the pulp distributions of a template's snapshots
	UpdateRepositoryTask          = "update-repository"           // Task to update repository information in candlepin when the repository is updated
)