Apply task manager backpressure whenever a 500 error is returned in the task store #198418

Merged 1 commit on Oct 31, 2024

@mikecote mikecote commented Oct 30, 2024

In this PR, I'm making the task manager apply backpressure whenever the task store returns a 500 error (from msearch or other saved object I/O).
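
As a rough illustration of what "applying backpressure" means here, the sketch below lengthens the poll interval and lowers the capacity when errors were observed. It is not the code from this PR: `ThroughputConfig`, `applyBackpressure`, the percentages, and the limits are all hypothetical names and values chosen for the example.

```typescript
// Hypothetical sketch: none of these names or values come from the PR.
interface ThroughputConfig {
  pollIntervalMs: number;
  capacity: number;
}

// Assumed adjustment steps, expressed as integer percentages to avoid
// floating-point drift.
const POLL_INTERVAL_INCREASE_PERCENT = 120;
const CAPACITY_DECREASE_PERCENT = 80;
// Assumed safety limits.
const MAX_POLL_INTERVAL_MS = 60000;
const MIN_CAPACITY = 1;

// Returns a slower, smaller configuration when at least one 500 error
// was seen during the observation window; otherwise returns the input.
function applyBackpressure(current: ThroughputConfig, errorCount: number): ThroughputConfig {
  if (errorCount === 0) {
    return current;
  }
  const slowerInterval = Math.ceil((current.pollIntervalMs * POLL_INTERVAL_INCREASE_PERCENT) / 100);
  const smallerCapacity = Math.floor((current.capacity * CAPACITY_DECREASE_PERCENT) / 100);
  return {
    pollIntervalMs: Math.min(MAX_POLL_INTERVAL_MS, slowerInterval),
    capacity: Math.max(MIN_CAPACITY, smallerCapacity),
  };
}

const adjusted = applyBackpressure({ pollIntervalMs: 3000, capacity: 10 }, 1);
// prints "poll interval: 3600ms, capacity: 8"
console.log(`poll interval: ${adjusted.pollIntervalMs}ms, capacity: ${adjusted.capacity}`);
```

The log lines mentioned in the verification steps would correspond to a change like the one from `3000`/`10` to `3600`/`8` in this example.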

To verify

  1. Apply the following diff, run Kibana, and confirm the logs report the poll interval and capacity configuration changing
diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts
index 2b3440e87c0..d2ffaa2f50f 100644
--- a/x-pack/plugins/task_manager/server/task_store.ts
+++ b/x-pack/plugins/task_manager/server/task_store.ts
@@ -574,6 +574,8 @@ export class TaskStore {
     const versionMap = this.createVersionMap([]);
     let allTasks = new Array<ConcreteTaskInstance>();

+    responses[0].status = 500;
+
     for (const response of responses) {
       if (response.status !== 200) {
         const err = new MsearchError(response.status);
  2. Undo the previous changes, apply the following diff, run Kibana, and confirm the logs report the poll interval and capacity configuration changing
diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts
index 2b3440e87c0..95d14152e1d 100644
--- a/x-pack/plugins/task_manager/server/task_store.ts
+++ b/x-pack/plugins/task_manager/server/task_store.ts
@@ -12,6 +12,7 @@ import murmurhash from 'murmurhash';
 import { v4 } from 'uuid';
 import { Subject } from 'rxjs';
 import { omit, defaults, get } from 'lodash';
+import { SavedObjectsErrorHelpers } from '@kbn/core/server';
 import { SavedObjectError } from '@kbn/core-saved-objects-common';

 import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
@@ -474,6 +475,7 @@ export class TaskStore {
   public async bulkGet(ids: string[]): Promise<BulkGetResult> {
     let result;
     try {
+      throw SavedObjectsErrorHelpers.decorateGeneralError(new Error('foo'));
       result = await this.savedObjectsRepository.bulkGet<SerializedConcreteTaskInstance>(
         ids.map((id) => ({ type: 'task', id }))
       );
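
The two diffs above simulate a 500 in two different shapes: a status on an msearch response, and a decorated saved objects error. A minimal sketch of a check that treats both as a backpressure trigger could look like the following. The helper names `getStatusCode` and `isInternalServerError` are hypothetical, and the error shapes (`statusCode` on the error, or a Boom-style `output.statusCode`) are assumptions made for this example rather than details confirmed by the PR.

```typescript
// Hypothetical sketch: helper names and error shapes are assumptions.

// Reads a status code from either a top-level `statusCode` property
// or a Boom-style `output.statusCode` property.
function getStatusCode(err: unknown): number | undefined {
  if (typeof err !== 'object' || err === null) {
    return undefined;
  }
  const candidate = err as { statusCode?: unknown; output?: { statusCode?: unknown } };
  const direct = candidate.statusCode;
  if (typeof direct === 'number') {
    return direct;
  }
  const nested = candidate.output ? candidate.output.statusCode : undefined;
  return typeof nested === 'number' ? nested : undefined;
}

// True when the error should count toward backpressure in this sketch.
function isInternalServerError(err: unknown): boolean {
  return getStatusCode(err) === 500;
}

// Stand-ins for the two simulated failures in the verification steps.
const msearchFailure = { message: 'msearch failed', statusCode: 500 };
const savedObjectsFailure = { message: 'foo', output: { statusCode: 500 } };
const versionConflict = { message: 'conflict', output: { statusCode: 409 } };

console.log(isInternalServerError(msearchFailure)); // true
console.log(isInternalServerError(savedObjectsFailure)); // true
console.log(isInternalServerError(versionConflict)); // false
```

Duck-typing on the status code keeps the check independent of any specific error class, which is a design choice of this sketch only.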

@mikecote mikecote added Feature:Task Manager Team:ResponseOps Label for the ResponseOps team (formerly the Cases and Alerting teams) labels Oct 30, 2024
@mikecote mikecote self-assigned this Oct 30, 2024
@elasticmachine

💚 Build Succeeded

Metrics [docs]

✅ unchanged

cc @mikecote

@mikecote mikecote marked this pull request as ready for review October 31, 2024 12:18
@mikecote mikecote requested a review from a team as a code owner October 31, 2024 12:18
@elasticmachine

Pinging @elastic/response-ops (Team:ResponseOps)

@mikecote mikecote added release_note:skip Skip the PR/issue when compiling release notes v9.0.0 v8.17.0 backport:prev-minor Backport to (8.x) the previous minor version (i.e. one version back from main) labels Oct 31, 2024
@mikecote mikecote merged commit 424233e into elastic:main Oct 31, 2024
52 of 53 checks passed
@kibanamachine

Starting backport for target branches: 8.x

https://github.com/elastic/kibana/actions/runs/11612936076

kibanamachine pushed a commit to kibanamachine/kibana that referenced this pull request Oct 31, 2024
…he task store (elastic#198418)

(cherry picked from commit 424233e)
@kibanamachine

💚 All backports created successfully

Status Branch Result
8.x

Note: Successful backport PRs will be merged automatically after passing CI.

Questions ?

Please refer to the Backport tool documentation

kibanamachine added a commit that referenced this pull request Oct 31, 2024
…d in the task store (#198418) (#198531)

# Backport

This will backport the following commits from `main` to `8.x`:
- [Apply task manager backpressure whenever a 500 error is returned in
the task store (#198418)](#198418)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)


Co-authored-by: Mike Côté <[email protected]>