Skip to content

Commit

Permalink
Pass bulkPartialUpdate errors to taskStore.errors (elastic#198606)
Browse files Browse the repository at this point in the history
Resolves: elastic#198428
  • Loading branch information
ersin-erdal authored and mgadewoll committed Nov 7, 2024
1 parent 8b31e6a commit e14ea30
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 2 deletions.
45 changes: 45 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_update_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export class BulkUpdateError extends Error {
private _statusCode: number;
private _type: string;

constructor({
statusCode,
message = 'Bulk update failed with unknown reason',
type,
}: {
statusCode: number;
message?: string;
type: string;
}) {
super(message);
this._statusCode = statusCode;
this._type = type;
}

public get statusCode() {
return this._statusCode;
}

public get type() {
return this._type;
}
}

export function getBulkUpdateStatusCode(error: Error | BulkUpdateError): number | undefined {
if (Boolean(error && error instanceof BulkUpdateError)) {
return (error as BulkUpdateError).statusCode;
}
}

export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string | undefined {
if (Boolean(error && error instanceof BulkUpdateError)) {
return (error as BulkUpdateError).type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
import { MsearchError } from './msearch_error';
import { BulkUpdateError } from './bulk_update_error';

describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
Expand Down Expand Up @@ -280,6 +281,45 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when a bulkPartialUpdate 429 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 429, message: 'test', type: 'too_many_requests' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when a bulkPartialUpdate 500 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 500, message: 'test', type: 'server_error' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when a bulkPartialUpdate 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 503, message: 'test', type: 'unavailable' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should not change configuration at the next interval when other msearch error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(new MsearchError(404));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { isEsCannotExecuteScriptError } from './identify_es_error';
import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config';
import { TaskCost } from '../task';
import { getMsearchStatusCode } from './msearch_error';
import { getBulkUpdateStatusCode } from './bulk_update_error';

const FLUSH_MARKER = Symbol('flush');
export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
Expand Down Expand Up @@ -169,7 +170,10 @@ function countErrors(errors$: Observable<Error>, countInterval: number): Observa
isEsCannotExecuteScriptError(e) ||
getMsearchStatusCode(e) === 429 ||
getMsearchStatusCode(e) === 500 ||
getMsearchStatusCode(e) === 503
getMsearchStatusCode(e) === 503 ||
getBulkUpdateStatusCode(e) === 429 ||
getBulkUpdateStatusCode(e) === 500 ||
getBulkUpdateStatusCode(e) === 503
)
)
).pipe(
Expand Down
56 changes: 56 additions & 0 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,62 @@ describe('TaskStore', () => {
);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});

test('pushes errors returned by the saved objects client to errors$', async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};

const firstErrorPromise = store.errors$.pipe(first()).toPromise();

esClient.bulk.mockResolvedValue({
errors: true,
items: [
{
update: {
_id: '1',
_index: 'test-index',
status: 403,
error: { reason: 'Error reason', type: 'cluster_block_exception' },
},
},
],
took: 10,
});

await store.bulkPartialUpdate([task]);

expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Error reason]`);
});

test('pushes errors for the malformed responses to errors$', async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};

const firstErrorPromise = store.errors$.pipe(first()).toPromise();

esClient.bulk.mockResolvedValue({
errors: false,
items: [
{
update: {
_index: 'test-index',
status: 200,
},
},
],
took: 10,
});

await store.bulkPartialUpdate([task]);

expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: malformed response]`);
});
});

describe('remove', () => {
Expand Down
17 changes: 16 additions & 1 deletion x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { claimSort } from './queries/mark_available_tasks_as_claimed';
import { MAX_PARTITIONS } from './lib/task_partitioner';
import { ErrorOutput } from './lib/bulk_operation_buffer';
import { MsearchError } from './lib/msearch_error';
import { BulkUpdateError } from './lib/bulk_update_error';

export interface StoreOpts {
esClient: ElasticsearchClient;
Expand Down Expand Up @@ -386,11 +387,19 @@ export class TaskStore {
}

return result.items.map((item) => {
const malformedResponseType = 'malformed response';

if (!item.update || !item.update._id) {
const err = new BulkUpdateError({
message: malformedResponseType,
type: malformedResponseType,
statusCode: 500,
});
this.errors$.next(err);
return asErr({
type: 'task',
id: 'unknown',
error: { type: 'malformed response' },
error: { type: malformedResponseType },
});
}

Expand All @@ -399,6 +408,12 @@ export class TaskStore {
: item.update._id;

if (item.update?.error) {
const err = new BulkUpdateError({
message: item.update.error.reason,
type: item.update.error.type,
statusCode: item.update.status,
});
this.errors$.next(err);
return asErr({
type: 'task',
id: docId,
Expand Down

0 comments on commit e14ea30

Please sign in to comment.