[ResponseOps][TaskManager] fix limited concurrency starvation in mget task claimer #187809
💛 Build succeeded, but was flaky
Pinging @elastic/response-ops (Team:ResponseOps)
Left a round of comments after code review. Will verify it next!
for (const response of responses) {
  if (response.status !== 200) {
    throw new Error(`Unexpected status code: ${response.status}`);
should we pass this error to this.errors$ as well?
Ya, though it's making me wonder, with the weird partial result error stuff from CCS calls, should we just skip over these? If just one of the queries is bad for some reason, but the other ones were ok, and that was consistent, we'd never pull any tasks. Versus pulling tasks for everything except the one failing "inner search".
I guess we'll figure that out ... :-)
added in 842d402
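For reference, the "skip over these" alternative raised above could look roughly like the sketch below. This is not what 842d402 does as far as this thread shows; the `InnerSearchResponse` shape, `collectMsearchHits`, and the `onError` callback (standing in for something like `this.errors$`) are all hypothetical names used only for illustration.

```typescript
// Hypothetical shapes; the real TaskStore types are not shown in this thread.
interface InnerSearchResponse {
  status: number;
  hits: string[]; // stand-in for task documents
}

interface MsearchOutcome {
  docs: string[];
  errors: Error[];
}

// Collect hits from the successful inner searches and report an error for
// each failed one, instead of throwing on the first non-200 status.
function collectMsearchHits(
  responses: InnerSearchResponse[],
  onError: (err: Error) => void
): MsearchOutcome {
  const docs: string[] = [];
  const errors: Error[] = [];
  for (const response of responses) {
    if (response.status !== 200) {
      const err = new Error(`Unexpected status code: ${response.status}`);
      errors.push(err);
      onError(err); // assumption: forwarded to an errors$-style stream
      continue; // keep the results of the other inner searches
    }
    docs.push(...response.hits);
  }
  return { docs, errors };
}
```

With this shape, one consistently failing inner search would still be reported on every claim cycle, but the remaining searches could keep returning tasks.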
@@ -504,6 +505,36 @@ export class TaskStore {
    }
  }

  async msearch(opts: SearchOpts[] = []): Promise<FetchResult> {
can we add a unit test for this?
Just a note: this is NOT in 842d402, as I wanted to get the functional changes in first. I think this test will likely be pretty hairy and could probably be deferred (but I'm taking a look right now!)
Added in 4b357c1.
x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts
}

const capacity = getCapacity(definition.type);
result.limitedTypes.set(definition.type, capacity);
should we check for capacity=0 and not add to this map to avoid issuing a query with size 0?
added in 842d402
  RecognizedTask
);

const query = matchesClauses(queryForLimitedTasks, filterDownBy(InactiveTasks));
do we need to add tasksWithPartitions to this clause?
added in 842d402
}

const capacity = getCapacity(definition.type);
The capacity that's returned is actually now returned in cost (for the mget claim strategy), so for a normal cost task with maxConcurrency=1, it'll return 2. To convert capacity to a "number of tasks we can search for", I would divide this by the cost of the task:
const capacity = getCapacity(definition.type) / definition.cost
added in 842d402
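Putting the two suggestions from this file together (divide by cost, and skip types with no capacity), a minimal sketch might look like this. The `TaskDefinition` shape and `buildLimitedTypeSizes` are hypothetical, and rounding down with `Math.floor` is an assumption that the thread does not confirm.

```typescript
// Hypothetical, simplified definition shape for illustration only.
interface TaskDefinition {
  type: string;
  cost: number;
  maxConcurrency?: number;
}

// Build a map of limited-concurrency task type -> number of tasks to search for.
function buildLimitedTypeSizes(
  definitions: TaskDefinition[],
  getCapacity: (type: string) => number
): Map<string, number> {
  const limitedTypes = new Map<string, number>();
  for (const definition of definitions) {
    // Types without maxConcurrency are treated as unlimited and skipped here.
    if (definition.maxConcurrency == null) continue;

    // getCapacity is assumed to return capacity in cost units, so convert it
    // to a task count (rounding down is an assumption of this sketch).
    const capacity = Math.floor(getCapacity(definition.type) / definition.cost);

    // Skip types with no room left, to avoid issuing a query with size 0.
    if (capacity <= 0) continue;

    limitedTypes.set(definition.type, capacity);
  }
  return limitedTypes;
}
```

For a normal-cost task (cost 2) with maxConcurrency=1 and nothing running, `getCapacity` returns 2 and the search size comes out as 1.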
const { types, excludedTypes, removedTypes, getCapacity, definitions } = opts;
for (const type of types) {
  if (excludedTypes.has(type)) continue;
I noticed while adding an integration test (#189431) that this uses slightly different logic than the default task claimer and doesn't respect wildcards. I think we should use the same function that the default task claimer uses. I updated this in my integration test PR, so one of us will have a conflict!
I ended up fixing this in the last main merge, since that was part of what the merge conflicted with.
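To illustrate why `excludedTypes.has(type)` misses wildcard patterns, here is a small self-contained sketch. `matchesWildcard` and `isExcluded` are hypothetical helpers written for this example; they are not the function the default task claimer uses, which is not shown in this thread.

```typescript
// Hypothetical helper: treat '*' in a pattern as "any run of characters".
function matchesWildcard(pattern: string, value: string): boolean {
  const escaped = pattern
    .split('*')
    // Escape regex metacharacters in the literal parts of the pattern.
    .map((part) => part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${escaped}$`).test(value);
}

// Hypothetical helper: a type is excluded if any pattern matches it.
function isExcluded(excludedPatterns: string[], type: string): boolean {
  return excludedPatterns.some((pattern) => matchesWildcard(pattern, type));
}

// An exact-match Set lookup does not see the wildcard at all.
const excludedSet = new Set(['actions:*']);
const exactLookup = excludedSet.has('actions:.email'); // false
const wildcardLookup = isExcluded(['actions:*'], 'actions:.email'); // true
```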
@@ -15,23 +15,6 @@ import {
  MustNotCondition,
} from './query_clauses';

export function taskWithLessThanMaxAttempts(type: string, maxAttempts: number): MustCondition {
I noticed a few lingering references to search-related things regarding tasks running too many attempts. I believe this got resolved in #152841; though not sure if that applies to recurring tasks. @mikecote @ymao1 ??? In any case, this function was no longer being used, so figured I might as well delete it.
Yea I don't think we enforced anything with max attempts for recurring task types.
+1 shouldn't be used for recurring tasks, only ad-hoc (one time) tasks
@elasticmachine merge upstream
query,
sort,
size,
size: capacity,
wonder if we should add a size multiplier here to account for possible conflicts?
Yeah, we should, as the same concept for mget applies here. I'll add that in the code.
Added in 48806ca.
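A rough sketch of the size-multiplier idea follows: search for more candidates than the capacity allows, so that losing some to version conflicts still leaves enough to claim. `searchSizeFor` and `ASSUMED_SIZE_MULTIPLIER` are hypothetical names, and the value 4 is an assumption; this thread does not state the real multiplier.

```typescript
// Assumption: the actual multiplier value is not given in this thread.
const ASSUMED_SIZE_MULTIPLIER = 4;

// Convert "tasks we have capacity for" into "tasks to ask the search for".
function searchSizeFor(
  capacity: number,
  multiplier: number = ASSUMED_SIZE_MULTIPLIER
): number {
  if (capacity <= 0) return 0; // nothing to search for
  return capacity * multiplier; // over-fetch to absorb claim conflicts
}
```

Because the search can now return more tasks than can run, the results still have to be cut back to the real capacity afterwards.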
@@ -167,7 +166,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
  }

  // apply limited concurrency limits (TODO: can currently starve other tasks)
  const candidateTasks = applyLimitedConcurrency(currentTasks, batches);
  const candidateTasks = selectTasksByCapacity(currentTasks, batches);
wonder if we still need this since we're searching directly using the msearch?
I think because we will now apply the SIZE_MULTIPLIER_FOR_TASK_FETCH multiplier, we'll need to replicate the concurrency limitations in Kibana. I think this function will still be necessary but looking at the code, it should still consider the available capacity (tasks currently running).
We discussed offline and, given the code works with a concurrency of 1, we can follow up with the work to fix the code for concurrencies > 1 in #191301.
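For context on what "consider the available capacity" could mean, here is a minimal sketch of trimming over-fetched candidates down to the remaining capacity per limited type. It is not the actual `selectTasksByCapacity` implementation; `CandidateTask`, `selectByRemainingCapacity`, and the `remainingByType` input (max concurrency minus tasks currently running) are assumptions made for this example.

```typescript
// Hypothetical, simplified task shape.
interface CandidateTask {
  id: string;
  taskType: string;
}

// Keep candidates in order, but never more of a limited type than it has
// remaining capacity for. Types missing from the map are treated as unlimited.
function selectByRemainingCapacity(
  candidates: CandidateTask[],
  remainingByType: Map<string, number>
): CandidateTask[] {
  const remaining = new Map(remainingByType); // copy so the input is not mutated
  const selected: CandidateTask[] = [];
  for (const task of candidates) {
    const left = remaining.get(task.taskType);
    if (left === undefined) {
      selected.push(task); // unlimited concurrency type
      continue;
    }
    if (left <= 0) continue; // limited type already at capacity
    remaining.set(task.taskType, left - 1);
    selected.push(task);
  }
  return selected;
}
```

With a remaining capacity of 1 for a limited type, only the first candidate of that type is kept, which matches the concurrency-of-1 case mentioned above.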
LGTM
resolves #184937
Summary
Fixes a problem where limited concurrency tasks could starve unlimited concurrency tasks, by using _msearch to search for limited concurrency tasks separately from unlimited concurrency tasks.
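As a rough illustration of the approach in the summary, the searches sent to _msearch could be assembled along these lines: one search covering all unlimited concurrency types, plus one search per limited concurrency type with its own size. `TaskSearch` and `buildSearches` are hypothetical names for this sketch and do not reflect the actual query shapes in the PR.

```typescript
// Hypothetical, simplified description of one inner search.
interface TaskSearch {
  types: string[];
  size: number;
}

// One search for all unlimited types, then one per limited type, so that a
// backlog of limited tasks cannot crowd out everything else in one result set.
function buildSearches(
  unlimitedTypes: string[],
  limitedTypes: Map<string, number>,
  unlimitedSize: number
): TaskSearch[] {
  const searches: TaskSearch[] = [];
  if (unlimitedTypes.length > 0 && unlimitedSize > 0) {
    searches.push({ types: unlimitedTypes, size: unlimitedSize });
  }
  limitedTypes.forEach((size, type) => {
    if (size > 0) searches.push({ types: [type], size });
  });
  return searches;
}
```

Each limited type gets its own size budget, so its overdue tasks can no longer fill the result set that unlimited types depend on.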