Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecote committed Oct 29, 2024
1 parent 962f731 commit 33c2cdc
Showing 1 changed file with 88 additions and 66 deletions.
154 changes: 88 additions & 66 deletions x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
const { getCapacity, claimOwnershipUntil, batches, events$, taskStore, taskPartitioner } = opts;
const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts;
const logger = createWrappedLogger({ logger: opts.logger, tags: [claimAvailableTasksMget.name] });
const initialCapacity = getCapacity();
let initialCapacity = getCapacity();
const stopTaskTimer = startTaskTimer();

const removedTypes = new Set(unusedTypes); // REMOVED_TYPES
Expand Down Expand Up @@ -126,7 +126,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
const docLatestVersions = await taskStore.getDocVersions(docs.map((doc) => `task:${doc.id}`));

// filter out stale, missing and removed tasks
const currentTasks: ConcreteTaskInstance[] = [];
const tasksToClaim: ConcreteTaskInstance[] = [];
const staleTasks: ConcreteTaskInstance[] = [];
const missingTasks: ConcreteTaskInstance[] = [];
const removedTasks: ConcreteTaskInstance[] = [];
Expand All @@ -148,78 +148,91 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
searchVersion.seqNo === latestVersion.seqNo &&
searchVersion.primaryTerm === latestVersion.primaryTerm
) {
currentTasks.push(searchDoc);
tasksToClaim.push(searchDoc);
continue;
} else {
staleTasks.push(searchDoc);
continue;
}
}

// apply limited concurrency limits (TODO: can currently starve other tasks)
const candidateTasks = selectTasksByCapacity(currentTasks, batches);

// apply capacity constraint to candidate tasks
const tasksToRun: ConcreteTaskInstance[] = [];
const leftOverTasks: ConcreteTaskInstance[] = [];

let capacityAccumulator = 0;
for (const task of candidateTasks) {
const taskCost = definitions.get(task.taskType)?.cost ?? TaskCost.Normal;
if (capacityAccumulator + taskCost <= initialCapacity) {
tasksToRun.push(task);
capacityAccumulator += taskCost;
} else {
leftOverTasks.push(task);
capacityAccumulator = initialCapacity;
}
}

// build the updated task objects we'll claim
const now = new Date();
const taskUpdates: PartialConcreteTaskInstance[] = [];
for (const task of tasksToRun) {
taskUpdates.push({
id: task.id,
version: task.version,
scheduledAt:
task.retryAt != null && new Date(task.retryAt).getTime() < Date.now()
? task.retryAt
: task.runAt,
status: TaskStatus.Running,
startedAt: now,
attempts: task.attempts + 1,
retryAt: getRetryAt(task, definitions.get(task.taskType)) ?? null,
ownerId: taskStore.taskManagerId,
});
}

// perform the task object updates, deal with errors
const updatedTaskIds: string[] = [];
const fullTasksToRun: ConcreteTaskInstance[] = [];
let conflicts = staleTasks.length;
let bulkUpdateErrors = 0;
let bulkGetErrors = 0;
while (tasksToClaim.length) {
// Reset - for following loops
tasksToClaim.push(...leftOverTasks);
leftOverTasks.length = 0;

console.log('LOOP', JSON.stringify(tasksToClaim, null, 2));

// apply limited concurrency limits (TODO: can currently starve other tasks)
const candidateTasks = selectTasksByCapacity(tasksToClaim, batches);
console.log('candidateTasks', JSON.stringify(candidateTasks, null, 2));
if (candidateTasks.length === 0) {
tasksToClaim.length = 0;
break;
}

const updateResults = await taskStore.bulkPartialUpdate(taskUpdates);
for (const updateResult of updateResults) {
if (isOk(updateResult)) {
updatedTaskIds.push(updateResult.value.id);
} else {
const { id, type, error, status } = updateResult.error;
// apply capacity constraint to candidate tasks
let capacityAccumulator = 0;
for (const task of candidateTasks) {
const taskCost = definitions.get(task.taskType)?.cost ?? TaskCost.Normal;
if (capacityAccumulator + taskCost <= initialCapacity) {
tasksToRun.push(task);
capacityAccumulator += taskCost;
} else {
leftOverTasks.push(task);
capacityAccumulator = initialCapacity;
}
}

// check for 409 conflict errors
if (status === 409) {
conflicts++;
// build the updated task objects we'll claim
const now = new Date();
const taskUpdates: PartialConcreteTaskInstance[] = [];
for (const task of tasksToRun) {
taskUpdates.push({
id: task.id,
version: task.version,
scheduledAt:
task.retryAt != null && new Date(task.retryAt).getTime() < Date.now()
? task.retryAt
: task.runAt,
status: TaskStatus.Running,
startedAt: now,
attempts: task.attempts + 1,
retryAt: getRetryAt(task, definitions.get(task.taskType)) ?? null,
ownerId: taskStore.taskManagerId,
});
}

// perform the task object updates, deal with errors
const updatedTaskIds: string[] = [];

const updateResults = await taskStore.bulkPartialUpdate(taskUpdates);
for (const updateResult of updateResults) {
if (isOk(updateResult)) {
updatedTaskIds.push(updateResult.value.id);
} else {
logger.error(`Error updating task ${id}:${type} during claim: ${JSON.stringify(error)}`);
bulkUpdateErrors++;
const { id, type, error, status } = updateResult.error;

// check for 409 conflict errors
if (status === 409) {
conflicts++;
} else {
logger.error(`Error updating task ${id}:${type} during claim: ${JSON.stringify(error)}`);
bulkUpdateErrors++;
}
}
}
}

// perform an mget to get the full task instance for claiming
const fullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce<ConcreteTaskInstance[]>(
(acc, task) => {
// perform an mget to get the full task instance for claiming
const partialFullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce<
ConcreteTaskInstance[]
>((acc, task) => {
if (isOk(task)) {
acc.push(task.value);
} else {
Expand All @@ -228,17 +241,26 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
bulkGetErrors++;
}
return acc;
},
[]
);
}, []);
fullTasksToRun.push(...partialFullTasksToRun);

// Look for tasks that have a null startedAt value, log them and manually set a startedAt field
for (const task of fullTasksToRun) {
if (task.startedAt == null) {
logger.warn(
`Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}`
);
task.startedAt = now;
for (const task of partialFullTasksToRun) {
const index = tasksToClaim.findIndex((item) => item.id === task.id);
if (index !== -1) {
tasksToClaim.splice(index, 1);
}
const taskCost = definitions.get(task.taskType)?.cost ?? TaskCost.Normal;
initialCapacity -= taskCost;
}

// Look for tasks that have a null startedAt value, log them and manually set a startedAt field
for (const task of fullTasksToRun) {
if (task.startedAt == null) {
logger.warn(
`Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}`
);
task.startedAt = now;
}
}
}

Expand Down

0 comments on commit 33c2cdc

Please sign in to comment.