Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ResponseOps][mget] Poll for tasks less frequently when the task load doesn't need it #200260

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
08b264e
Updating task manager to increase poll interval when utilization is low
doakalexi Nov 14, 2024
751ba85
Merge branch 'main' of github.com:elastic/kibana into task-manager/mg…
doakalexi Nov 14, 2024
9f86721
[CI] Auto-commit changed files from 'node scripts/eslint --no-cache -…
kibanamachine Nov 14, 2024
e22c2ba
Adding new tests
doakalexi Nov 14, 2024
cd3f0e3
Merge branch 'task-manager/mget-poll-interval' of github.com:doakalex…
doakalexi Nov 14, 2024
e7c2589
Adding new integration tests
doakalexi Nov 18, 2024
28aa867
Merge branch 'main' into task-manager/mget-poll-interval
doakalexi Nov 18, 2024
a3d7eb1
Fixing functional tests
doakalexi Nov 18, 2024
04219bf
Merge branch 'task-manager/mget-poll-interval' of github.com:doakalex…
doakalexi Nov 18, 2024
62cda35
Merge branch 'main' into task-manager/mget-poll-interval
doakalexi Nov 21, 2024
7433aa3
Merge branch 'main' into task-manager/mget-poll-interval
doakalexi Nov 25, 2024
6f26535
Merge branch 'main' of github.com:elastic/kibana into task-manager/mg…
doakalexi Nov 27, 2024
aa66b87
Merge branch 'main' into task-manager/mget-poll-interval
doakalexi Dec 2, 2024
48aa436
Merge branch 'task-manager/mget-poll-interval' of github.com:doakalex…
doakalexi Dec 4, 2024
7994fff
Adding log message when poll interval is reset
doakalexi Dec 4, 2024
ec2e546
Fixing flaky test
doakalexi Dec 4, 2024
c1138e3
Merge branch 'main' into task-manager/mget-poll-interval
doakalexi Dec 4, 2024
239d5b8
Merge branch 'main' of github.com:elastic/kibana into task-manager/mg…
doakalexi Jan 2, 2025
7e10912
Fixing bad changes from merge conflicts
doakalexi Jan 2, 2025
d6bd09e
Fixing more bad merge conflicts
doakalexi Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,33 @@ import sinon from 'sinon';
import { Client } from '@elastic/elasticsearch';
import { elasticsearchServiceMock, savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { SavedObjectsErrorHelpers, Logger } from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';
import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
import { coreMock } from '@kbn/core/server/mocks';
import { TaskManagerConfig } from '../config';
import { BulkUpdateError } from '../lib/bulk_update_error';

// Mock for the `run` method of the task runner returned by `mockCreateTaskRunner`.
const mockTaskTypeRunFn = jest.fn();
// Mock factory used as the `createTaskRunner` of the task definition below.
const mockCreateTaskRunner = jest.fn();
/**
 * Minimal task definition registered by the tests under the `fooType` task type.
 *
 * Its single state schema version (1) expects string fields `foo`, `bar` and `baz`;
 * the `up` function copies the incoming state and falls back to an empty string
 * for `baz` when the existing value is falsy.
 */
const mockTaskType = {
title: '',
description: '',
stateSchemaByVersion: {
1: {
up: (state: Record<string, unknown>) => ({ ...state, baz: state.baz || '' }),
schema: schema.object({
foo: schema.string(),
bar: schema.string(),
baz: schema.string(),
}),
},
},
// NOTE(review): this implementation is installed once at module load. The suites call
// `jest.resetAllMocks()` in `beforeEach`, which presumably clears it so that
// `createTaskRunner` returns `undefined` during the tests - confirm this is intended.
createTaskRunner: mockCreateTaskRunner.mockImplementation(() => ({
run: mockTaskTypeRunFn,
})),
};

describe('managed configuration', () => {
let taskManagerStart: TaskManagerStartContract;
let logger: Logger;
Expand All @@ -36,56 +57,58 @@ describe('managed configuration', () => {
},
};

// Base task manager configuration shared by the suites in this file.
// It uses the `update_by_query` claim strategy with a 3000ms poll interval; the mget
// suites spread this object and override `claim_strategy` (and, for the poll interval
// suite, `poll_interval`).
const config = {
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {
enabled: false,
// `as const` keeps the literal type 'debug' instead of widening it to `string`.
level: 'debug' as const,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 4000,
monitored_stats_running_average_window: 50,
request_capacity: 1000,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
auto_calculate_default_ech_capacity: false,
};

afterEach(() => clock.restore());

describe('managed poll interval', () => {
describe('managed poll interval with default claim strategy', () => {
beforeEach(async () => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {
enabled: false,
level: 'debug' as const,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 4000,
monitored_stats_running_average_window: 50,
request_capacity: 1000,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
auto_calculate_default_ech_capacity: false,
});
const context = coreMock.createPluginInitializerContext<TaskManagerConfig>(config);
logger = context.logger.get('taskManager');

const taskManager = new TaskManagerPlugin(context);
Expand Down Expand Up @@ -184,54 +207,122 @@ describe('managed configuration', () => {
});
});

describe('managed capacity with default claim strategy', () => {
describe('managed poll interval with mget claim strategy', () => {
beforeEach(async () => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {
enabled: false,
level: 'debug' as const,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 4000,
monitored_stats_running_average_window: 50,
request_capacity: 1000,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
auto_calculate_default_ech_capacity: false,
...config,
poll_interval: 500,
claim_strategy: 'mget',
});
logger = context.logger.get('taskManager');

const taskManager = new TaskManagerPlugin(context);
(
await taskManager.setup(coreMock.createSetup(), { usageCollection: undefined })
).registerTaskDefinitions({
fooType: mockTaskType,
});

const coreStart = coreMock.createStart();
coreStart.elasticsearch = esStart;
esStart.client.asInternalUser.child.mockReturnValue(
esStart.client.asInternalUser as unknown as Client
);
coreStart.savedObjects.createInternalRepository.mockReturnValue(savedObjectsClient);
taskManagerStart = taskManager.start(coreStart, {});

// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
clock.tick(0);
});

test('should increase poll interval when Elasticsearch returns 429 error', async () => {
  // Arrange: the next saved object `create` call rejects with a "too many requests" error.
  const tooManyRequestsError = SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b');
  savedObjectsClient.create.mockRejectedValueOnce(tooManyRequestsError);

  // Act: scheduling a task surfaces the "too many requests" error to the caller.
  const scheduling = taskManagerStart.schedule({
    taskType: 'fooType',
    params: { foo: true },
    state: { foo: 'test', bar: 'test', baz: 'test' },
  });
  await expect(scheduling).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);

  // Advance the fake clock by one throughput adjustment interval.
  clock.tick(ADJUST_THROUGHPUT_INTERVAL);

  // Assert: the poll interval is reported as raised from 500ms to 600ms.
  const expectedWarning =
    'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).';
  const expectedDebugMessages = [
    'Poll interval configuration changing from 500 to 600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)',
    'Task poller now using interval of 600ms',
  ];

  expect(logger.warn).toHaveBeenCalledWith(expectedWarning);
  for (const message of expectedDebugMessages) {
    expect(logger.debug).toHaveBeenCalledWith(message);
  }
});

test('should increase poll interval when Elasticsearch returns "cannot execute [inline] scripts" error', async () => {
  // Arrange: the next search on the child ES client throws the inline script error.
  const esChildClient = esStart.client.asInternalUser.child({}) as jest.Mocked<Client>;
  esChildClient.search.mockImplementationOnce(async () => {
    throw inlineScriptError;
  });

  // Act: fetching tasks surfaces the inline script error to the caller.
  const fetching = taskManagerStart.fetch({});
  await expect(fetching).rejects.toThrowErrorMatchingInlineSnapshot(
    `"cannot execute [inline] scripts\\" error"`
  );

  // Advance the fake clock by one throughput adjustment interval.
  clock.tick(ADJUST_THROUGHPUT_INTERVAL);

  // Assert: the poll interval is reported as raised from 500ms to 600ms.
  const expectedWarning =
    'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).';
  const expectedDebugMessages = [
    'Poll interval configuration changing from 500 to 600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)',
    'Task poller now using interval of 600ms',
  ];

  expect(logger.warn).toHaveBeenCalledWith(expectedWarning);
  for (const message of expectedDebugMessages) {
    expect(logger.debug).toHaveBeenCalledWith(message);
  }
});

// Title fixed: "untilization" -> "utilization", and the missing "when" added so the
// test reads correctly in reports and can be found by name.
test('should increase poll interval when TM utilization is low', async () => {
  // Let the next saved object `create` call succeed, echoing back what was passed in.
  savedObjectsClient.create.mockImplementationOnce(
    async (type: string, attributes: unknown) => ({
      id: 'testid',
      type,
      attributes,
      references: [],
      version: '123',
    })
  );

  // Schedule a single task so the task load stays low.
  await taskManagerStart.schedule({
    taskType: 'fooType',
    params: { foo: true },
    state: { foo: 'test', bar: 'test', baz: 'test' },
  });

  // The task's run function resolves with an empty state.
  mockTaskTypeRunFn.mockImplementation(() => {
    return { state: {} };
  });

  // Advance the fake clock by one throughput adjustment interval.
  clock.tick(ADJUST_THROUGHPUT_INTERVAL);

  // The poll interval is reported as raised from the configured 500ms to 3000ms.
  expect(logger.warn).toHaveBeenCalledWith(
    'Poll interval configuration is temporarily increased after a decrease in the task load.'
  );
  expect(logger.debug).toHaveBeenCalledWith(
    'Poll interval configuration changing from 500 to 3000 after seeing a change in the task load.'
  );
  expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3000ms');
});
});

describe('managed capacity with default claim strategy', () => {
beforeEach(async () => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>(config);
logger = context.logger.get('taskManager');

const taskManager = new TaskManagerPlugin(context);
Expand Down Expand Up @@ -312,47 +403,8 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {
enabled: false,
level: 'debug' as const,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 4000,
monitored_stats_running_average_window: 50,
request_capacity: 1000,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
...config,
claim_strategy: 'mget',
request_timeouts: {
update_by_query: 1000,
},
auto_calculate_default_ech_capacity: false,
});
logger = context.logger.get('taskManager');

Expand Down
Loading