diff --git a/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts b/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts new file mode 100644 index 0000000000000..f7e0552e5a738 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class BulkUpdateError extends Error { + private _statusCode: number; + private _type: string; + + constructor({ + statusCode, + message = 'Bulk update failed with unknown reason', + type, + }: { + statusCode: number; + message?: string; + type: string; + }) { + super(message); + this._statusCode = statusCode; + this._type = type; + } + + public get statusCode() { + return this._statusCode; + } + + public get type() { + return this._type; + } +} + +export function getBulkUpdateStatusCode(error: Error | BulkUpdateError): number | undefined { + if (Boolean(error && error instanceof BulkUpdateError)) { + return (error as BulkUpdateError).statusCode; + } +} + +export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string | undefined { + if (Boolean(error && error instanceof BulkUpdateError)) { + return (error as BulkUpdateError).type; + } +} diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts index 1da1bb11d1c5d..d453edc8e7003 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts @@ -15,6 +15,7 @@ import { import { mockLogger } from '../test_utils'; import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config'; import { MsearchError } from './msearch_error'; +import { BulkUpdateError } from './bulk_update_error'; describe('createManagedConfiguration()', () => { let clock: sinon.SinonFakeTimers; @@ -280,6 +281,45 @@ describe('createManagedConfiguration()', () => { expect(subscription).toHaveBeenNthCalledWith(2, 8); }); + test('should decrease configuration at the next interval when a bulkPartialUpdate 429 error is emitted', async () => { + const { subscription, errors$ } = setupScenario(10); + errors$.next( + new BulkUpdateError({ statusCode: 429, message: 'test', type: 'too_many_requests' }) + ); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + expect(subscription).toHaveBeenNthCalledWith(1, 10); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 8); + }); + + test('should decrease configuration at the next interval when a bulkPartialUpdate 500 error is emitted', async () => { + const { subscription, errors$ } = setupScenario(10); + errors$.next( + new BulkUpdateError({ statusCode: 500, message: 'test', type: 'server_error' }) + ); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + expect(subscription).toHaveBeenNthCalledWith(1, 10); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 8); + }); + + test('should decrease configuration at the next interval when a bulkPartialUpdate 503 error is emitted', async () => { + const { subscription, errors$ } = setupScenario(10); + errors$.next( + new BulkUpdateError({ statusCode: 503, message: 'test', type: 'unavailable' }) + ); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + expect(subscription).toHaveBeenNthCalledWith(1, 10); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 8); + }); + test('should not change configuration at the next interval when other msearch error is emitted', async () => { const { subscription, errors$ } = setupScenario(10); errors$.next(new MsearchError(404)); diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts index d13c2511a2a2b..00736f2c36cdb 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts @@ -13,6 +13,7 @@ import { isEsCannotExecuteScriptError } from './identify_es_error'; import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config'; import { TaskCost } from '../task'; import { getMsearchStatusCode } from './msearch_error'; +import { getBulkUpdateStatusCode } from './bulk_update_error'; const FLUSH_MARKER = Symbol('flush'); export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000; @@ -169,7 +170,10 @@ function countErrors(errors$: Observable, countInterval: number): Observa isEsCannotExecuteScriptError(e) || getMsearchStatusCode(e) === 429 || getMsearchStatusCode(e) === 500 || - getMsearchStatusCode(e) === 503 + getMsearchStatusCode(e) === 503 || + getBulkUpdateStatusCode(e) === 429 || + getBulkUpdateStatusCode(e) === 500 || + getBulkUpdateStatusCode(e) === 503 ) ) ).pipe( diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index f1374f6d27b76..2238381552861 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -1290,6 +1290,62 @@ describe('TaskStore', () => { ); expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); }); + + test('pushes errors returned by the saved objects client to errors$', async () => { + const task = { + id: '324242', + version: 'WzQsMV0=', + attempts: 3, + }; + + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + + esClient.bulk.mockResolvedValue({ + errors: true, + items: [ + { + update: { + _id: '1', + _index: 'test-index', + status: 403, + error: { reason: 'Error reason', type: 'cluster_block_exception' }, + }, + }, + ], + took: 10, + }); + + await store.bulkPartialUpdate([task]); + + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Error reason]`); + }); + + test('pushes errors for the malformed responses to errors$', async () => { + const task = { + id: '324242', + version: 'WzQsMV0=', + attempts: 3, + }; + + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + + esClient.bulk.mockResolvedValue({ + errors: false, + items: [ + { + update: { + _index: 'test-index', + status: 200, + }, + }, + ], + took: 10, + }); + + await store.bulkPartialUpdate([task]); + + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: malformed response]`); + }); }); describe('remove', () => { diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 2b3440e87c0f8..b7f1cec3f5567 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -48,6 +48,7 @@ import { claimSort } from './queries/mark_available_tasks_as_claimed'; import { MAX_PARTITIONS } from './lib/task_partitioner'; import { ErrorOutput } from './lib/bulk_operation_buffer'; import { MsearchError } from './lib/msearch_error'; +import { BulkUpdateError } from './lib/bulk_update_error'; export interface StoreOpts { esClient: ElasticsearchClient; @@ -386,11 +387,19 @@ export class TaskStore { } return result.items.map((item) => { + const malformedResponseType = 'malformed response'; + if (!item.update || !item.update._id) { + const err = new BulkUpdateError({ + message: malformedResponseType, + type: malformedResponseType, + statusCode: 500, + }); + this.errors$.next(err); return asErr({ type: 'task', id: 'unknown', - error: { type: 'malformed response' }, + error: { type: malformedResponseType }, }); } @@ -399,6 +408,12 @@ export class TaskStore { : item.update._id; if (item.update?.error) { + const err = new BulkUpdateError({ + message: item.update.error.reason, + type: item.update.error.type, + statusCode: item.update.status, + }); + this.errors$.next(err); return asErr({ type: 'task', id: docId,