From 216f8996214cb93c2a44ff85fa844f8e428017b7 Mon Sep 17 00:00:00 2001 From: Ersin Erdal <92688503+ersin-erdal@users.noreply.github.com> Date: Thu, 31 Oct 2024 20:30:16 +0100 Subject: [PATCH] Push the total request-body-bytes to usage-api (#194429) Resolves: https://github.com/elastic/response-ops-team/issues/209 This PR is a follow-on of https://github.com/elastic/kibana/pull/186804. Creates a new task that runs every 1 hour to push the total connector-request-body-bytes that have been saved in the event log to usage-api. --- x-pack/plugins/actions/kibana.jsonc | 3 +- .../actions_client/actions_client.test.ts | 4 + .../actions/server/actions_config.test.ts | 5 +- x-pack/plugins/actions/server/config.test.ts | 9 + x-pack/plugins/actions/server/config.ts | 19 +- .../axios_utils_connection.test.ts | 5 +- .../axios_utils_proxy.test.ts | 5 +- .../server/lib/custom_host_settings.test.ts | 5 +- x-pack/plugins/actions/server/plugin.test.ts | 15 + x-pack/plugins/actions/server/plugin.ts | 15 + .../connector_usage_reporting_task.test.ts | 394 ++++++++++++++++++ .../usage/connector_usage_reporting_task.ts | 309 ++++++++++++++ x-pack/plugins/actions/server/usage/types.ts | 15 + x-pack/plugins/actions/tsconfig.json | 3 +- .../check_registered_task_types.ts | 1 + 15 files changed, 792 insertions(+), 15 deletions(-) create mode 100644 x-pack/plugins/actions/server/usage/connector_usage_reporting_task.test.ts create mode 100644 x-pack/plugins/actions/server/usage/connector_usage_reporting_task.ts diff --git a/x-pack/plugins/actions/kibana.jsonc b/x-pack/plugins/actions/kibana.jsonc index 3cb9e8bfd79c5..882c832245951 100644 --- a/x-pack/plugins/actions/kibana.jsonc +++ b/x-pack/plugins/actions/kibana.jsonc @@ -26,7 +26,8 @@ "spaces", "security", "monitoringCollection", - "serverless" + "serverless", + "cloud" ], "extraPublicDirs": [ "common" diff --git a/x-pack/plugins/actions/server/actions_client/actions_client.test.ts b/x-pack/plugins/actions/server/actions_client/actions_client.test.ts index fbb64fd8303c9..3421e381d07b6 100644 --- a/x-pack/plugins/actions/server/actions_client/actions_client.test.ts +++ b/x-pack/plugins/actions/server/actions_client/actions_client.test.ts @@ -51,6 +51,7 @@ import { OAuthParams } from '../routes/get_oauth_access_token'; import { eventLogClientMock } from '@kbn/event-log-plugin/server/event_log_client.mock'; import { GetGlobalExecutionKPIParams, GetGlobalExecutionLogParams } from '../../common'; import { estypes } from '@elastic/elasticsearch'; +import { DEFAULT_USAGE_API_URL } from '../config'; jest.mock('@kbn/core-saved-objects-utils-server', () => { const actual = jest.requireActual('@kbn/core-saved-objects-utils-server'); @@ -588,6 +589,9 @@ describe('create()', () => { microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL, microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE, microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL, + usage: { + url: DEFAULT_USAGE_API_URL, + }, }); const localActionTypeRegistryParams = { diff --git a/x-pack/plugins/actions/server/actions_config.test.ts b/x-pack/plugins/actions/server/actions_config.test.ts index a6966e0e85c40..2b5c4efc283b6 100644 --- a/x-pack/plugins/actions/server/actions_config.test.ts +++ b/x-pack/plugins/actions/server/actions_config.test.ts @@ -6,7 +6,7 @@ */ import { ByteSizeValue } from '@kbn/config-schema'; -import { ActionsConfig } from './config'; +import { ActionsConfig, DEFAULT_USAGE_API_URL } from './config'; import { DEFAULT_MICROSOFT_EXCHANGE_URL, DEFAULT_MICROSOFT_GRAPH_API_SCOPE, @@ -42,6 +42,9 @@ const defaultActionsConfig: ActionsConfig = { microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL, microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE, microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL, + usage: { + url: DEFAULT_USAGE_API_URL, + }, }; describe('ensureUriAllowed', () => { diff --git a/x-pack/plugins/actions/server/config.test.ts b/x-pack/plugins/actions/server/config.test.ts index 5adc9c18b07a7..4034fc5cb50b5 100644 --- a/x-pack/plugins/actions/server/config.test.ts +++ b/x-pack/plugins/actions/server/config.test.ts @@ -38,6 +38,9 @@ describe('config validation', () => { "proxyRejectUnauthorizedCertificates": true, "rejectUnauthorized": true, "responseTimeout": "PT1M", + "usage": Object { + "url": "https://usage-api.usage-api/api/v1/usage", + }, } `); }); @@ -85,6 +88,9 @@ describe('config validation', () => { "proxyRejectUnauthorizedCertificates": false, "rejectUnauthorized": false, "responseTimeout": "PT1M", + "usage": Object { + "url": "https://usage-api.usage-api/api/v1/usage", + }, } `); }); @@ -225,6 +231,9 @@ describe('config validation', () => { "proxyVerificationMode": "none", "verificationMode": "none", }, + "usage": Object { + "url": "https://usage-api.usage-api/api/v1/usage", + }, } `); }); diff --git a/x-pack/plugins/actions/server/config.ts b/x-pack/plugins/actions/server/config.ts index d806bde1fa227..f475c05424df4 100644 --- a/x-pack/plugins/actions/server/config.ts +++ b/x-pack/plugins/actions/server/config.ts @@ -72,6 +72,8 @@ const connectorTypeSchema = schema.object({ maxAttempts: schema.maybe(schema.number({ min: MIN_MAX_ATTEMPTS, max: MAX_MAX_ATTEMPTS })), }); +export const DEFAULT_USAGE_API_URL = 'https://usage-api.usage-api/api/v1/usage'; + // We leverage enabledActionTypes list by allowing the other plugins to overwrite it by using "setEnabledConnectorTypes" in the plugin setup. // The list can be overwritten only if it's not already been set in the config. const enabledConnectorTypesSchema = schema.arrayOf( @@ -145,15 +147,14 @@ export const configSchema = schema.object({ max: schema.maybe(schema.number({ min: MIN_QUEUED_MAX, defaultValue: DEFAULT_QUEUED_MAX })), }) ), - usage: schema.maybe( - schema.object({ - ca: schema.maybe( - schema.object({ - path: schema.string(), - }) - ), - }) - ), + usage: schema.object({ + url: schema.string({ defaultValue: DEFAULT_USAGE_API_URL }), + ca: schema.maybe( + schema.object({ + path: schema.string(), + }) + ), + }), }); export type ActionsConfig = TypeOf; diff --git a/x-pack/plugins/actions/server/integration_tests/axios_utils_connection.test.ts b/x-pack/plugins/actions/server/integration_tests/axios_utils_connection.test.ts index 200656d339ac3..3a4101bb9f152 100644 --- a/x-pack/plugins/actions/server/integration_tests/axios_utils_connection.test.ts +++ b/x-pack/plugins/actions/server/integration_tests/axios_utils_connection.test.ts @@ -20,7 +20,7 @@ import { ByteSizeValue } from '@kbn/config-schema'; import { Logger } from '@kbn/core/server'; import { loggingSystemMock } from '@kbn/core/server/mocks'; import { createReadySignal } from '@kbn/event-log-plugin/server/lib/ready_signal'; -import { ActionsConfig } from '../config'; +import { ActionsConfig, DEFAULT_USAGE_API_URL } from '../config'; import { ActionsConfigurationUtilities, getActionsConfigurationUtilities } from '../actions_config'; import { resolveCustomHosts } from '../lib/custom_host_settings'; import { @@ -691,6 +691,9 @@ const BaseActionsConfig: ActionsConfig = { microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL, microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE, microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL, + usage: { + url: DEFAULT_USAGE_API_URL, + }, }; function getACUfromConfig(config: Partial = {}): ActionsConfigurationUtilities { diff --git a/x-pack/plugins/actions/server/integration_tests/axios_utils_proxy.test.ts b/x-pack/plugins/actions/server/integration_tests/axios_utils_proxy.test.ts index f29b2a9855186..1c1d411111253 100644 --- a/x-pack/plugins/actions/server/integration_tests/axios_utils_proxy.test.ts +++ b/x-pack/plugins/actions/server/integration_tests/axios_utils_proxy.test.ts @@ -20,7 +20,7 @@ import { ByteSizeValue } from '@kbn/config-schema'; import { Logger } from '@kbn/core/server'; import { loggingSystemMock } from '@kbn/core/server/mocks'; import { createReadySignal } from '@kbn/event-log-plugin/server/lib/ready_signal'; -import { ActionsConfig } from '../config'; +import { ActionsConfig, DEFAULT_USAGE_API_URL } from '../config'; import { ActionsConfigurationUtilities, getActionsConfigurationUtilities } from '../actions_config'; import { resolveCustomHosts } from '../lib/custom_host_settings'; import { @@ -597,6 +597,9 @@ const BaseActionsConfig: ActionsConfig = { microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL, microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE, microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL, + usage: { + url: DEFAULT_USAGE_API_URL, + }, }; function getACUfromConfig(config: Partial = {}): ActionsConfigurationUtilities { diff --git a/x-pack/plugins/actions/server/lib/custom_host_settings.test.ts b/x-pack/plugins/actions/server/lib/custom_host_settings.test.ts index 818d7fb9bcd0a..0a9d9c6df31e7 100644 --- a/x-pack/plugins/actions/server/lib/custom_host_settings.test.ts +++ b/x-pack/plugins/actions/server/lib/custom_host_settings.test.ts @@ -10,7 +10,7 @@ import { resolve as pathResolve, join as pathJoin } from 'path'; import { ByteSizeValue } from '@kbn/config-schema'; import moment from 'moment'; -import { ActionsConfig } from '../config'; +import { ActionsConfig, DEFAULT_USAGE_API_URL } from '../config'; import { Logger } from '@kbn/core/server'; import { loggingSystemMock } from '@kbn/core/server/mocks'; @@ -82,6 +82,9 @@ describe('custom_host_settings', () => { microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL, microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE, microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL, + usage: { + url: DEFAULT_USAGE_API_URL, + }, }; test('ensure it copies over the config parts that it does not touch', () => { diff --git a/x-pack/plugins/actions/server/plugin.test.ts b/x-pack/plugins/actions/server/plugin.test.ts index 89efb80867fd7..4ff87aa0459ef 100644 --- a/x-pack/plugins/actions/server/plugin.test.ts +++ b/x-pack/plugins/actions/server/plugin.test.ts @@ -30,6 +30,7 @@ import { DEFAULT_MICROSOFT_GRAPH_API_SCOPE, DEFAULT_MICROSOFT_GRAPH_API_URL, } from '../common'; +import { cloudMock } from '@kbn/cloud-plugin/server/mocks'; const executor: ExecutorType<{}, {}, {}, void> = async (options) => { return { status: 'ok', actionId: options.actionId }; @@ -59,6 +60,9 @@ function getConfig(overrides = {}) { microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL, microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE, microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL, + usage: { + url: 'ca.path', + }, ...overrides, }; } @@ -84,6 +88,9 @@ describe('Actions Plugin', () => { microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL, microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE, microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL, + usage: { + url: 'ca.path', + }, }); plugin = new ActionsPlugin(context); coreSetup = coreMock.createSetup(); @@ -95,6 +102,7 @@ describe('Actions Plugin', () => { eventLog: eventLogMock.createSetup(), usageCollection: usageCollectionPluginMock.createSetupContract(), features: featuresPluginMock.createSetup(), + cloud: cloudMock.createSetup(), }; coreSetup.getStartServices.mockResolvedValue([ coreMock.createStart(), @@ -347,6 +355,7 @@ describe('Actions Plugin', () => { eventLog: eventLogMock.createSetup(), usageCollection: usageCollectionPluginMock.createSetupContract(), features: featuresPluginMock.createSetup(), + cloud: cloudMock.createSetup(), }; } @@ -374,6 +383,7 @@ describe('Actions Plugin', () => { usageCollection: usageCollectionPluginMock.createSetupContract(), features: featuresPluginMock.createSetup(), serverless: serverlessPluginMock.createSetupContract(), + cloud: cloudMock.createSetup(), }; } @@ -585,6 +595,9 @@ describe('Actions Plugin', () => { microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL, microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE, microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL, + usage: { + url: 'ca.path', + }, }); plugin = new ActionsPlugin(context); coreSetup = coreMock.createSetup(); @@ -596,6 +609,7 @@ describe('Actions Plugin', () => { eventLog: eventLogMock.createSetup(), usageCollection: usageCollectionPluginMock.createSetupContract(), features: featuresPluginMock.createSetup(), + cloud: cloudMock.createSetup(), }; pluginsStart = { licensing: licensingMock.createStart(), @@ -680,6 +694,7 @@ describe('Actions Plugin', () => { eventLog: eventLogMock.createSetup(), usageCollection: usageCollectionPluginMock.createSetupContract(), features: featuresPluginMock.createSetup(), + cloud: cloudMock.createSetup(), }; pluginsStart = { licensing: licensingMock.createStart(), diff --git a/x-pack/plugins/actions/server/plugin.ts b/x-pack/plugins/actions/server/plugin.ts index 36fc4b68d443c..57304c176c13d 100644 --- a/x-pack/plugins/actions/server/plugin.ts +++ b/x-pack/plugins/actions/server/plugin.ts @@ -42,6 +42,7 @@ import { import { MonitoringCollectionSetup } from '@kbn/monitoring-collection-plugin/server'; import { ServerlessPluginSetup, ServerlessPluginStart } from '@kbn/serverless/server'; +import type { CloudSetup } from '@kbn/cloud-plugin/server'; import { ActionsConfig, AllowedHosts, EnabledConnectorTypes, getValidatedConfig } from './config'; import { resolveCustomHosts } from './lib/custom_host_settings'; import { events } from './lib/event_based_telemetry'; @@ -108,6 +109,7 @@ import type { IUnsecuredActionsClient } from './unsecured_actions_client/unsecur import { UnsecuredActionsClient } from './unsecured_actions_client/unsecured_actions_client'; import { createBulkUnsecuredExecutionEnqueuerFunction } from './create_unsecured_execute_function'; import { createSystemConnectors } from './create_system_actions'; +import { ConnectorUsageReportingTask } from './usage/connector_usage_reporting_task'; export interface PluginSetupContract { registerType< @@ -180,6 +182,7 @@ export interface ActionsPluginsSetup { spaces?: SpacesPluginSetup; monitoringCollection?: MonitoringCollectionSetup; serverless?: ServerlessPluginSetup; + cloud: CloudSetup; } export interface ActionsPluginsStart { @@ -214,6 +217,7 @@ export class ActionsPlugin implements Plugin {}); + return { isActionTypeEnabled: (id, options = { notifyUsage: false }) => { return this.actionTypeRegistry!.isActionTypeEnabled(id, options); diff --git a/x-pack/plugins/actions/server/usage/connector_usage_reporting_task.test.ts b/x-pack/plugins/actions/server/usage/connector_usage_reporting_task.test.ts new file mode 100644 index 0000000000000..77dec7f15e156 --- /dev/null +++ b/x-pack/plugins/actions/server/usage/connector_usage_reporting_task.test.ts @@ -0,0 +1,394 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import fs from 'fs'; +import axios from 'axios'; +import { loggingSystemMock } from '@kbn/core/server/mocks'; +import { coreMock } from '@kbn/core/server/mocks'; +import { + TaskManagerSetupContract, + TaskManagerStartContract, + TaskStatus, +} from '@kbn/task-manager-plugin/server'; +import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; +import { + CONNECTOR_USAGE_REPORTING_SOURCE_ID, + CONNECTOR_USAGE_REPORTING_TASK_ID, + CONNECTOR_USAGE_REPORTING_TASK_SCHEDULE, + CONNECTOR_USAGE_REPORTING_TASK_TYPE, + ConnectorUsageReportingTask, +} from './connector_usage_reporting_task'; +import type { CoreSetup, ElasticsearchClient } from '@kbn/core/server'; +import { ActionsPluginsStart } from '../plugin'; +import { SearchResponse } from '@elastic/elasticsearch/lib/api/types'; + +jest.mock('axios'); +const mockedAxiosPost = jest.spyOn(axios, 'post'); + +const nowStr = '2024-01-01T12:00:00.000Z'; +const nowDate = new Date(nowStr); + +jest.useFakeTimers(); +jest.setSystemTime(nowDate.getTime()); +const readFileSpy = jest.spyOn(fs, 'readFileSync'); + +describe('ConnectorUsageReportingTask', () => { + const logger = loggingSystemMock.createLogger(); + const { createSetup } = coreMock; + const { createSetup: taskManagerSetupMock, createStart: taskManagerStartMock } = taskManagerMock; + let mockEsClient: jest.Mocked; + let mockCore: CoreSetup; + let mockTaskManagerSetup: jest.Mocked; + let mockTaskManagerStart: jest.Mocked; + + beforeEach(async () => { + mockTaskManagerSetup = taskManagerSetupMock(); + mockTaskManagerStart = taskManagerStartMock(); + mockCore = createSetup(); + mockEsClient = (await mockCore.getStartServices())[0].elasticsearch.client + .asInternalUser as jest.Mocked; + }); + + afterEach(() => { + jest.resetAllMocks(); + }); + + const createTaskRunner = async ({ + lastReportedUsageDate, + projectId, + attempts = 0, + }: { + lastReportedUsageDate: Date; + projectId?: string; + attempts?: number; + }) => { + const timestamp = new Date(new Date().setMinutes(-15)); + const task = new ConnectorUsageReportingTask({ + eventLogIndex: 'test-index', + projectId, + logger, + core: mockCore, + taskManager: mockTaskManagerSetup, + config: { + url: 'usage-api', + ca: { + path: './ca.crt', + }, + }, + }); + + await task.start(mockTaskManagerStart); + + const createTaskRunnerFunction = + mockTaskManagerSetup.registerTaskDefinitions.mock.calls[0][0][ + CONNECTOR_USAGE_REPORTING_TASK_TYPE + ].createTaskRunner; + + return createTaskRunnerFunction({ + taskInstance: { + id: CONNECTOR_USAGE_REPORTING_TASK_ID, + runAt: timestamp, + attempts: 0, + ownerId: '', + status: TaskStatus.Running, + startedAt: timestamp, + scheduledAt: timestamp, + retryAt: null, + params: {}, + state: { + lastReportedUsageDate, + attempts, + }, + taskType: CONNECTOR_USAGE_REPORTING_TASK_TYPE, + }, + }); + }; + + it('registers the task', async () => { + readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---'); + new ConnectorUsageReportingTask({ + eventLogIndex: 'test-index', + projectId: 'test-projecr', + logger, + core: createSetup(), + taskManager: mockTaskManagerSetup, + config: { + url: 'usage-api', + ca: { + path: './ca.crt', + }, + }, + }); + + expect(mockTaskManagerSetup.registerTaskDefinitions).toBeCalledTimes(1); + expect(mockTaskManagerSetup.registerTaskDefinitions).toHaveBeenCalledWith({ + [CONNECTOR_USAGE_REPORTING_TASK_TYPE]: { + title: 'Connector usage reporting task', + timeout: '1m', + createTaskRunner: expect.any(Function), + }, + }); + }); + + it('schedules the task', async () => { + readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---'); + const core = createSetup(); + const taskManagerStart = taskManagerStartMock(); + + const task = new ConnectorUsageReportingTask({ + eventLogIndex: 'test-index', + projectId: 'test-projecr', + logger, + core, + taskManager: mockTaskManagerSetup, + config: { + url: 'usage-api', + ca: { + path: './ca.crt', + }, + }, + }); + + await task.start(taskManagerStart); + + expect(taskManagerStart.ensureScheduled).toBeCalledTimes(1); + expect(taskManagerStart.ensureScheduled).toHaveBeenCalledWith({ + id: CONNECTOR_USAGE_REPORTING_TASK_ID, + taskType: CONNECTOR_USAGE_REPORTING_TASK_TYPE, + schedule: { + ...CONNECTOR_USAGE_REPORTING_TASK_SCHEDULE, + }, + state: {}, + params: {}, + }); + }); + + it('logs error if task manager is not ready', async () => { + readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---'); + const core = createSetup(); + const taskManagerStart = taskManagerStartMock(); + + const task = new ConnectorUsageReportingTask({ + eventLogIndex: 'test-index', + projectId: 'test-projecr', + logger, + core, + taskManager: mockTaskManagerSetup, + config: { + url: 'usage-api', + ca: { + path: './ca.crt', + }, + }, + }); + + await task.start(); + + expect(taskManagerStart.ensureScheduled).not.toBeCalled(); + expect(logger.error).toHaveBeenCalledWith( + `Missing required task manager service during start of ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}` + ); + }); + + it('logs error if scheduling task fails', async () => { + readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---'); + const core = createSetup(); + const taskManagerStart = taskManagerStartMock(); + taskManagerStart.ensureScheduled.mockRejectedValue(new Error('test')); + + const task = new ConnectorUsageReportingTask({ + eventLogIndex: 'test-index', + projectId: 'test-projecr', + logger, + core, + taskManager: mockTaskManagerSetup, + config: { + url: 'usage-api', + ca: { + path: './ca.crt', + }, + }, + }); + + await task.start(taskManagerStart); + + expect(logger.error).toHaveBeenCalledWith( + 'Error scheduling task actions:connector_usage_reporting, received test' + ); + }); + + it('returns the existing state and logs a warning when project id is missing', async () => { + const lastReportedUsageDateStr = '2024-01-01T00:00:00.000Z'; + const lastReportedUsageDate = new Date(lastReportedUsageDateStr); + + const taskRunner = await createTaskRunner({ lastReportedUsageDate }); + + const response = await taskRunner.run(); + + expect(logger.warn).toHaveBeenCalledWith( + 'Missing required project id while running actions:connector_usage_reporting' + ); + + expect(response).toEqual({ + state: { + attempts: 0, + lastReportedUsageDate, + }, + }); + }); + + it('returns the existing state and logs an error when the CA Certificate is missing', async () => { + const lastReportedUsageDateStr = '2024-01-01T00:00:00.000Z'; + const lastReportedUsageDate = new Date(lastReportedUsageDateStr); + readFileSpy.mockImplementationOnce((func) => { + throw new Error('Mock file read error.'); + }); + + const taskRunner = await createTaskRunner({ lastReportedUsageDate, projectId: 'test-id' }); + + const response = await taskRunner.run(); + + expect(logger.error).toHaveBeenCalledTimes(2); + + expect(logger.error).toHaveBeenNthCalledWith( + 1, + `CA Certificate for the project "test-id" couldn't be loaded, Error: Mock file read error.` + ); + + expect(logger.error).toHaveBeenNthCalledWith( + 2, + 'Missing required CA Certificate while running actions:connector_usage_reporting' + ); + + expect(response).toEqual({ + state: { + attempts: 0, + lastReportedUsageDate, + }, + }); + }); + + it('runs the task', async () => { + readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---'); + mockEsClient.search.mockResolvedValueOnce({ + aggregations: { total_usage: 215 }, + } as SearchResponse); + + mockedAxiosPost.mockResolvedValueOnce(200); + + const lastReportedUsageDateStr = '2024-01-01T00:00:00.000Z'; + const lastReportedUsageDate = new Date(lastReportedUsageDateStr); + + const taskRunner = await createTaskRunner({ lastReportedUsageDate, projectId: 'test-project' }); + + const response = await taskRunner.run(); + + const report = [ + { + creation_timestamp: nowStr, + id: 'connector-request-body-bytes-test-project-2024-01-01T12:00:00.000Z', + source: { + id: CONNECTOR_USAGE_REPORTING_SOURCE_ID, + instance_group_id: 'test-project', + }, + usage: { + period_seconds: 43200, + quantity: 0, + type: 'connector_request_body_bytes', + }, + usage_timestamp: nowStr, + }, + ]; + + expect(mockedAxiosPost).toHaveBeenCalledWith('usage-api', report, { + headers: { 'Content-Type': 'application/json' }, + timeout: 30000, + httpsAgent: expect.any(Object), + }); + + expect(response).toEqual({ + state: { + attempts: 0, + lastReportedUsageDate: expect.any(Date), + }, + }); + }); + + it('re-runs the task when search for records fails', async () => { + readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---'); + mockEsClient.search.mockRejectedValue(new Error('500')); + + mockedAxiosPost.mockResolvedValueOnce(200); + + const lastReportedUsageDate = new Date('2024-01-01T00:00:00.000Z'); + + const taskRunner = await createTaskRunner({ lastReportedUsageDate, projectId: 'test-project' }); + + const response = await taskRunner.run(); + + expect(response).toEqual({ + state: { + lastReportedUsageDate, + attempts: 0, + }, + runAt: nowDate, + }); + }); + + it('re-runs the task when it fails to push the usage record', async () => { + readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---'); + mockEsClient.search.mockResolvedValueOnce({ + aggregations: { total_usage: 215 }, + } as SearchResponse); + + mockedAxiosPost.mockRejectedValueOnce(500); + + const lastReportedUsageDate = new Date('2024-01-01T00:00:00.000Z'); + + const taskRunner = await createTaskRunner({ lastReportedUsageDate, projectId: 'test-project' }); + + const response = await taskRunner.run(); + + expect(response).toEqual({ + state: { + lastReportedUsageDate, + attempts: 1, + }, + runAt: new Date(nowDate.getTime() + 60000), // After a min + }); + }); + + it('stops retrying after 5 attempts', async () => { + readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---'); + mockEsClient.search.mockResolvedValueOnce({ + aggregations: { total_usage: 215 }, + } as SearchResponse); + + mockedAxiosPost.mockRejectedValueOnce(new Error('test-error')); + + const lastReportedUsageDate = new Date('2024-01-01T00:00:00.000Z'); + + const taskRunner = await createTaskRunner({ + lastReportedUsageDate, + projectId: 'test-project', + attempts: 4, + }); + + const response = await taskRunner.run(); + + expect(response).toEqual({ + state: { + lastReportedUsageDate, + attempts: 0, + }, + }); + + expect(logger.error).toHaveBeenCalledWith( + 'Usage data could not be pushed to usage-api. Stopped retrying after 5 attempts. Error:test-error' + ); + }); +}); diff --git a/x-pack/plugins/actions/server/usage/connector_usage_reporting_task.ts b/x-pack/plugins/actions/server/usage/connector_usage_reporting_task.ts new file mode 100644 index 0000000000000..ce44718749006 --- /dev/null +++ b/x-pack/plugins/actions/server/usage/connector_usage_reporting_task.ts @@ -0,0 +1,309 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import fs from 'fs'; +import { Logger, CoreSetup, type ElasticsearchClient } from '@kbn/core/server'; +import { + IntervalSchedule, + type ConcreteTaskInstance, + TaskManagerStartContract, + TaskManagerSetupContract, +} from '@kbn/task-manager-plugin/server'; +import { AggregationsSumAggregate } from '@elastic/elasticsearch/lib/api/types'; +import axios from 'axios'; +import https from 'https'; +import { ActionsConfig } from '../config'; +import { ConnectorUsageReport } from './types'; +import { ActionsPluginsStart } from '../plugin'; + +export const CONNECTOR_USAGE_REPORTING_TASK_SCHEDULE: IntervalSchedule = { interval: '1h' }; +export const CONNECTOR_USAGE_REPORTING_TASK_ID = 'connector_usage_reporting'; +export const CONNECTOR_USAGE_REPORTING_TASK_TYPE = `actions:${CONNECTOR_USAGE_REPORTING_TASK_ID}`; +export const CONNECTOR_USAGE_REPORTING_TASK_TIMEOUT = 30000; +export const CONNECTOR_USAGE_TYPE = `connector_request_body_bytes`; +export const CONNECTOR_USAGE_REPORTING_SOURCE_ID = `task-connector-usage-report`; +export const MAX_PUSH_ATTEMPTS = 5; + +export class ConnectorUsageReportingTask { + private readonly logger: Logger; + private readonly eventLogIndex: string; + private readonly projectId: string | undefined; + private readonly caCertificate: string | undefined; + private readonly usageApiUrl: string; + + constructor({ + logger, + eventLogIndex, + core, + taskManager, + projectId, + config, + }: { + logger: Logger; + eventLogIndex: string; + core: CoreSetup; + taskManager: TaskManagerSetupContract; + projectId: string | undefined; + config: ActionsConfig['usage']; + }) { + this.logger = logger; + this.projectId = projectId; + this.eventLogIndex = eventLogIndex; + this.usageApiUrl = config.url; + const caCertificatePath = config.ca?.path; + + if (caCertificatePath && caCertificatePath.length > 0) { + try { + this.caCertificate = fs.readFileSync(caCertificatePath, 'utf8'); + } catch (e) { + this.caCertificate = undefined; + this.logger.error( + `CA Certificate for the project "${projectId}" couldn't be loaded, Error: ${e.message}` + ); + } + } + + taskManager.registerTaskDefinitions({ + [CONNECTOR_USAGE_REPORTING_TASK_TYPE]: { + title: 'Connector usage reporting task', + timeout: '1m', + createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => { + return { + run: async () => this.runTask(taskInstance, core), + cancel: async () => {}, + }; + }, + }, + }); + } + + public start = async (taskManager?: TaskManagerStartContract) => { + if (!taskManager) { + this.logger.error( + `Missing required task manager service during start of ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}` + ); + return; + } + + try { + await taskManager.ensureScheduled({ + id: CONNECTOR_USAGE_REPORTING_TASK_ID, + taskType: CONNECTOR_USAGE_REPORTING_TASK_TYPE, + schedule: { + ...CONNECTOR_USAGE_REPORTING_TASK_SCHEDULE, + }, + state: {}, + params: {}, + }); + } catch (e) { + this.logger.error( + `Error scheduling task ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}, received ${e.message}` + ); + } + }; + + private runTask = async (taskInstance: ConcreteTaskInstance, core: CoreSetup) => { + const { state } = taskInstance; + + if (!this.projectId) { + this.logger.warn( + `Missing required project id while running ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}` + ); + return { + state, + }; + } + + if (!this.caCertificate) { + this.logger.error( + `Missing required CA Certificate while running ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}` + ); + return { + state, + }; + } + + const [{ elasticsearch }] = await core.getStartServices(); + const esClient = elasticsearch.client.asInternalUser; + + const now = new Date(); + const oneDayAgo = new Date(new Date().getTime() - 24 * 60 * 60 * 1000); + const lastReportedUsageDate: Date = !!state.lastReportedUsageDate + ? new Date(state.lastReportedUsageDate) + : oneDayAgo; + + let attempts: number = state.attempts || 0; + + const fromDate = lastReportedUsageDate; + const toDate = now; + + let totalUsage = 0; + try { + totalUsage = await this.getTotalUsage({ + esClient, + fromDate, + toDate, + }); + } catch (e) { + this.logger.error(`Usage data could not be fetched. It will be retried. Error:${e.message}`); + return { + state: { + lastReportedUsageDate, + attempts, + }, + runAt: now, + }; + } + + const record: ConnectorUsageReport = this.createUsageRecord({ + totalUsage, + fromDate, + toDate, + projectId: this.projectId, + }); + + this.logger.debug(`Record: ${JSON.stringify(record)}`); + + try { + attempts = attempts + 1; + await this.pushUsageRecord(record); + this.logger.info( + `Connector usage record has been successfully reported, ${record.creation_timestamp}, usage: ${record.usage.quantity}, period:${record.usage.period_seconds}` + ); + } catch (e) { + if (attempts < MAX_PUSH_ATTEMPTS) { + this.logger.error( + `Usage data could not be pushed to usage-api. It will be retried (${attempts}). Error:${e.message}` + ); + + return { + state: { + lastReportedUsageDate, + attempts, + }, + runAt: this.getDelayedRetryDate({ attempts, now }), + }; + } + this.logger.error( + `Usage data could not be pushed to usage-api. Stopped retrying after ${attempts} attempts. Error:${e.message}` + ); + return { + state: { + lastReportedUsageDate, + attempts: 0, + }, + }; + } + + return { + state: { lastReportedUsageDate: toDate, attempts: 0 }, + }; + }; + + private getTotalUsage = async ({ + esClient, + fromDate, + toDate, + }: { + esClient: ElasticsearchClient; + fromDate: Date; + toDate: Date; + }): Promise => { + const usageResult = await esClient.search({ + index: this.eventLogIndex, + sort: '@timestamp', + size: 0, + query: { + bool: { + filter: { + bool: { + must: [ + { + term: { 'event.action': 'execute' }, + }, + { + term: { 'event.provider': 'actions' }, + }, + { + exists: { + field: 'kibana.action.execution.usage.request_body_bytes', + }, + }, + { + range: { + '@timestamp': { + gt: fromDate, + lte: toDate, + }, + }, + }, + ], + }, + }, + }, + }, + aggs: { + total_usage: { sum: { field: 'kibana.action.execution.usage.request_body_bytes' } }, + }, + }); + + return (usageResult.aggregations?.total_usage as AggregationsSumAggregate)?.value ?? 0; + }; + + private createUsageRecord = ({ + totalUsage, + fromDate, + toDate, + projectId, + }: { + totalUsage: number; + fromDate: Date; + toDate: Date; + projectId: string; + }): ConnectorUsageReport => { + const period = Math.round((toDate.getTime() - fromDate.getTime()) / 1000); + const toStr = toDate.toISOString(); + const timestamp = new Date(toStr); + timestamp.setMinutes(0); + timestamp.setSeconds(0); + timestamp.setMilliseconds(0); + + return { + id: `connector-request-body-bytes-${projectId}-${timestamp.toISOString()}`, + usage_timestamp: toStr, + creation_timestamp: toStr, + usage: { + type: CONNECTOR_USAGE_TYPE, + period_seconds: period, + quantity: totalUsage, + }, + source: { + id: CONNECTOR_USAGE_REPORTING_SOURCE_ID, + instance_group_id: projectId, + }, + }; + }; + + private pushUsageRecord = async (record: ConnectorUsageReport) => { + return axios.post(this.usageApiUrl, [record], { + headers: { 'Content-Type': 'application/json' }, + timeout: CONNECTOR_USAGE_REPORTING_TASK_TIMEOUT, + httpsAgent: new https.Agent({ + ca: this.caCertificate, + }), + }); + }; + + private getDelayedRetryDate = ({ attempts, now }: { attempts: number; now: Date }) => { + const baseDelay = 60 * 1000; + const delayByAttempts = baseDelay * attempts; + + const delayedTime = now.getTime() + delayByAttempts; + + return new Date(delayedTime); + }; +} diff --git a/x-pack/plugins/actions/server/usage/types.ts b/x-pack/plugins/actions/server/usage/types.ts index 6bdfe316c76e2..d57de6f4dad33 100644 --- a/x-pack/plugins/actions/server/usage/types.ts +++ b/x-pack/plugins/actions/server/usage/types.ts @@ -65,3 +65,18 @@ export const byServiceProviderTypeSchema: MakeSchemaFrom['count_ac other: { type: 'long' }, ses: { type: 'long' }, }; + +export interface ConnectorUsageReport { + id: string; + usage_timestamp: string; + creation_timestamp: string; + usage: { + type: string; + period_seconds: number; + quantity: number | string | undefined; + }; + source: { + id: string | undefined; + instance_group_id: string; + }; +} diff --git a/x-pack/plugins/actions/tsconfig.json b/x-pack/plugins/actions/tsconfig.json index d060287d24143..384aba6a6b014 100644 --- a/x-pack/plugins/actions/tsconfig.json +++ b/x-pack/plugins/actions/tsconfig.json @@ -47,7 +47,8 @@ "@kbn/core-http-server", "@kbn/core-test-helpers-kbn-server", "@kbn/security-plugin-types-server", - "@kbn/core-application-common" + "@kbn/core-application-common", + "@kbn/cloud-plugin" ], "exclude": [ "target/**/*", diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index 55856f3c80402..79bdd3ad7df09 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -81,6 +81,7 @@ export default function ({ getService }: FtrProviderContext) { 'actions:.torq', 'actions:.webhook', 'actions:.xmatters', + 'actions:connector_usage_reporting', 'actions_telemetry', 'ad_hoc_run-backfill', 'alerting:.es-query',