From 8d66d6ca213035ed239cbe38ff12a37d7436ebc0 Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Wed, 25 Sep 2024 02:04:01 +1000 Subject: [PATCH] [8.x] [Logs Data Telemetry] Create background job to collect and send logs data telemetry (#189380) (#193877) # Backport This will backport the following commits from `main` to `8.x`: - [[Logs Data Telemetry] Create background job to collect and send logs data telemetry (#189380)](https://github.com/elastic/kibana/pull/189380) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) Co-authored-by: Abdul Wahab Zahid --- .../get_data_telemetry/constants.ts | 1 + .../dataset_quality/kibana.jsonc | 5 +- .../dataset_quality/server/plugin.ts | 15 +- .../services/data_telemetry/constants.ts | 102 +++ .../data_telemetry_service.test.ts | 787 ++++++++++++++++++ .../data_telemetry/data_telemetry_service.ts | 261 ++++++ .../server/services/data_telemetry/helpers.ts | 575 +++++++++++++ .../server/services/data_telemetry/types.ts | 87 ++ .../dataset_quality/server/services/index.ts | 1 + .../dataset_quality/server/types.ts | 14 +- .../dataset_quality/tsconfig.json | 5 +- .../check_registered_task_types.ts | 1 + 12 files changed, 1848 insertions(+), 6 deletions(-) create mode 100644 x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/constants.ts create mode 100644 x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.test.ts create mode 100644 x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.ts create mode 100644 x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/helpers.ts create mode 100644 x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/types.ts diff --git a/src/plugins/telemetry/server/telemetry_collection/get_data_telemetry/constants.ts b/src/plugins/telemetry/server/telemetry_collection/get_data_telemetry/constants.ts index bd76ea04cb667..60bd01c2bd78c 100644 --- a/src/plugins/telemetry/server/telemetry_collection/get_data_telemetry/constants.ts +++ b/src/plugins/telemetry/server/telemetry_collection/get_data_telemetry/constants.ts @@ -55,6 +55,7 @@ export const DATA_DATASETS_INDEX_PATTERNS = [ { pattern: 'telegraf*', patternName: 'telegraf' }, { pattern: 'prometheusbeat*', patternName: 'prometheusbeat' }, { pattern: 'fluentbit*', patternName: 'fluentbit' }, + { pattern: 'fluent-bit*', patternName: 'fluentbit' }, { pattern: '*nginx*', patternName: 'nginx' }, { pattern: '*apache*', patternName: 'apache' }, // Already in Security (keeping it in here for documentation) { pattern: '*logs*', patternName: 'generic-logs' }, diff --git a/x-pack/plugins/observability_solution/dataset_quality/kibana.jsonc b/x-pack/plugins/observability_solution/dataset_quality/kibana.jsonc index 413917f62bd7f..cb16a7ce0455e 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/kibana.jsonc +++ b/x-pack/plugins/observability_solution/dataset_quality/kibana.jsonc @@ -23,9 +23,10 @@ "fieldFormats", "dataViews", "lens", - "fieldsMetadata" + "fieldsMetadata", + "taskManager" ], - "optionalPlugins": [], + "optionalPlugins": ["telemetry"], "requiredBundles": ["unifiedHistogram", "discover"], "extraPublicDirs": [ "common" diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/plugin.ts 
b/x-pack/plugins/observability_solution/dataset_quality/server/plugin.ts index 5a1b7b947641e..7c51e40c870a1 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/plugin.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/plugin.ts @@ -5,8 +5,9 @@ * 2.0. */ -import { CoreSetup, Logger, Plugin, PluginInitializerContext } from '@kbn/core/server'; +import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from '@kbn/core/server'; import { mapValues } from 'lodash'; +import { DataTelemetryService } from './services'; import { getDatasetQualityServerRouteRepository } from './routes'; import { registerRoutes } from './routes/register_routes'; import { DatasetQualityRouteHandlerResources } from './routes/types'; @@ -18,9 +19,11 @@ import { export class DatasetQualityServerPlugin implements Plugin { private readonly logger: Logger; + private readonly dataTelemetryService: DataTelemetryService; constructor(initializerContext: PluginInitializerContext) { this.logger = initializerContext.logger.get(); + this.dataTelemetryService = new DataTelemetryService(this.logger); } setup( @@ -53,10 +56,18 @@ export class DatasetQualityServerPlugin implements Plugin { getEsCapabilities, }); + // Setup Data Telemetry Service + this.dataTelemetryService.setup(core.analytics, plugins.taskManager); + return {}; } - start() { + start(core: CoreStart, plugins: DatasetQualityPluginStartDependencies) { + // Start Data Telemetry Service + this.dataTelemetryService.start(plugins.telemetry, core, plugins.taskManager).catch((error) => { + this.logger.error(`[Data Telemetry Service]: ${error}`); + }); + return {}; } } diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/constants.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/constants.ts new file mode 100644 index 0000000000000..619a6efc3bfdd --- /dev/null +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/constants.ts @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+import { DATA_DATASETS_INDEX_PATTERNS_UNIQUE } from '@kbn/telemetry-plugin/server/telemetry_collection/get_data_telemetry/constants';
+
+import { DatasetIndexPattern } from './types';
+
+export const LOGS_DATA_TELEMETRY_TASK_TYPE = 'logs-data-telemetry';
+export const LOGS_DATA_TELEMETRY_TASK_ID = 'logs-data-telemetry:collect-and-report-task-2';
+
+export const TELEMETRY_TASK_INTERVAL = 24 * 60; // 24 hours (in minutes)
+export const TELEMETRY_TASK_TIMEOUT = 10; // 10 minutes
+
+export const BREATHE_DELAY_SHORT = 1000; // 1 second
+export const BREATHE_DELAY_MEDIUM = 5 * 1000; // 5 seconds
+
+export const MAX_STREAMS_TO_REPORT = 1000;
+
+export const NON_LOG_SIGNALS = ['metrics', 'traces', 'internal', 'synthetics'];
+export const EXCLUDE_ELASTIC_LOGS = ['logs-synth', 'logs-elastic', 'logs-endpoint'];
+
+export const TELEMETRY_CHANNEL = 'logs-data-telemetry';
+
+const LOGS_INDEX_PATTERN_NAMES = [
+  'filebeat',
+  'generic-filebeat',
+  'metricbeat',
+  'generic-metricbeat',
+  'apm',
+  'functionbeat',
+  'generic-functionbeat',
+  'heartbeat',
+  'generic-heartbeat',
+  'logstash',
+  'generic-logstash',
+  'fluentd',
+  'telegraf',
+  'prometheusbeat',
+  'fluentbit',
+  'nginx',
+  'apache',
+  'generic-logs',
+];
+
+const TELEMETRY_PATTERNS_BY_NAME = DATA_DATASETS_INDEX_PATTERNS_UNIQUE.reduce((acc, pattern) => {
+  acc[pattern.patternName] = [pattern, ...(acc[pattern.patternName] || [])];
+  return acc;
+}, {} as Record<string, DatasetIndexPattern[]>);
+
+export const LOGS_DATASET_INDEX_PATTERNS = LOGS_INDEX_PATTERN_NAMES.flatMap(
+  (patternName) => TELEMETRY_PATTERNS_BY_NAME[patternName] || []
+);
+
+export const LEVEL_2_RESOURCE_FIELDS = [
+  'host.name',
+  'service.name',
+  'host',
+  'hostname',
+  'host_name',
+];
+
+export const PROMINENT_LOG_ECS_FIELDS = [
+  'log.level',
+  'log.logger',
+  'log.origin.file.name',
+  'log.origin.function',
+  'log.origin.file.line',
+  'event.action',
+  'event.category',
+  'event.dataset',
+  'event.kind',
+  'log.file.path',
+];
+
+export const DATA_TELEMETRY_FIELDS = [
+  'container.id',
+  'log.level',
+  'container.name',
+  'host.name',
+  'host.hostname',
+  'kubernetes.pod.name',
+  'kubernetes.pod.uid',
+  'cloud.provider',
+  'agent.type',
+  'event.dataset',
+  'event.category',
+  'event.module',
+  'service.name',
+  'service.type',
+  'service.version',
+  'message',
+  'event.original',
+  'error.message',
+  '@timestamp',
+  'data_stream.dataset',
+  'data_stream.namespace',
+  'data_stream.type',
+];
diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.test.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.test.ts
new file mode 100644
index 0000000000000..cbbdf8bbfa518
--- /dev/null
+++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.test.ts
@@ -0,0 +1,787 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */ + +import { ElasticsearchClient, type Logger } from '@kbn/core/server'; +import type { AnalyticsServiceSetup } from '@kbn/core/public'; +import { TelemetryPluginStart } from '@kbn/telemetry-plugin/server'; +import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; + +import { DataTelemetryEvent } from './types'; +import { BREATHE_DELAY_MEDIUM, MAX_STREAMS_TO_REPORT } from './constants'; +import { DataTelemetryService } from './data_telemetry_service'; + +// Mock the constants module to speed up and simplify the tests +jest.mock('./constants', () => ({ + ...jest.requireActual('./constants'), + BREATHE_DELAY_SHORT: 10, + BREATHE_DELAY_MEDIUM: 50, + BREATHE_DELAY_LONG: 100, + + MAX_STREAMS_TO_REPORT: 50, + + LOGS_DATASET_INDEX_PATTERNS: [ + { + pattern: 'test-pattern-*', + patternName: 'test', + shipper: 'custom', + }, + { + pattern: 'test-pattern-2-*', + patternName: 'test-2', + shipper: 'custom-2', + }, + ], +})); + +const TEST_TIMEOUT = 60 * 1000; +const SYNTH_DOCS = 6000000; + +describe('DataTelemetryService', () => { + let service: DataTelemetryService; + let mockEsClient: jest.Mocked; + let mockAnalyticsSetup: jest.Mocked; + let mockTelemetryStart: jest.Mocked; + let mockLogger: jest.Mocked; + let mockTaskManagerSetup: ReturnType; + let mockTaskManagerStart: ReturnType; + let runTask: ReturnType['runTask']; + + describe('Data Telemetry Task', () => { + beforeEach(async () => { + const mocks = setupMocks(); + mockEsClient = mocks.mockEsClient; + mockLogger = mocks.mockLogger; + mockAnalyticsSetup = mocks.mockAnalyticsSetup; + mockTelemetryStart = mocks.mockTelemetryStart; + mockTaskManagerSetup = mocks.taskManagerSetup; + mockTaskManagerStart = mocks.taskManagerStart; + runTask = mocks.runTask; + + service = new DataTelemetryService(mockLogger); + service.setup(mockAnalyticsSetup, mockTaskManagerSetup); + await service.start( + mockTelemetryStart, + { + elasticsearch: { client: { asInternalUser: mockEsClient } }, + } as any, + mockTaskManagerStart + ); + }); + + afterEach(() => { + jest.clearAllTimers(); + jest.clearAllMocks(); + }); + + it('should trigger task runner run method', async () => { + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend'); + + await runTask(); + + // Assert collectAndSend is called + expect(collectAndSendSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe('Docs Info', () => { + beforeEach(async () => { + const mocks = setupMocks(); + mockEsClient = mocks.mockEsClient; + mockLogger = mocks.mockLogger; + mockAnalyticsSetup = mocks.mockAnalyticsSetup; + mockTelemetryStart = mocks.mockTelemetryStart; + mockTaskManagerSetup = mocks.taskManagerSetup; + mockTaskManagerStart = mocks.taskManagerStart; + runTask = mocks.runTask; + + service = new DataTelemetryService(mockLogger); + service.setup(mockAnalyticsSetup, mockTaskManagerSetup); + await service.start( + mockTelemetryStart, + { + elasticsearch: { client: { asInternalUser: mockEsClient } }, + } as any, + mockTaskManagerStart + ); + + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + }); + + afterEach(() => { + jest.clearAllTimers(); + jest.clearAllMocks(); + }); + + it( + 'should collect and send telemetry after startup and every interval', + async () => { + const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend'); + + await runTask(); + expect(collectAndSendSpy).toHaveBeenCalledTimes(1); + + await sleepForBreathDelay(); + 
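+        // The collection pipeline issues ES calls behind breathe delays, so getMapping is only observable after sleeping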
expect(mockEsClient.indices.getMapping).toHaveBeenCalledTimes(1); + + await runTask(); + expect(collectAndSendSpy).toHaveBeenCalledTimes(2); + + await sleepForBreathDelay(); + expect(mockEsClient.indices.getMapping).toHaveBeenCalledTimes(2); + }, + TEST_TIMEOUT + ); + + it( + 'should stop collecting and sending telemetry if stopped', + async () => { + const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend'); + + await runTask(); + expect(collectAndSendSpy).toHaveBeenCalledTimes(1); + + service.stop(); + + await runTask(); + await sleepForBreathDelay(); + expect(collectAndSendSpy).toHaveBeenCalledTimes(1); + }, + TEST_TIMEOUT + ); + + it( + 'should not collect data if telemetry is not opted in', + async () => { + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(false); + + const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend'); + + await runTask(); + expect(collectAndSendSpy).not.toHaveBeenCalled(); + + await runTask(); + await sleepForBreathDelay(); + expect(collectAndSendSpy).not.toHaveBeenCalled(); + + // Assert that logger.debug is called with appropriate message + expect(mockLogger.debug).toHaveBeenCalledWith( + '[Logs Data Telemetry] Telemetry is not opted-in.' + ); + }, + TEST_TIMEOUT + ); + + it( + 'should not collect if number of data streams exceed MAX_STREAMS_TO_REPORT', + async () => { + (mockEsClient.indices.getDataStream as unknown as jest.Mock).mockResolvedValue({ + data_streams: Array.from({ length: MAX_STREAMS_TO_REPORT + 1 }, (_, i) => ({ + name: `logs-postgresql.log-default-${i}`, + indices: [ + { + index_name: `.ds-logs-postgresql.log-default-${i}-000001`, + }, + ], + _meta: { + managed: true, + description: 'default logs template installed by x-pack', + }, + })), + }); + + await runTask(); + await sleepForBreathDelay(); + expect(mockEsClient.indices.getMapping).not.toHaveBeenCalled(); + }, + TEST_TIMEOUT + ); + + it( + 'creates and sends the telemetry events', + async () => { + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + + const reportEventsSpy = jest.spyOn(service as any, 'reportEvents'); + + await runTask(); + await sleepForBreathDelay(); + + expect(reportEventsSpy).toHaveBeenCalledTimes(1); + expect( + ( + reportEventsSpy.mock?.lastCall as [ + [Partial], + [Partial] + ] + )?.[0]?.[0] + ).toEqual( + expect.objectContaining({ + doc_count: 4000 + 500 + 200, + failure_store_doc_count: 300, + index_count: 2 + 1 + 1, + failure_store_index_count: 1, + namespace_count: 1 + 1, + size_in_bytes: 10089898 + 800000 + 500000, + pattern_name: 'test', + managed_by: ['fleet'], + package_name: ['activemq'], + beat: [], + }) + ); + }, + TEST_TIMEOUT + ); + + it( + 'should not include stats of excluded indices', + async () => { + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + const reportEventsSpy = jest.spyOn(service as any, 'reportEvents'); + await runTask(); + await sleepForBreathDelay(); + + expect(reportEventsSpy).toHaveBeenCalledTimes(1); + const events = reportEventsSpy.mock?.lastCall as [ + [Partial], + [Partial] + ]; + // doc_count should be less than SYNTH_DOCS for any event + (events[0] ?? 
[]).forEach((event) => { + expect(event.doc_count).toBeLessThan(SYNTH_DOCS); + }); + }, + TEST_TIMEOUT + ); + }); + + describe('Fields Info and Structure Levels', () => { + beforeEach(async () => { + jest.mock('./constants', () => ({ + ...jest.requireActual('./constants'), + LOGS_DATASET_INDEX_PATTERNS: [ + { + pattern: 'test-pattern-*', + patternName: 'test', + shipper: 'custom', + }, + { + pattern: 'test-pattern-3-*', + patternName: 'test-3', + shipper: 'custom-3', + }, + ], + })); + + const mocks = setupMocks(); + mockEsClient = mocks.mockEsClient; + mockLogger = mocks.mockLogger; + mockAnalyticsSetup = mocks.mockAnalyticsSetup; + mockTelemetryStart = mocks.mockTelemetryStart; + mockTaskManagerSetup = mocks.taskManagerSetup; + mockTaskManagerStart = mocks.taskManagerStart; + runTask = mocks.runTask; + + service = new DataTelemetryService(mockLogger); + service.setup(mockAnalyticsSetup, mockTaskManagerSetup); + await service.start( + mockTelemetryStart, + { + elasticsearch: { client: { asInternalUser: mockEsClient } }, + } as any, + mockTaskManagerStart + ); + + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + }); + + afterEach(() => { + jest.clearAllTimers(); + jest.clearAllMocks(); + }); + + it( + 'should correctly calculate total fields and count of resource fields', + async () => { + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + + const reportEventsSpy = jest.spyOn(service as any, 'reportEvents'); + + await runTask(); + await sleepForBreathDelay(); + + expect(reportEventsSpy).toHaveBeenCalledTimes(1); + const lastCall = reportEventsSpy.mock?.lastCall?.[0] as [Partial]; + expect(lastCall?.[0]?.field_count).toBe(8); + expect(lastCall?.[0]?.field_existence).toEqual({ + 'container.id': 3000 + 500, + 'host.name': 3000 + 500, + message: 3000, + '@timestamp': 3000 + 500 + 200, + 'data_stream.dataset': 3000 + 500 + 200, + 'data_stream.namespace': 3000 + 500 + 200, + 'data_stream.type': 3000 + 500 + 200, + }); + }, + TEST_TIMEOUT + ); + + it('should correctly calculate structure levels', async () => { + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + + const reportEventsSpy = jest.spyOn(service as any, 'reportEvents'); + + await runTask(); + await sleepForBreathDelay(); + + expect(reportEventsSpy).toHaveBeenCalledTimes(1); + const lastCall = reportEventsSpy.mock?.lastCall?.[0] as [ + Partial, + Partial + ]; + expect(lastCall?.[1]?.structure_level).toEqual({ + '1': 1000, + '4': 500, + '6': 200, + }); + }); + }); +}); + +function sleepForBreathDelay() { + return new Promise((resolve) => setTimeout(resolve, BREATHE_DELAY_MEDIUM * 10)); +} + +function setupMocks() { + const mockEsClient = { + indices: { + stats: jest.fn().mockImplementation(() => { + const emptyAllStats: any = { _all: {} }; + + // _all shouldn't be read hence overriding with empty here + return Promise.resolve({ + ...emptyAllStats, + indices: { + ...emptyAllStats.indices, + ...MOCK_POSTGRES_DEFAULT_STATS.indices, + ...MOCK_POSTGRES_NON_DEFAULT_STATS.indices, + ...MOCK_ACTIVE_MQ_DEFAULT_STATS.indices, + ...MOCK_FLUENT_BIT_DEFAULT_STATS.indices, + ...MOCK_SYNTH_DATA_STATS.indices, + }, + }); + }), + getDataStream: jest.fn().mockImplementation((params) => { + if (params.name === 'test-pattern-2-*') { + return Promise.resolve({ + data_streams: MOCK_FLUENT_BIT_DATA_STREAMS, + }); + } + + return Promise.resolve({ + data_streams: [ + ...MOCK_POSTGRES_DATA_STREAMS, + ...MOCK_SYNTH_DATA_STREAMS, + ...MOCK_ACTIVE_MQ_FLEET_DATA_STREAMS, + ], + 
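+          // Synth and ActiveMQ streams are returned alongside postgres ones to exercise the exclusion and failure-store paths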
}); + }), + get: jest.fn().mockResolvedValue(MOCK_INDICES), + getMapping: jest.fn().mockImplementation(() => { + return Promise.resolve({ + ...MOCK_APACHE_GENERIC_INDEX_MAPPING, + ...MOCK_POSTGRES_DEFAULT_MAPPINGS, + ...MOCK_POSTGRES_NON_DEFAULT_MAPPINGS, + ...MOCK_ACTIVE_MQ_DEFAULT_MAPPINGS, + ...MOCK_FLUENT_BIT_DEFAULT_MAPPINGS, + }); + }), + }, + info: jest.fn().mockResolvedValue({}), + transport: { + request: jest.fn().mockImplementation((params) => { + if (params.path?.includes('_stats') && params?.querystring?.failure_store === 'only') { + return MOCK_ACTIVE_MQ_FAILURE_STATS; + } + + return MOCK_ACTIVE_MQ_FAILURE_STATS; + }), + }, + } as unknown as jest.Mocked; + + const mockLogger = { + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + } as unknown as jest.Mocked; + + const mockAnalyticsSetup = { + getTelemetryUrl: jest.fn().mockResolvedValue(new URL('https://telemetry.elastic.co')), + } as unknown as jest.Mocked; + + const mockTelemetryStart = { + getIsOptedIn: jest.fn().mockResolvedValue(true), + } as unknown as jest.Mocked; + + const taskManagerSetup = taskManagerMock.createSetup(); + const taskManagerStart = taskManagerMock.createStart(); + + const runTask = () => { + const taskDefinitions = taskManagerSetup.registerTaskDefinitions.mock.calls[0][0]; + const taskType = Object.keys(taskDefinitions)[0]; + const taskRunner = taskDefinitions[taskType].createTaskRunner({ taskInstance: {} as any }); + + return taskRunner.run(); + }; + + return { + mockEsClient, + mockLogger, + mockAnalyticsSetup, + mockTelemetryStart, + taskManagerSetup, + taskManagerStart, + runTask, + }; +} + +const MOCK_INDICES = { + 'apache-generic-index': {}, + '.ds-logs-postgresql.log-default-2024.07.31-000001': {}, + '.ds-logs-postgresql.log-default-2024.08.31-000002': {}, + '.ds-logs-synth.01-default-2024.07.31-000001': {}, + '.ds-logs-active-mq.fleet-2024.07.31-000001': {}, +}; + +const MOCK_POSTGRES_DATA_STREAMS = [ + { + name: 'logs-postgresql.log-default', + indices: [ + { + index_name: '.ds-logs-postgresql.log-default-2024.07.31-000001', + }, + { + index_name: '.ds-logs-postgresql.log-default-2024.08.31-000002', + }, + ], + _meta: { + managed: true, + description: 'default logs template installed by x-pack', + }, + }, + { + name: 'logs-postgresql.log-non-default', + indices: [ + { + index_name: '.ds-logs-postgresql.log-non-default-2024.07.31-000001', + }, + ], + _meta: { + managed: true, + description: 'default logs template installed by x-pack', + }, + }, +]; +const MOCK_POSTGRES_DEFAULT_STATS = { + _all: { + ...getPrimaryDocsAndStoreSize(1000 + 3000, 1000000 + 9089898), + }, + indices: { + '.ds-logs-postgresql.log-default-2024.07.31-000001': getPrimaryDocsAndStoreSize(1000, 1000000), + '.ds-logs-postgresql.log-default-2024.08.31-000002': getPrimaryDocsAndStoreSize(3000, 9089898), + }, +}; + +const MOCK_POSTGRES_NON_DEFAULT_STATS = { + _all: {}, + indices: { + '.ds-logs-postgresql.log-non-default-2024.07.31-000001': getPrimaryDocsAndStoreSize( + 500, + 800000 + ), + }, +}; + +const MOCK_POSTGRES_DEFAULT_MAPPINGS = { + '.ds-logs-postgresql.log-default-2024.08.31-000002': { + mappings: { + properties: { + ...getTimestampProp(), + ...getContainerProp(), + ...getDataStreamProps('postgresql.log', 'default', 'logs'), + ...getHostProp(), + ...getMessageProp(), + custom_field_01: { + type: 'float', + }, + }, + }, + }, +}; + +const MOCK_POSTGRES_NON_DEFAULT_MAPPINGS = { + '.ds-logs-postgresql.log-non-default-2024.07.31-000001': { + mappings: { + properties: { + ...getTimestampProp(), + 
...getContainerProp(), + ...getDataStreamProps('postgresql.log', 'non-default', 'logs'), + ...getHostProp(), + custom_field_01: { + type: 'float', + }, + }, + }, + }, +}; + +const MOCK_SYNTH_DATA_STREAMS = [ + { + name: 'logs-synth.01-default', + indices: [ + { + index_name: '.ds-logs-synth.01-default-2024.07.31-000001', + }, + { + index_name: '.ds-logs-synth.01-default-2024.08.31-000002', + }, + ], + _meta: { + managed: true, + description: 'default logs template installed by x-pack', + }, + }, +]; + +// Docs from synth data shouldn't be counted in the telemetry events +const MOCK_SYNTH_DATA_STATS = { + _all: {}, + indices: { + '.ds-logs-synth.01-default-2024.07.31-000001': getPrimaryDocsAndStoreSize( + SYNTH_DOCS, + 1000000000 + ), + }, +}; + +const MOCK_APACHE_GENERIC_INDEX_MAPPING = { + 'apache-generic-index': { + mappings: { + properties: { + ...getTimestampProp(), + }, + }, + }, +}; + +const MOCK_ACTIVE_MQ_FLEET_DATA_STREAMS = [ + { + name: 'logs-active-mq.fleet', + indices: [ + { + index_name: '.ds-logs-active-mq.fleet-2024.07.31-000001', + }, + ], + _meta: { + package: { + name: 'activemq', + }, + managed_by: 'fleet', + managed: true, + }, + }, +]; + +const MOCK_ACTIVE_MQ_DEFAULT_STATS = { + _all: {}, + indices: { + '.ds-logs-active-mq.fleet-2024.07.31-000001': getPrimaryDocsAndStoreSize(200, 500000), + }, +}; + +const MOCK_ACTIVE_MQ_FAILURE_STATS = { + _all: {}, + indices: { + '.fs-logs-active-mq.fleet-2024.07.31-000001': getPrimaryDocsAndStoreSize(300, 700000), + }, +}; + +const MOCK_ACTIVE_MQ_DEFAULT_MAPPINGS = { + '.ds-logs-active-mq.fleet-2024.07.31-000001': { + mappings: { + properties: { + ...getTimestampProp(), + ...getDataStreamProps('active-mq.fleet', 'default', 'logs'), + }, + }, + }, +}; + +const MOCK_FLUENT_BIT_DATA_STREAMS = [ + { + name: 'logs-fluent-bit.fleet', + indices: [ + { + index_name: '.ds-logs-fluent-bit.fleet-2024.07.31-000001', + }, + { + index_name: '.ds-logs-fluent-bit.fleet-2024.07.31-000002', + }, + { + index_name: '.ds-logs-fluent-bit.fleet-2024.07.31-000003', + }, + ], + _meta: { + managed: true, + description: 'default logs template installed by x-pack', + }, + }, +]; + +const MOCK_FLUENT_BIT_DEFAULT_MAPPINGS = { + '.ds-logs-fluent-bit.fleet-2024.07.31-000001': { + // Level 01 + mappings: { + properties: { + ...getTimestampProp(), + ...getDataStreamProps('fluent-bit.fleet', 'default', 'logs'), + ...getEcsVersionProp(), + }, + }, + }, + '.ds-logs-fluent-bit.fleet-2024.07.31-000002': { + // Level 04 + mappings: { + properties: { + ...getTimestampProp(), + ...getHostProp(), + ...getMessageProp(), + }, + }, + }, + '.ds-logs-fluent-bit.fleet-2024.07.31-000003': { + // Level 06 + mappings: { + properties: { + ...getTimestampProp(), + ...getHostProp(), + ...getMessageProp(), + ...getEcsVersionProp(), + }, + }, + }, +}; + +const MOCK_FLUENT_BIT_DEFAULT_STATS = { + _all: {}, + indices: { + '.ds-logs-fluent-bit.fleet-2024.07.31-000001': getPrimaryDocsAndStoreSize(1000, 1000000), + '.ds-logs-fluent-bit.fleet-2024.07.31-000002': getPrimaryDocsAndStoreSize(500, 800000), + '.ds-logs-fluent-bit.fleet-2024.07.31-000003': getPrimaryDocsAndStoreSize(200, 500000), + }, +}; + +function getPrimaryDocsAndStoreSize(docs: number, storeSize: number) { + return { + primaries: { + docs: { + count: docs, + deleted: 0, + total_size_in_bytes: storeSize, + }, + store: { + size_in_bytes: storeSize, + }, + }, + }; +} + +function getTimestampProp() { + return { + '@timestamp': { + type: 'date', + ignore_malformed: false, + }, + }; +} + +function getDataStreamProps(dataset: string, 
namespace: string, type: string) { + return { + data_stream: { + properties: { + dataset: { + type: 'constant_keyword', + value: dataset, + }, + namespace: { + type: 'constant_keyword', + value: namespace, + }, + type: { + type: 'constant_keyword', + value: type, + }, + }, + }, + }; +} + +function getContainerProp() { + return { + container: { + properties: { + id: { + type: 'text', + fields: { + keyword: { + type: 'keyword', + ignore_above: 256, + }, + }, + }, + }, + }, + }; +} + +function getHostProp() { + return { + host: { + properties: { + name: { + type: 'text', + fields: { + keyword: { + type: 'keyword', + ignore_above: 256, + }, + }, + }, + }, + }, + }; +} + +function getEcsVersionProp() { + return { + ecs: { + properties: { + version: { + type: 'keyword', + }, + }, + }, + }; +} + +function getMessageProp() { + return { + message: { + type: 'text', + }, + }; +} diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.ts new file mode 100644 index 0000000000000..6f0ea294cfc0a --- /dev/null +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.ts @@ -0,0 +1,261 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + from, + defer, + delay, + filter, + tap, + take, + takeWhile, + exhaustMap, + switchMap, + map, + of, + EMPTY, +} from 'rxjs'; +import type { CoreStart, ElasticsearchClient, Logger } from '@kbn/core/server'; +import type { AnalyticsServiceSetup } from '@kbn/core/public'; +import { + TaskInstance, + TaskManagerSetupContract, + TaskManagerStartContract, +} from '@kbn/task-manager-plugin/server'; +import type { TelemetryPluginStart } from '@kbn/telemetry-plugin/server'; + +import { + BREATHE_DELAY_MEDIUM, + BREATHE_DELAY_SHORT, + NON_LOG_SIGNALS, + EXCLUDE_ELASTIC_LOGS, + MAX_STREAMS_TO_REPORT, + LOGS_DATASET_INDEX_PATTERNS, + LOGS_DATA_TELEMETRY_TASK_TYPE, + TELEMETRY_TASK_INTERVAL, + LOGS_DATA_TELEMETRY_TASK_ID, + TELEMETRY_TASK_TIMEOUT, +} from './constants'; +import { + getAllIndices, + addMappingsToIndices, + addNamespace, + groupStatsByPatternName, + getIndexBasicStats, + indexStatsToTelemetryEvents, + getIndexFieldStats, +} from './helpers'; + +import { DataTelemetryEvent } from './types'; + +export class DataTelemetryService { + private readonly logger: Logger; + private isStopped = false; + + private telemetryStart?: TelemetryPluginStart; + + // @ts-ignore: Unused variable + private analytics?: AnalyticsServiceSetup; + + // @ts-ignore: Unused variable + private isInProgress = false; + + private isOptedIn?: boolean = true; // Assume true until the first check + private esClient?: ElasticsearchClient; + + private run$ = defer(() => from(this.shouldCollectTelemetry())).pipe( + takeWhile(() => !this.isStopped), + tap((isOptedIn) => { + if (!isOptedIn) { + this.logTelemetryNotOptedIn(); + this.isInProgress = false; + } else { + this.isInProgress = true; + } + }), + filter((isOptedIn) => isOptedIn), + exhaustMap(() => this.collectAndSend()), + tap(() => (this.isInProgress = false)) + ); + + constructor(logger: Logger) { + this.logger = logger; + } + + public setup(analytics: AnalyticsServiceSetup, taskManager: 
TaskManagerSetupContract) {
+    this.analytics = analytics;
+    this.registerTask(taskManager);
+  }
+
+  public async start(
+    telemetryStart: TelemetryPluginStart,
+    core: CoreStart,
+    taskManager: TaskManagerStartContract
+  ) {
+    this.telemetryStart = telemetryStart;
+    this.esClient = core?.elasticsearch.client.asInternalUser;
+
+    if (taskManager) {
+      const taskInstance = await this.scheduleTask(taskManager);
+      if (taskInstance) {
+        this.logger.debug(`Task ${taskInstance.id} scheduled.`);
+      }
+    }
+  }
+
+  public stop() {
+    this.isStopped = true;
+  }
+
+  private registerTask(taskManager: TaskManagerSetupContract) {
+    const service = this;
+    taskManager.registerTaskDefinitions({
+      [LOGS_DATA_TELEMETRY_TASK_TYPE]: {
+        title: 'Logs Data Telemetry',
+        description:
+          'This task collects data telemetry for logs data and sends it to the telemetry service.',
+        timeout: `${TELEMETRY_TASK_TIMEOUT}m`,
+        maxAttempts: 1, // Do not retry
+
+        createTaskRunner: () => {
+          return {
+            // Perform the work of the task. The return value should fit the TaskResult interface.
+            async run() {
+              service.logger.debug(`[Logs Data Telemetry] Running task`);
+
+              try {
+                service.run$.pipe(take(1)).subscribe({
+                  complete: () => {
+                    service.logger.debug(`[Logs Data Telemetry] Task completed`);
+                  },
+                });
+              } catch (e) {
+                service.logger.error(e);
+              }
+            },
+            async cancel() {
+              service.logger.debug(`[Logs Data Telemetry] Task cancelled`);
+            },
+          };
+        },
+      },
+    });
+  }
+
+  private async scheduleTask(taskManager: TaskManagerStartContract): Promise<TaskInstance | null> {
+    try {
+      const taskInstance = await taskManager.ensureScheduled({
+        id: LOGS_DATA_TELEMETRY_TASK_ID,
+        taskType: LOGS_DATA_TELEMETRY_TASK_TYPE,
+        schedule: {
+          interval: `${TELEMETRY_TASK_INTERVAL}m`,
+        },
+        params: {},
+        state: {},
+        scope: ['logs'],
+      });
+
+      this.logger?.debug(
+        `Task ${LOGS_DATA_TELEMETRY_TASK_ID} scheduled with interval ${taskInstance.schedule?.interval}.`
+      );
+
+      return taskInstance;
+    } catch (e) {
+      this.logger?.error(
+        `Failed to schedule task ${LOGS_DATA_TELEMETRY_TASK_ID} with interval ${TELEMETRY_TASK_INTERVAL}m. ${e?.message}`
+      );
+
+      return null;
+    }
+  }
+
+  public async shouldCollectTelemetry() {
+    if (process.env.CI) {
+      return false; // Telemetry collection flow should not run in CI
+    }
+
+    this.isOptedIn = await this.telemetryStart?.getIsOptedIn();
+    return this.isOptedIn === true;
+  }
+
+  private collectAndSend() {
+    // Gather data streams and indices related to each stream of logs
+    if (this.esClient) {
+      return getAllIndices({
+        esClient: this.esClient,
+        logsIndexPatterns: LOGS_DATASET_INDEX_PATTERNS,
+        excludeStreamsStartingWith: [...NON_LOG_SIGNALS, ...EXCLUDE_ELASTIC_LOGS],
+        breatheDelay: BREATHE_DELAY_MEDIUM,
+      }).pipe(
+        switchMap((dataStreamsAndIndicesInfo) => {
+          if (dataStreamsAndIndicesInfo.length > MAX_STREAMS_TO_REPORT) {
+            this.logger.debug(
+              `[Logs Data Telemetry] Number of data streams exceeds ${MAX_STREAMS_TO_REPORT}.
Skipping telemetry collection.` + ); + return EMPTY; + } + return of(dataStreamsAndIndicesInfo); + }), + delay(BREATHE_DELAY_MEDIUM), + switchMap((dataStreamsAndIndicesInfo) => { + return addMappingsToIndices({ + esClient: this.esClient!, + dataStreamsInfo: dataStreamsAndIndicesInfo, + logsIndexPatterns: LOGS_DATASET_INDEX_PATTERNS, + }); + }), + delay(BREATHE_DELAY_SHORT), + switchMap((dataStreamsAndIndicesInfo) => { + return addNamespace({ + dataStreamsInfo: dataStreamsAndIndicesInfo, + }); + }), + delay(BREATHE_DELAY_MEDIUM), + switchMap((infoWithNamespace) => { + return getIndexBasicStats({ + esClient: this.esClient!, + indices: infoWithNamespace, + breatheDelay: BREATHE_DELAY_MEDIUM, + }); + }), + delay(BREATHE_DELAY_SHORT), + switchMap((infoWithStats) => { + return getIndexFieldStats({ + basicStats: infoWithStats, + }); + }), + delay(BREATHE_DELAY_SHORT), + map((statsWithNamespace) => { + return groupStatsByPatternName(statsWithNamespace); + }), + map((statsByPattern) => { + return indexStatsToTelemetryEvents(statsByPattern); + }), + delay(BREATHE_DELAY_SHORT), + switchMap((dataTelemetryEvents) => { + return from(this.reportEvents(dataTelemetryEvents)); + }) + ); + } else { + this.logger.warn( + `[Logs Data Telemetry] Elasticsearch client is unavailable: cannot retrieve data streams + for stream of logs` + ); + + return EMPTY; + } + } + + private async reportEvents(events: DataTelemetryEvent[]) { + // TODO: Implement reporting events via analytics service + return Promise.resolve(events); + } + + private logTelemetryNotOptedIn() { + this.logger.debug(`[Logs Data Telemetry] Telemetry is not opted-in.`); + } +} diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/helpers.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/helpers.ts new file mode 100644 index 0000000000000..c22bc64d22086 --- /dev/null +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/helpers.ts @@ -0,0 +1,575 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { intersection } from 'lodash'; +import { from, of, Observable, concatMap, delay, map, toArray, forkJoin } from 'rxjs'; +import { + MappingPropertyBase, + IndicesGetMappingResponse, + IndicesStatsResponse, +} from '@elastic/elasticsearch/lib/api/types'; +import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import { DataStreamFieldStatsPerNamespace, DatasetIndexPattern } from './types'; + +import { + IndexBasicInfo, + DataStreamStatsPerNamespace, + DataStreamStats, + DataTelemetryEvent, +} from './types'; +import { + DATA_TELEMETRY_FIELDS, + LEVEL_2_RESOURCE_FIELDS, + PROMINENT_LOG_ECS_FIELDS, +} from './constants'; + +/** + * Retrieves all indices and data streams for each stream of logs. 
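+ * Deduplicates streams across patterns and filters out internal (dot-prefixed) and explicitly excluded streams.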
+ */ +export function getAllIndices({ + esClient, + logsIndexPatterns, + excludeStreamsStartingWith, + breatheDelay, +}: { + esClient: ElasticsearchClient; + logsIndexPatterns: DatasetIndexPattern[]; + excludeStreamsStartingWith: string[]; + breatheDelay: number; // Breathing time between each request to prioritize other cluster operations +}): Observable { + const uniqueIndices = new Set(); + const indicesInfo: IndexBasicInfo[] = []; + + return from(logsIndexPatterns).pipe( + concatMap((pattern) => + of(pattern).pipe( + delay(breatheDelay), + concatMap(() => { + return forkJoin([ + from(getDataStreamsInfoForPattern({ esClient, pattern })), + from(getIndicesInfoForPattern({ esClient, pattern })), + ]); + }), + map(([patternDataStreamsInfo, patternIndicesInfo]) => { + return [...patternDataStreamsInfo, ...patternIndicesInfo]; + }), + map((indicesAndDataStreams) => { + // Exclude indices that have already been dealt with + return indicesAndDataStreams.filter((dataStream) => { + return !uniqueIndices.has(dataStream.name); + }); + }), + map((indicesAndDataStreams) => { + // Exclude internal or backing indices + return indicesAndDataStreams.filter((dataStream) => !dataStream.name.startsWith('.')); + }), + map((indicesAndDataStreams) => { + return indicesAndDataStreams.filter( + // Exclude streams starting with non log known signals + (dataStream) => + !excludeStreamsStartingWith.some((excludeStream) => + dataStream.name.startsWith(excludeStream) + ) + ); + }), + map((indicesAndDataStreams) => { + indicesAndDataStreams.forEach((dataStream) => { + uniqueIndices.add(dataStream.name); + }); + return indicesAndDataStreams; + }), + map((dataStreamsInfoRecords) => { + indicesInfo.push(...dataStreamsInfoRecords); + return dataStreamsInfoRecords; + }) + ) + ), + toArray(), + map(() => indicesInfo) + ); +} + +/** + * Retrieves the mappings at once and adds to the indices info. + */ +export function addMappingsToIndices({ + esClient, + dataStreamsInfo, + logsIndexPatterns, +}: { + esClient: ElasticsearchClient; + dataStreamsInfo: IndexBasicInfo[]; + logsIndexPatterns: DatasetIndexPattern[]; +}): Observable { + return from( + esClient.indices.getMapping({ + index: logsIndexPatterns.map((pattern) => pattern.pattern), + }) + ).pipe( + map((mappings) => { + return dataStreamsInfo.map((info) => { + // Add mapping for each index + info.indices.forEach((index) => { + if (mappings[index]) { + info.mapping = { ...(info.mapping ?? {}), [index]: mappings[index] }; + } + }); + + return info; + }); + }) + ); +} + +/** + * Adds the namespace of the index from index mapping if available. 
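+ * The namespace is read from the `data_stream.namespace` constant_keyword mapping of the backing indices.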
+ */
+export function addNamespace({
+  dataStreamsInfo,
+}: {
+  dataStreamsInfo: IndexBasicInfo[];
+}): Observable<IndexBasicInfo[]> {
+  return from(dataStreamsInfo).pipe(
+    concatMap((indexInfo) =>
+      of(indexInfo).pipe(
+        map((dataStream) => getIndexNamespace(dataStream)),
+        map((namespace) => {
+          indexInfo.namespace = namespace;
+          return indexInfo;
+        })
+      )
+    ),
+    toArray()
+  );
+}
+
+export function groupStatsByPatternName(
+  dataStreamsStats: DataStreamFieldStatsPerNamespace[]
+): DataStreamStats[] {
+  const uniqueNamespaces = new Set<string>();
+  const uniqueFields = new Set<string>();
+  const statsByStream = dataStreamsStats.reduce<Map<string, DataStreamStats>>((acc, stats) => {
+    if (!stats.patternName) {
+      return acc;
+    }
+
+    if (!acc.get(stats.patternName)) {
+      acc.set(stats.patternName, {
+        streamName: stats.patternName,
+        shipper: stats.shipper,
+        totalNamespaces: 0,
+        totalDocuments: 0,
+        failureStoreDocuments: 0,
+        failureStoreIndices: 0,
+        totalSize: 0,
+        totalIndices: 0,
+        totalFields: 0,
+        structureLevel: {},
+        fieldsCount: {},
+        managedBy: [],
+        packageName: [],
+        beat: [],
+      });
+    }
+
+    const streamStats = acc.get(stats.patternName)!;
+
+    // Track unique namespaces
+    if (stats.namespace) {
+      uniqueNamespaces.add(stats.namespace);
+    }
+    streamStats.totalNamespaces = uniqueNamespaces.size;
+
+    // Track unique fields
+    stats.uniqueFields.forEach((field) => uniqueFields.add(field));
+    streamStats.totalFields = uniqueFields.size;
+
+    // Aggregate structure levels
+    for (const [level, count] of Object.entries(stats.structureLevel)) {
+      streamStats.structureLevel[Number(level)] =
+        (streamStats.structureLevel[Number(level)] ?? 0) + count;
+    }
+
+    streamStats.totalDocuments += stats.totalDocuments;
+    streamStats.totalIndices += stats.totalIndices;
+    streamStats.failureStoreDocuments += stats.failureStoreDocuments;
+    streamStats.failureStoreIndices += stats.failureStoreIndices;
+    streamStats.totalSize += stats.totalSize;
+
+    for (const [field, count] of Object.entries(stats.fieldsCount)) {
+      streamStats.fieldsCount[field] = (streamStats.fieldsCount[field] ?? 0) + count;
+    }
+
+    if (stats.meta?.managed_by) {
+      streamStats.managedBy.push(stats.meta.managed_by);
+    }
+
+    if (stats.meta?.package?.name) {
+      streamStats.packageName.push(stats.meta.package.name);
+    }
+
+    if (stats.meta?.beat) {
+      streamStats.beat.push(stats.meta.beat);
+    }
+
+    return acc;
+  }, new Map());
+
+  return Array.from(statsByStream.values());
+}
+
+export function getIndexBasicStats({
+  esClient,
+  indices,
+  breatheDelay,
+}: {
+  esClient: ElasticsearchClient;
+  indices: IndexBasicInfo[];
+  breatheDelay: number;
+}): Observable<DataStreamStatsPerNamespace[]> {
+  const indexNames = indices.map((info) => info.name);
+
+  return from(
+    esClient.indices.stats({
+      index: indexNames,
+    })
+  ).pipe(
+    delay(breatheDelay),
+    concatMap((allIndexStats) => {
+      return from(getFailureStoreStats({ esClient, indexName: indexNames.join(',') })).pipe(
+        map((allFailureStoreStats) => {
+          return indices.map((info) =>
+            getIndexStats(allIndexStats.indices, allFailureStoreStats, info)
+          );
+        })
+      );
+    })
+  );
+}
+
+export function getIndexFieldStats({
+  basicStats,
+}: {
+  basicStats: DataStreamStatsPerNamespace[];
+}): Observable<DataStreamFieldStatsPerNamespace[]> {
+  return from(basicStats).pipe(
+    map((stats) => getFieldStatsAndStructureLevels(stats, DATA_TELEMETRY_FIELDS)),
+    toArray()
+  );
+}
+
+export function indexStatsToTelemetryEvents(stats: DataStreamStats[]): DataTelemetryEvent[] {
+  return stats.map((stat) => ({
+    pattern_name: stat.streamName,
+    shipper: stat.shipper,
+    doc_count: stat.totalDocuments,
+    structure_level: stat.structureLevel,
+    index_count: stat.totalIndices,
+    failure_store_doc_count: stat.failureStoreDocuments,
+    failure_store_index_count: stat.failureStoreIndices,
+    namespace_count: stat.totalNamespaces,
+    field_count: stat.totalFields,
+    field_existence: stat.fieldsCount,
+    size_in_bytes: stat.totalSize,
+    managed_by: Array.from(new Set(stat.managedBy)),
+    package_name: Array.from(new Set(stat.packageName)),
+    beat: Array.from(new Set(stat.beat)),
+  }));
+}
+
+/**
+ * Retrieves information about data streams matching a given pattern.
+ */
+async function getDataStreamsInfoForPattern({
+  esClient,
+  pattern,
+}: {
+  esClient: ElasticsearchClient;
+  pattern: DatasetIndexPattern;
+}): Promise<IndexBasicInfo[]> {
+  const resp = await esClient.indices.getDataStream({
+    name: pattern.pattern,
+    expand_wildcards: 'all',
+  });
+
+  return resp.data_streams.map((dataStream) => ({
+    patternName: pattern.patternName,
+    shipper: pattern.shipper,
+    isDataStream: true,
+    name: dataStream.name,
+    indices: dataStream.indices.map((index) => index.index_name),
+    mapping: undefined,
+    meta: dataStream._meta,
+  }));
+}
+
+async function getIndicesInfoForPattern({
+  esClient,
+  pattern,
+}: {
+  esClient: ElasticsearchClient;
+  pattern: DatasetIndexPattern;
+}): Promise<IndexBasicInfo[]> {
+  const resp = await esClient.indices.get({
+    index: pattern.pattern,
+  });
+
+  return Object.entries(resp).map(([index, indexInfo]) => {
+    // This is needed to keep the format the same for data streams and indices
+    const indexMapping: IndicesGetMappingResponse | undefined = indexInfo.mappings
+      ? {
+          [index]: { mappings: indexInfo.mappings },
+        }
+      : undefined;
+
+    return {
+      patternName: pattern.patternName,
+      shipper: pattern.shipper,
+      isDataStream: false,
+      name: index,
+      indices: [index],
+      mapping: indexMapping,
+      meta: indexInfo.mappings?._meta,
+    };
+  });
+}
+
+/**
+ * Retrieves the namespace of the index by checking the mappings of backing indices.
+ *
+ * @param {Object} indexInfo - The information about the index.
+ * @returns {string | undefined} - The namespace of the data stream found in the mapping. + */ +function getIndexNamespace(indexInfo: IndexBasicInfo): string | undefined { + for (let i = 0; i < indexInfo.indices.length; i++) { + const index = indexInfo.indices[i]; + const indexMapping = indexInfo.mapping?.[index]?.mappings; + const dataStreamMapping: MappingPropertyBase | undefined = + indexMapping?.properties?.data_stream; + if (!dataStreamMapping) { + continue; + } + const namespace = (dataStreamMapping?.properties?.namespace as { value?: string })?.value; + if (namespace) { + return namespace; + } + } + + return undefined; +} + +async function getFailureStoreStats({ + esClient, + indexName, +}: { + esClient: ElasticsearchClient; + indexName: string; +}): Promise { + try { + // TODO: Use the failure store API when it is available + const resp = await esClient.transport.request>({ + method: 'GET', + path: `/${indexName}/_stats`, + querystring: { + failure_store: 'only', + }, + }); + + return (await resp).indices; + } catch (e) { + // Failure store API may not be available + return {}; + } +} + +export function getIndexStats( + allIndexStats: IndicesStatsResponse['indices'], + allFailureStoreStats: IndicesStatsResponse['indices'], + info: IndexBasicInfo +): DataStreamStatsPerNamespace { + let totalDocs = 0; + let totalSize = 0; + let totalIndices = 0; + const indexStats: IndicesStatsResponse['indices'] = {}; + let failureStoreDocs = 0; + let failureStoreIndices = 0; + const failureStoreStats: IndicesStatsResponse['indices'] = {}; + Object.entries(allIndexStats ?? {}).forEach(([indexName, stats]) => { + if (indexName.includes(info.name)) { + totalDocs += stats.primaries?.docs?.count ?? 0; + totalSize += stats.primaries?.store?.size_in_bytes ?? 0; + totalIndices++; + + indexStats[indexName] = stats; + } + }); + + Object.entries(allFailureStoreStats ?? {}).forEach(([indexName, stats]) => { + if (indexName.includes(info.name)) { + failureStoreDocs += stats.primaries?.docs?.count ?? 0; + failureStoreIndices++; + + failureStoreStats[indexName] = stats; + } + }); + + return { + patternName: info.patternName, + shipper: info.shipper, + namespace: info.namespace, + totalDocuments: totalDocs, + totalSize, + totalIndices, + failureStoreDocuments: failureStoreDocs, + failureStoreIndices, + meta: info.meta, + mapping: info.mapping, + indexStats, + failureStoreStats, + }; +} + +function getFieldStatsAndStructureLevels( + stats: DataStreamStatsPerNamespace, + fieldsToCheck: string[] +): DataStreamFieldStatsPerNamespace { + const uniqueFields = new Set(); + const structureLevel: Record = {}; + + // Loop through each index and get the number of fields and gather how many documents have that field + const resourceFieldCounts: Record = {}; + const indexNames = Object.keys(stats.indexStats ?? {}); + for (const backingIndex of indexNames) { + const indexStats = stats.indexStats?.[backingIndex]; + const indexMapping = stats.mapping?.[backingIndex]?.mappings; + if (!indexMapping) { + continue; + } + + // Get all fields from the mapping + const indexFieldsMap = getFieldPathsMapFromMapping(indexMapping); + const indexFieldsList = Object.keys(indexFieldsMap); + indexFieldsList.forEach((field) => uniqueFields.add(field)); + + const indexDocCount = indexStats?.primaries?.docs?.count ?? 0; + if (!indexDocCount) { + continue; + } + + const indexStructureLevel = getStructureLevelForFieldsList(stats, indexFieldsMap); + structureLevel[indexStructureLevel] = + (structureLevel[indexStructureLevel] ?? 
0) + indexDocCount;
+
+    for (const field of fieldsToCheck) {
+      if (indexFieldsMap[field]) {
+        resourceFieldCounts[field] = (resourceFieldCounts[field] ?? 0) + indexDocCount;
+      }
+    }
+  }
+
+  return {
+    ...stats,
+    uniqueFields: Array.from(uniqueFields),
+    structureLevel,
+    fieldsCount: resourceFieldCounts,
+  };
+}
+
+/**
+ * Determines the structure level of log documents based on the fields present in the list.
+ *
+ * Structure Levels:
+ * - Level 0: Unstructured data. No `@timestamp` or `timestamp` field.
+ * - Level 1: Contains `@timestamp` or `timestamp` field.
+ * - Level 2: Contains any of the resource fields (`host.name`, `service.name`, `host`, `hostname`, `host_name`).
+ * - Level 3: Contains `@timestamp`, resource fields, and `message` field.
+ * - Level 4: Index name complies with a pattern of a known shipper, e.g. `logstash-*`, `heartbeat-*`.
+ * - Level 5a: Data stream naming scheme exists (`data_stream.dataset`, `data_stream.type`, `data_stream.namespace`).
+ * - Level 5b: Contains at least 3 ECS fields or `ecs.version` field.
+ * - Level 6: Part of an integration, managed by a known entity.
+ *
+ * @param stats - Contains pattern, shipper and meta info
+ * @param fieldsMap - Dictionary/Map of fields present in the index with full path as key.
+ * @returns {number} - The structure level of the index.
+ */
+function getStructureLevelForFieldsList(
+  stats: DataStreamStatsPerNamespace,
+  fieldsMap: Record<string, boolean>
+): number {
+  // Check level 1, if @timestamp or timestamp exists
+  if (!fieldsMap['@timestamp'] && !fieldsMap.timestamp) {
+    return 0;
+  }
+
+  // Check level 2, if resource fields exist
+  if (!LEVEL_2_RESOURCE_FIELDS.some((field) => fieldsMap[field])) {
+    return 1;
+  }
+
+  // Check level 3, if basic structure of log message exists
+  if (
+    !fieldsMap['@timestamp'] ||
+    !fieldsMap.message ||
+    (!fieldsMap['host.name'] && !fieldsMap['service.name'])
+  ) {
+    return 2;
+  }
+
+  // Check level 4 (Shipper is known)
+  if (!stats.patternName || stats.patternName === 'generic-logs') {
+    return 3;
+  }
+
+  // Check level 5a (Data stream scheme exists)
+  if (
+    !fieldsMap['data_stream.dataset'] ||
+    !fieldsMap['data_stream.type'] ||
+    !fieldsMap['data_stream.namespace']
+  ) {
+    // Check level 5b (ECS fields exist)
+    const fieldsList = Object.keys(fieldsMap);
+    if (
+      !fieldsMap['ecs.version'] &&
+      intersection(PROMINENT_LOG_ECS_FIELDS, fieldsList).length < 3
+    ) {
+      return 4;
+    }
+  }
+
+  // Check level 6 (Index is managed)
+  if (!stats.meta?.managed_by && !stats.meta?.managed) {
+    return 5;
+  }
+
+  // All levels are fulfilled
+  return 6;
+}
+
+/**
+ * Recursively traverses a mapping and returns a dictionary of field paths.
+ * Each key in the dictionary represents a full field path in dot notation.
+ *
+ * @param {MappingPropertyBase} mapping - The mapping to traverse.
+ * @returns {Record<string, boolean>} - A dictionary of field paths.
+ */
+function getFieldPathsMapFromMapping(mapping: MappingPropertyBase): Record<string, boolean> {
+  const fieldPathsMap: Record<string, boolean> = {};
+
+  function traverseMapping(nestedMapping: MappingPropertyBase, parentField: string = ''): void {
+    for (const [fieldName, field] of Object.entries(nestedMapping.properties ?? {})) {
+      const fullFieldName = parentField ? `${parentField}.${fieldName}` : fieldName;
+      if ((field as MappingPropertyBase).properties) {
+        traverseMapping(field as MappingPropertyBase, fullFieldName);
+      } else {
+        fieldPathsMap[fullFieldName] = true;
+      }
+    }
+  }
+
+  traverseMapping(mapping);
+  return fieldPathsMap;
+}
diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/types.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/types.ts
new file mode 100644
index 0000000000000..0ccae5e83f1be
--- /dev/null
+++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/types.ts
@@ -0,0 +1,87 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import {
+  IndexName,
+  IndicesGetMappingResponse,
+  IndicesStatsResponse,
+  Metadata,
+} from '@elastic/elasticsearch/lib/api/types';
+
+export interface DatasetIndexPattern {
+  pattern: string;
+  patternName: string;
+  shipper?: string;
+}
+
+export interface IndexBasicInfo {
+  patternName: string;
+  shipper?: string;
+  isDataStream: boolean;
+  name: string;
+  indices: IndexName[];
+  meta?: Metadata;
+  mapping?: IndicesGetMappingResponse;
+  namespace?: string;
+}
+
+export interface DataStreamStatsPerNamespace {
+  patternName: string;
+  shipper?: string;
+  namespace?: string;
+  totalDocuments: number;
+  totalSize: number;
+  totalIndices: number;
+  failureStoreDocuments: number;
+  failureStoreIndices: number;
+  meta?: Metadata;
+  mapping?: IndicesGetMappingResponse;
+  indexStats: IndicesStatsResponse['indices'];
+  failureStoreStats: IndicesStatsResponse['indices'];
+}
+
+export interface DataStreamStatsWithLevelsPerNamespace extends DataStreamStatsPerNamespace {
+  structureLevel: Record<number, number>;
+}
+
+export interface DataStreamFieldStatsPerNamespace extends DataStreamStatsWithLevelsPerNamespace {
+  uniqueFields: string[];
+  fieldsCount: Record<string, number>;
+}
+
+export interface DataStreamStats {
+  streamName: string;
+  shipper?: string;
+  totalNamespaces: number;
+  totalDocuments: number;
+  structureLevel: Record<number, number>;
+  failureStoreDocuments: number;
+  failureStoreIndices: number;
+  totalSize: number;
+  totalIndices: number;
+  totalFields: number;
+  fieldsCount: Record<string, number>;
+  managedBy: string[];
+  packageName: string[];
+  beat: string[];
+}
+
+export interface DataTelemetryEvent {
+  pattern_name: string;
+  shipper?: string;
+  doc_count: number;
+  structure_level: Record<number, number>;
+  failure_store_doc_count: number;
+  index_count: number;
+  namespace_count: number;
+  field_count: number;
+  field_existence: Record<string, number>;
+  size_in_bytes: number;
+  managed_by: string[];
+  package_name: string[];
+  beat: string[];
+}
diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/index.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/index.ts
index f1da1af720137..97106526e497c 100644
--- a/x-pack/plugins/observability_solution/dataset_quality/server/services/index.ts
+++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/index.ts
@@ -8,3 +8,4 @@
 export { dataStreamService } from './data_stream';
 export { indexStatsService } from './index_stats';
 export { datasetQualityPrivileges } from './privileges';
+export { DataTelemetryService } from './data_telemetry/data_telemetry_service';
diff --git
a/x-pack/plugins/observability_solution/dataset_quality/server/types.ts b/x-pack/plugins/observability_solution/dataset_quality/server/types.ts index 3874040f5d3bf..0ba17c5bb22c6 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/types.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/types.ts @@ -6,14 +6,26 @@ */ import { CustomRequestHandlerContext } from '@kbn/core/server'; -import { FleetSetupContract, FleetStartContract } from '@kbn/fleet-plugin/server'; +import type { AnalyticsServiceSetup, AnalyticsServiceStart } from '@kbn/core-analytics-server'; +import type { FleetSetupContract, FleetStartContract } from '@kbn/fleet-plugin/server'; +import { + TaskManagerSetupContract, + TaskManagerStartContract, +} from '@kbn/task-manager-plugin/server'; +import type { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server'; export interface DatasetQualityPluginSetupDependencies { fleet: FleetSetupContract; + analytics: AnalyticsServiceSetup; + telemetry: TelemetryPluginSetup; + taskManager: TaskManagerSetupContract; } export interface DatasetQualityPluginStartDependencies { fleet: FleetStartContract; + telemetry: TelemetryPluginStart; + analytics: AnalyticsServiceStart; + taskManager: TaskManagerStartContract; } // eslint-disable-next-line @typescript-eslint/no-empty-interface diff --git a/x-pack/plugins/observability_solution/dataset_quality/tsconfig.json b/x-pack/plugins/observability_solution/dataset_quality/tsconfig.json index 8bec2d8cb1a63..aab3038a9436f 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/tsconfig.json +++ b/x-pack/plugins/observability_solution/dataset_quality/tsconfig.json @@ -53,10 +53,13 @@ "@kbn/ebt-tools", "@kbn/fields-metadata-plugin", "@kbn/server-route-repository-utils", + "@kbn/core-analytics-server", "@kbn/core-analytics-browser", "@kbn/core-lifecycle-browser", "@kbn/core-notifications-browser", - "@kbn/rison" + "@kbn/telemetry-plugin", + "@kbn/rison", + "@kbn/task-manager-plugin" ], "exclude": [ "target/**/*" diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index cdc826919d0ab..df9cbaf33e1d6 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -146,6 +146,7 @@ export default function ({ getService }: FtrProviderContext) { 'fleet:unenroll_action:retry', 'fleet:update_agent_tags:retry', 'fleet:upgrade_action:retry', + 'logs-data-telemetry', 'observabilityAIAssistant:indexQueuedDocumentsTaskType', 'osquery:telemetry-configs', 'osquery:telemetry-packs',