Skip to content

Commit

Permalink
[8.x] [Response Ops][Task Manager] Adding integration test to ensure …
Browse files Browse the repository at this point in the history
…no `WorkloadAggregator` errors when there are unrecognized task types. (#193479) (#194016)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Response Ops][Task Manager] Adding integration test to ensure no
`WorkloadAggregator` errors when there are unrecognized task
types. (#193479)](#193479)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Ying
Mao","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-09-25T14:22:11Z","message":"[Response
Ops][Task Manager] Adding integration test to ensure no
`WorkloadAggregator` errors when there are unrecognized task types.
(#193479)\n\nFixes
https://github.com/elastic/kibana-team/issues/1036\r\n\r\n##
Summary\r\n\r\nAdding integration test as RCA action for incident where
unrecognized\r\ntask types was causing issues generating the workload
portion of the\r\ntask manager health report.\r\n\r\n## To
verify\r\n\r\nAdd this line to your code to that will throw an error
when there are\r\nunrecognized task types when generating the health
report\r\n\r\n```\r\n---
a/x-pack/plugins/task_manager/server/task_type_dictionary.ts\r\n+++
b/x-pack/plugins/task_manager/server/task_type_dictionary.ts\r\n@@
-128,6 +128,7 @@ export class TaskTypeDictionary {\r\n }\r\n\r\n public
get(type: string): TaskDefinition | undefined {\r\n+
this.ensureHas(type);\r\n return this.definitions.get(type);\r\n
}\r\n```\r\n\r\nRun the integration test `node
scripts/jest_integration.js\r\nx-pack/plugins/task_manager/server/integration_tests/removed_types.test.ts`\r\nand
see that it fails because a `WorkloadAggregator` error is
logged.\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"01eae1556266c8377f6557f4ccacc53e0b4db7fc","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Feature:Task
Manager","Team:ResponseOps","v9.0.0","backport:prev-minor","v8.16.0"],"title":"[Response
Ops][Task Manager] Adding integration test to ensure no
`WorkloadAggregator` errors when there are unrecognized task
types.","number":193479,"url":"https://github.com/elastic/kibana/pull/193479","mergeCommit":{"message":"[Response
Ops][Task Manager] Adding integration test to ensure no
`WorkloadAggregator` errors when there are unrecognized task types.
(#193479)\n\nFixes
https://github.com/elastic/kibana-team/issues/1036\r\n\r\n##
Summary\r\n\r\nAdding integration test as RCA action for incident where
unrecognized\r\ntask types was causing issues generating the workload
portion of the\r\ntask manager health report.\r\n\r\n## To
verify\r\n\r\nAdd this line to your code to that will throw an error
when there are\r\nunrecognized task types when generating the health
report\r\n\r\n```\r\n---
a/x-pack/plugins/task_manager/server/task_type_dictionary.ts\r\n+++
b/x-pack/plugins/task_manager/server/task_type_dictionary.ts\r\n@@
-128,6 +128,7 @@ export class TaskTypeDictionary {\r\n }\r\n\r\n public
get(type: string): TaskDefinition | undefined {\r\n+
this.ensureHas(type);\r\n return this.definitions.get(type);\r\n
}\r\n```\r\n\r\nRun the integration test `node
scripts/jest_integration.js\r\nx-pack/plugins/task_manager/server/integration_tests/removed_types.test.ts`\r\nand
see that it fails because a `WorkloadAggregator` error is
logged.\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"01eae1556266c8377f6557f4ccacc53e0b4db7fc"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/193479","number":193479,"mergeCommit":{"message":"[Response
Ops][Task Manager] Adding integration test to ensure no
`WorkloadAggregator` errors when there are unrecognized task types.
(#193479)\n\nFixes
https://github.com/elastic/kibana-team/issues/1036\r\n\r\n##
Summary\r\n\r\nAdding integration test as RCA action for incident where
unrecognized\r\ntask types was causing issues generating the workload
portion of the\r\ntask manager health report.\r\n\r\n## To
verify\r\n\r\nAdd this line to your code to that will throw an error
when there are\r\nunrecognized task types when generating the health
report\r\n\r\n```\r\n---
a/x-pack/plugins/task_manager/server/task_type_dictionary.ts\r\n+++
b/x-pack/plugins/task_manager/server/task_type_dictionary.ts\r\n@@
-128,6 +128,7 @@ export class TaskTypeDictionary {\r\n }\r\n\r\n public
get(type: string): TaskDefinition | undefined {\r\n+
this.ensureHas(type);\r\n return this.definitions.get(type);\r\n
}\r\n```\r\n\r\nRun the integration test `node
scripts/jest_integration.js\r\nx-pack/plugins/task_manager/server/integration_tests/removed_types.test.ts`\r\nand
see that it fails because a `WorkloadAggregator` error is
logged.\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"01eae1556266c8377f6557f4ccacc53e0b4db7fc"}},{"branch":"8.x","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Ying Mao <[email protected]>
  • Loading branch information
kibanamachine and ymao1 authored Sep 25, 2024
1 parent 9307e92 commit 1108910
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ describe('EphemeralTaskLifecycle', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe('managed configuration', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down Expand Up @@ -205,7 +205,7 @@ describe('managed configuration', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { v4 as uuidV4 } from 'uuid';
import { ElasticsearchClient } from '@kbn/core/server';
import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
import { injectTask, retry, setupTestServers } from './lib';
import { TestElasticsearchUtils, TestKibanaUtils } from '@kbn/core-test-helpers-kbn-server';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import { CreateWorkloadAggregatorOpts } from '../monitoring/workload_statistics';

const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');

const { createWorkloadAggregator: createWorkloadAggregatorMock } = jest.requireMock(
'../monitoring/workload_statistics'
);
jest.mock('../monitoring/workload_statistics', () => {
const actual = jest.requireActual('../monitoring/workload_statistics');
return {
...actual,
createWorkloadAggregator: jest.fn().mockImplementation((opts) => {
return new actual.createWorkloadAggregator(opts);
}),
};
});

describe('unrecognized task types', () => {
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
let taskManagerPlugin: TaskManagerStartContract;
let createWorkloadAggregatorOpts: CreateWorkloadAggregatorOpts;

const taskIdsToRemove: string[] = [];

beforeAll(async () => {
const setupResult = await setupTestServers({
xpack: {
task_manager: {
monitored_aggregated_stats_refresh_rate: 5000,
},
},
});
esServer = setupResult.esServer;
kibanaServer = setupResult.kibanaServer;

expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;

expect(createWorkloadAggregatorMock).toHaveBeenCalledTimes(1);
createWorkloadAggregatorOpts = createWorkloadAggregatorMock.mock.calls[0][0];
});

afterAll(async () => {
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});

beforeEach(async () => {
jest.clearAllMocks();
});

afterEach(async () => {
while (taskIdsToRemove.length > 0) {
const id = taskIdsToRemove.pop();
await taskManagerPlugin.removeIfExists(id!);
}
});

test('should be no workload aggregator errors when there are removed task types', async () => {
const errorLogSpy = jest.spyOn(createWorkloadAggregatorOpts.logger, 'error');
const removeTypeId = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: removeTypeId,
taskType: 'sampleTaskRemovedType',
params: {},
state: { foo: 'test' },
stateVersion: 1,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
const notRegisteredTypeId = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: notRegisteredTypeId,
taskType: 'sampleTaskNotRegisteredType',
params: {},
state: { foo: 'test' },
stateVersion: 1,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});

taskIdsToRemove.push(removeTypeId);
taskIdsToRemove.push(notRegisteredTypeId);

await retry(async () => {
const task = await getTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser);
expect(task?._source?.task?.status).toBe('unrecognized');
});

// monitored_aggregated_stats_refresh_rate is set to the minimum of 5 seconds
// so we want to wait that long to let it refresh
await new Promise((r) => setTimeout(r, 5100));

expect(errorLogSpy).not.toHaveBeenCalled();
});
});

async function getTask(esClient: ElasticsearchClient) {
const response = await esClient.search<{ task: ConcreteTaskInstance }>({
index: '.kibana_task_manager',
body: {
query: {
bool: {
filter: [
{
term: {
'task.taskType': 'sampleTaskRemovedType',
},
},
],
},
},
},
});

return response.hits.hits[0];
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const config = {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand All @@ -78,7 +78,7 @@ const getStatsWithTimestamp = ({
timestamp,
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
status: HealthStatus.OK,
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 3000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const config: TaskManagerConfig = {
},
version_conflict_threshold: 80,
worker_utilization_running_average_window: 5,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ function mockStats(
timestamp: new Date().toISOString(),
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 0,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('Configuration Statistics Aggregator', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand All @@ -75,7 +75,7 @@ describe('Configuration Statistics Aggregator', () => {
as_workers: 10,
as_cost: 20,
},
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 6000000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand All @@ -94,7 +94,7 @@ describe('Configuration Statistics Aggregator', () => {
as_workers: 8,
as_cost: 16,
},
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 6000000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand All @@ -113,7 +113,7 @@ describe('Configuration Statistics Aggregator', () => {
as_workers: 8,
as_cost: 16,
},
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 3000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type ScheduledIntervals = ScheduleDensityResult['histogram']['buckets'][0];
// Set an upper bound just in case a customer sets a really high refresh rate
const MAX_SCHEDULE_DENSITY_BUCKETS = 50;

interface CreateWorkloadAggregatorOpts {
export interface CreateWorkloadAggregatorOpts {
taskStore: TaskStore;
elasticsearchAndSOAvailability$: Observable<boolean>;
refreshInterval: number;
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const pluginInitializerContextParams = {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ describe('TaskPollingLifecycle', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/routes/health.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ function mockHealthStats(overrides = {}) {
timestamp: new Date().toISOString(),
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 3000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
status: HealthStatus.OK,
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 3000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down

0 comments on commit 1108910

Please sign in to comment.