From 621b19228b10b8a574d420f911f0f7356a2f7379 Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Fri, 27 Sep 2024 14:10:16 +1000 Subject: [PATCH] [8.x] [Logs Data Telemetry] Report telemetry events to telemetry service (#192868) (#194244) # Backport This will backport the following commits from `main` to `8.x`: - [[Logs Data Telemetry] Report telemetry events to telemetry service (#192868)](https://github.com/elastic/kibana/pull/192868) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) Co-authored-by: Abdul Wahab Zahid --- .../dataset_quality/kibana.jsonc | 3 +- .../dataset_quality/server/plugin.ts | 2 +- .../services/data_telemetry/constants.ts | 4 +- .../data_telemetry_service.test.ts | 229 ++++++++++-------- .../data_telemetry/data_telemetry_service.ts | 162 +++++++++---- .../data_telemetry/register_collector.ts | 153 ++++++++++++ .../server/services/data_telemetry/types.ts | 9 + .../dataset_quality/server/types.ts | 6 +- .../dataset_quality/tsconfig.json | 2 +- .../schema/xpack_plugins.json | 142 +++++++++++ .../apis/telemetry/telemetry_local.ts | 7 + 11 files changed, 572 insertions(+), 147 deletions(-) create mode 100644 x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/register_collector.ts diff --git a/x-pack/plugins/observability_solution/dataset_quality/kibana.jsonc b/x-pack/plugins/observability_solution/dataset_quality/kibana.jsonc index cb16a7ce0455e..62b82fcb3a878 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/kibana.jsonc +++ b/x-pack/plugins/observability_solution/dataset_quality/kibana.jsonc @@ -24,7 +24,8 @@ "dataViews", "lens", "fieldsMetadata", - "taskManager" + "taskManager", + "usageCollection" ], "optionalPlugins": ["telemetry"], "requiredBundles": ["unifiedHistogram", "discover"], diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/plugin.ts 
b/x-pack/plugins/observability_solution/dataset_quality/server/plugin.ts index 7c51e40c870a1..86f89d3c0d146 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/plugin.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/plugin.ts @@ -57,7 +57,7 @@ export class DatasetQualityServerPlugin implements Plugin { }); // Setup Data Telemetry Service - this.dataTelemetryService.setup(core.analytics, plugins.taskManager); + this.dataTelemetryService.setup(plugins.taskManager, plugins.usageCollection); return {}; } diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/constants.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/constants.ts index 7f03b4d67ce5c..85fc201333f28 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/constants.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/constants.ts @@ -10,7 +10,7 @@ import { DATA_DATASETS_INDEX_PATTERNS_UNIQUE } from '@kbn/telemetry-plugin/serve import { DatasetIndexPattern } from './types'; export const LOGS_DATA_TELEMETRY_TASK_TYPE = 'logs-data-telemetry'; -export const LOGS_DATA_TELEMETRY_TASK_ID = 'logs-data-telemetry:collect-and-report-task-2'; +export const LOGS_DATA_TELEMETRY_TASK_ID = 'logs-data-telemetry:collect-and-report-task'; export const TELEMETRY_TASK_INTERVAL = 24 * 60; // 24 hours (in minutes) export const TELEMETRY_TASK_TIMEOUT = 10; // 10 minutes @@ -23,8 +23,6 @@ export const MAX_STREAMS_TO_REPORT = 1000; export const NON_LOG_SIGNALS = ['metrics', 'traces', 'internal', 'synthetics']; export const EXCLUDE_ELASTIC_LOGS = ['logs-synth', 'logs-elastic', 'logs-endpoint']; -export const TELEMETRY_CHANNEL = 'logs-data-telemetry'; - type ObsPatternName = (typeof DATA_DATASETS_INDEX_PATTERNS_UNIQUE)[number]['patternName']; const LOGS_INDEX_PATTERN_NAMES: ObsPatternName[] = [ 'filebeat', diff --git 
a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.test.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.test.ts index cbbdf8bbfa518..8ade977309da5 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.test.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.test.ts @@ -5,13 +5,16 @@ * 2.0. */ -import { ElasticsearchClient, type Logger } from '@kbn/core/server'; -import type { AnalyticsServiceSetup } from '@kbn/core/public'; -import { TelemetryPluginStart } from '@kbn/telemetry-plugin/server'; +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server'; +import type { FetchResult } from '@kbn/task-manager-plugin/server/task_store'; +import type { TelemetryPluginStart } from '@kbn/telemetry-plugin/server'; import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; +import type { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server'; +import { createUsageCollectionSetupMock } from '@kbn/usage-collection-plugin/server/mocks'; -import { DataTelemetryEvent } from './types'; -import { BREATHE_DELAY_MEDIUM, MAX_STREAMS_TO_REPORT } from './constants'; +import { DataTelemetryEvent, DataTelemetryObject } from './types'; +import { MAX_STREAMS_TO_REPORT } from './constants'; import { DataTelemetryService } from './data_telemetry_service'; // Mock the constants module to speed up and simplify the tests @@ -43,26 +46,41 @@ const SYNTH_DOCS = 6000000; describe('DataTelemetryService', () => { let service: DataTelemetryService; let mockEsClient: jest.Mocked; - let mockAnalyticsSetup: jest.Mocked; + let mockUsageCollectionSetup: jest.Mocked; let mockTelemetryStart: jest.Mocked; let mockLogger: jest.Mocked; let 
mockTaskManagerSetup: ReturnType; let mockTaskManagerStart: ReturnType; let runTask: ReturnType['runTask']; - describe('Data Telemetry Task', () => { + let exampleEvent: Partial; + let exampleTaskData: DataTelemetryObject; + let exampleTaskFetchResult: FetchResult; + + describe('Task and Collector Setup', () => { beforeEach(async () => { const mocks = setupMocks(); mockEsClient = mocks.mockEsClient; mockLogger = mocks.mockLogger; - mockAnalyticsSetup = mocks.mockAnalyticsSetup; + mockUsageCollectionSetup = mocks.mockUsageCollectionSetup; mockTelemetryStart = mocks.mockTelemetryStart; mockTaskManagerSetup = mocks.taskManagerSetup; mockTaskManagerStart = mocks.taskManagerStart; runTask = mocks.runTask; + exampleEvent = { + doc_count: 555, + }; + exampleTaskData = { data: [exampleEvent as DataTelemetryEvent] }; + exampleTaskFetchResult = { + docs: [{ state: { ran: true, data: [exampleEvent] } } as unknown as ConcreteTaskInstance], + versionMap: new Map(), + } as FetchResult; + + mockTaskManagerStart.fetch.mockResolvedValue(exampleTaskFetchResult); + service = new DataTelemetryService(mockLogger); - service.setup(mockAnalyticsSetup, mockTaskManagerSetup); + service.setup(mockTaskManagerSetup, mockUsageCollectionSetup); await service.start( mockTelemetryStart, { @@ -77,14 +95,49 @@ describe('DataTelemetryService', () => { jest.clearAllMocks(); }); + it('should register usage collection', () => { + expect(mockUsageCollectionSetup.makeUsageCollector).toHaveBeenCalledTimes(1); + expect(mockUsageCollectionSetup.registerCollector).toHaveBeenCalledTimes(1); + }); + it('should trigger task runner run method', async () => { jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); - const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend'); + const collectTelemetryDataSpy = jest.spyOn(service as any, 'collectTelemetryData'); + + await runTask(); + + // Assert collectTelemetryData is called + 
expect(collectTelemetryDataSpy).toHaveBeenCalledTimes(1); + }); + it('isReady and fetch of usage collector reflect the correct state', async () => { await runTask(); - // Assert collectAndSend is called - expect(collectAndSendSpy).toHaveBeenCalledTimes(1); + const collector = mockUsageCollectionSetup.makeUsageCollector.mock.results[0].value; + expect(await collector.isReady()).toBe(true); + expect(await collector.fetch()).toEqual(exampleTaskData); + }); + + it('isReady of usage collector should be false while the task is in progress', async () => { + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + const taskRunPromise = runTask(); + + const collector = mockUsageCollectionSetup.makeUsageCollector.mock.results[0].value; + expect(await collector.isReady()).toBe(false); + + await taskRunPromise; + }); + + it('isReady of usage collector should be false if collection is stopped', async () => { + jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); + service.stop(); + await runTask(); + + const collector = mockUsageCollectionSetup.makeUsageCollector.mock.results[0].value; + expect(await collector.isReady()).toBe(false); + + service.resume(); + expect(await collector.isReady()).toBe(true); }); }); @@ -93,14 +146,14 @@ describe('DataTelemetryService', () => { const mocks = setupMocks(); mockEsClient = mocks.mockEsClient; mockLogger = mocks.mockLogger; - mockAnalyticsSetup = mocks.mockAnalyticsSetup; + mockUsageCollectionSetup = mocks.mockUsageCollectionSetup; mockTelemetryStart = mocks.mockTelemetryStart; mockTaskManagerSetup = mocks.taskManagerSetup; mockTaskManagerStart = mocks.taskManagerStart; runTask = mocks.runTask; service = new DataTelemetryService(mockLogger); - service.setup(mockAnalyticsSetup, mockTaskManagerSetup); + service.setup(mockTaskManagerSetup, mockUsageCollectionSetup); await service.start( mockTelemetryStart, { @@ -118,38 +171,36 @@ describe('DataTelemetryService', () => { }); it( - 'should 
collect and send telemetry after startup and every interval', + 'should collect telemetry after startup and every interval', async () => { - const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend'); + const collectTelemetryDataSpy = jest.spyOn(service as any, 'collectTelemetryData'); await runTask(); - expect(collectAndSendSpy).toHaveBeenCalledTimes(1); + expect(collectTelemetryDataSpy).toHaveBeenCalledTimes(1); - await sleepForBreathDelay(); expect(mockEsClient.indices.getMapping).toHaveBeenCalledTimes(1); await runTask(); - expect(collectAndSendSpy).toHaveBeenCalledTimes(2); + expect(collectTelemetryDataSpy).toHaveBeenCalledTimes(2); - await sleepForBreathDelay(); expect(mockEsClient.indices.getMapping).toHaveBeenCalledTimes(2); }, TEST_TIMEOUT ); it( - 'should stop collecting and sending telemetry if stopped', + 'should stop collection if telemetry is stopped', async () => { - const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend'); + const collectTelemetryDataSpy = jest.spyOn(service as any, 'collectTelemetryData'); await runTask(); - expect(collectAndSendSpy).toHaveBeenCalledTimes(1); + expect(collectTelemetryDataSpy).toHaveBeenCalledTimes(1); service.stop(); - await runTask(); - await sleepForBreathDelay(); - expect(collectAndSendSpy).toHaveBeenCalledTimes(1); + const taskResult = await runTask(); + expect(taskResult?.state.data).toBeNull(); + expect(collectTelemetryDataSpy).toHaveBeenCalledTimes(1); }, TEST_TIMEOUT ); @@ -159,14 +210,14 @@ describe('DataTelemetryService', () => { async () => { jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(false); - const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend'); + const collectTelemetryDataSpy = jest.spyOn(service as any, 'collectTelemetryData'); await runTask(); - expect(collectAndSendSpy).not.toHaveBeenCalled(); + expect(collectTelemetryDataSpy).not.toHaveBeenCalled(); - await runTask(); - await sleepForBreathDelay(); - 
expect(collectAndSendSpy).not.toHaveBeenCalled(); + const taskResult = await runTask(); + expect(taskResult?.state.data).toBeNull(); + expect(collectTelemetryDataSpy).not.toHaveBeenCalled(); // Assert that logger.debug is called with appropriate message expect(mockLogger.debug).toHaveBeenCalledWith( @@ -194,45 +245,50 @@ describe('DataTelemetryService', () => { })), }); - await runTask(); - await sleepForBreathDelay(); + const taskResult = await runTask(); + expect(taskResult?.state.data).toBeNull(); expect(mockEsClient.indices.getMapping).not.toHaveBeenCalled(); }, TEST_TIMEOUT ); it( - 'creates and sends the telemetry events', + 'creates telemetry events', async () => { jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); - const reportEventsSpy = jest.spyOn(service as any, 'reportEvents'); - - await runTask(); - await sleepForBreathDelay(); + const taskResult = await runTask(); + expect(taskResult?.state.data).toBeTruthy(); + + const expectedEvent1 = { + doc_count: 4000 + 500 + 200, + failure_store_doc_count: 300, + index_count: 2 + 1 + 1, + failure_store_index_count: 1, + namespace_count: 1 + 1, + size_in_bytes: 10089898 + 800000 + 500000, + pattern_name: 'test', + managed_by: ['fleet'], + package_name: ['activemq'], + beat: [], + }; + const expectedEvent2 = { + beat: [], + doc_count: 1700, + index_count: 3, + namespace_count: 2, + package_name: [], + pattern_name: 'test-2', + shipper: 'custom-2', + size_in_bytes: 2300000, + }; - expect(reportEventsSpy).toHaveBeenCalledTimes(1); expect( - ( - reportEventsSpy.mock?.lastCall as [ - [Partial], - [Partial] - ] - )?.[0]?.[0] - ).toEqual( - expect.objectContaining({ - doc_count: 4000 + 500 + 200, - failure_store_doc_count: 300, - index_count: 2 + 1 + 1, - failure_store_index_count: 1, - namespace_count: 1 + 1, - size_in_bytes: 10089898 + 800000 + 500000, - pattern_name: 'test', - managed_by: ['fleet'], - package_name: ['activemq'], - beat: [], - }) - ); + taskResult?.state.data as [Partial, 
Partial] + ).toEqual([ + expect.objectContaining(expectedEvent1), + expect.objectContaining(expectedEvent2), + ]); }, TEST_TIMEOUT ); @@ -241,17 +297,14 @@ describe('DataTelemetryService', () => { 'should not include stats of excluded indices', async () => { jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); - const reportEventsSpy = jest.spyOn(service as any, 'reportEvents'); - await runTask(); - await sleepForBreathDelay(); + const taskResult = await runTask(); - expect(reportEventsSpy).toHaveBeenCalledTimes(1); - const events = reportEventsSpy.mock?.lastCall as [ - [Partial], - [Partial] + const events = taskResult?.state.data as [ + Partial, + Partial ]; // doc_count should be less than SYNTH_DOCS for any event - (events[0] ?? []).forEach((event) => { + (events ?? []).forEach((event) => { expect(event.doc_count).toBeLessThan(SYNTH_DOCS); }); }, @@ -280,14 +333,14 @@ describe('DataTelemetryService', () => { const mocks = setupMocks(); mockEsClient = mocks.mockEsClient; mockLogger = mocks.mockLogger; - mockAnalyticsSetup = mocks.mockAnalyticsSetup; + mockUsageCollectionSetup = mocks.mockUsageCollectionSetup; mockTelemetryStart = mocks.mockTelemetryStart; mockTaskManagerSetup = mocks.taskManagerSetup; mockTaskManagerStart = mocks.taskManagerStart; runTask = mocks.runTask; service = new DataTelemetryService(mockLogger); - service.setup(mockAnalyticsSetup, mockTaskManagerSetup); + service.setup(mockTaskManagerSetup, mockUsageCollectionSetup); await service.start( mockTelemetryStart, { @@ -309,15 +362,10 @@ describe('DataTelemetryService', () => { async () => { jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); - const reportEventsSpy = jest.spyOn(service as any, 'reportEvents'); - - await runTask(); - await sleepForBreathDelay(); - - expect(reportEventsSpy).toHaveBeenCalledTimes(1); - const lastCall = reportEventsSpy.mock?.lastCall?.[0] as [Partial]; - expect(lastCall?.[0]?.field_count).toBe(8); - 
expect(lastCall?.[0]?.field_existence).toEqual({ + const taskResult = await runTask(); + const firstEvent = (taskResult?.state.data as [Partial])?.[0]; + expect(firstEvent?.field_count).toBe(8); + expect(firstEvent?.field_existence).toEqual({ 'container.id': 3000 + 500, 'host.name': 3000 + 500, message: 3000, @@ -333,17 +381,12 @@ describe('DataTelemetryService', () => { it('should correctly calculate structure levels', async () => { jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true); - const reportEventsSpy = jest.spyOn(service as any, 'reportEvents'); + const taskResult = await runTask(); + const secondEvent = ( + taskResult?.state.data as [Partial, Partial] + )?.[1]; - await runTask(); - await sleepForBreathDelay(); - - expect(reportEventsSpy).toHaveBeenCalledTimes(1); - const lastCall = reportEventsSpy.mock?.lastCall?.[0] as [ - Partial, - Partial - ]; - expect(lastCall?.[1]?.structure_level).toEqual({ + expect(secondEvent?.structure_level).toEqual({ '1': 1000, '4': 500, '6': 200, @@ -352,10 +395,6 @@ describe('DataTelemetryService', () => { }); }); -function sleepForBreathDelay() { - return new Promise((resolve) => setTimeout(resolve, BREATHE_DELAY_MEDIUM * 10)); -} - function setupMocks() { const mockEsClient = { indices: { @@ -419,9 +458,7 @@ function setupMocks() { error: jest.fn(), } as unknown as jest.Mocked; - const mockAnalyticsSetup = { - getTelemetryUrl: jest.fn().mockResolvedValue(new URL('https://telemetry.elastic.co')), - } as unknown as jest.Mocked; + const mockUsageCollectionSetup = createUsageCollectionSetupMock(); const mockTelemetryStart = { getIsOptedIn: jest.fn().mockResolvedValue(true), @@ -441,7 +478,7 @@ function setupMocks() { return { mockEsClient, mockLogger, - mockAnalyticsSetup, + mockUsageCollectionSetup, mockTelemetryStart, taskManagerSetup, taskManagerStart, diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.ts 
b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.ts index 6f0ea294cfc0a..a4f0b81c1f526 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/data_telemetry_service.ts @@ -9,25 +9,26 @@ import { from, defer, delay, - filter, tap, - take, - takeWhile, exhaustMap, switchMap, map, of, - EMPTY, + firstValueFrom, + throwError, } from 'rxjs'; -import type { CoreStart, ElasticsearchClient, Logger } from '@kbn/core/server'; -import type { AnalyticsServiceSetup } from '@kbn/core/public'; +import { CoreStart, ElasticsearchClient, Logger } from '@kbn/core/server'; import { + ConcreteTaskInstance, TaskInstance, TaskManagerSetupContract, TaskManagerStartContract, } from '@kbn/task-manager-plugin/server'; import type { TelemetryPluginStart } from '@kbn/telemetry-plugin/server'; +import { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server'; +import { TelemetryTaskState } from './types'; +import { registerLogsDataUsageCollector } from './register_collector'; import { BREATHE_DELAY_MEDIUM, BREATHE_DELAY_SHORT, @@ -50,25 +51,31 @@ import { getIndexFieldStats, } from './helpers'; -import { DataTelemetryEvent } from './types'; +const SKIP_COLLECTION = 'Skip Collection'; export class DataTelemetryService { private readonly logger: Logger; - private isStopped = false; + private taskManagerStart?: TaskManagerStartContract; private telemetryStart?: TelemetryPluginStart; - // @ts-ignore: Unused variable - private analytics?: AnalyticsServiceSetup; - - // @ts-ignore: Unused variable - private isInProgress = false; + private usageCollection?: UsageCollectionSetup; private isOptedIn?: boolean = true; // Assume true until the first check private esClient?: ElasticsearchClient; + private isStopped = false; + private isInProgress = false; + private run$ = defer(() 
=> from(this.shouldCollectTelemetry())).pipe( - takeWhile(() => !this.isStopped), + switchMap((isOptedIn) => { + // If stopped, do not proceed + if (this.isStopped) { + return this.throwSkipCollection(); + } + + return of(isOptedIn); + }), tap((isOptedIn) => { if (!isOptedIn) { this.logTelemetryNotOptedIn(); @@ -77,8 +84,17 @@ export class DataTelemetryService { this.isInProgress = true; } }), - filter((isOptedIn) => isOptedIn), - exhaustMap(() => this.collectAndSend()), + switchMap((isOptedIn) => { + // If not opted in, do not proceed + if (!isOptedIn) { + return this.throwSkipCollection(); + } + + return of(isOptedIn); + }), + exhaustMap(() => { + return this.collectTelemetryData(); + }), tap(() => (this.isInProgress = false)) ); @@ -86,24 +102,36 @@ export class DataTelemetryService { this.logger = logger; } - public setup(analytics: AnalyticsServiceSetup, taskManager: TaskManagerSetupContract) { - this.analytics = analytics; - this.registerTask(taskManager); + public setup(taskManagerSetup: TaskManagerSetupContract, usageCollection?: UsageCollectionSetup) { + this.usageCollection = usageCollection; + + if (usageCollection) { + // Register Kibana task + this.registerTask(taskManagerSetup); + } else { + this.logger.warn( + `[Logs Data Telemetry] Usage collection service is not available: cannot collect telemetry data` + ); + } } public async start( telemetryStart: TelemetryPluginStart, core: CoreStart, - taskManager: TaskManagerStartContract + taskManagerStart: TaskManagerStartContract ) { + this.taskManagerStart = taskManagerStart; this.telemetryStart = telemetryStart; this.esClient = core?.elasticsearch.client.asInternalUser; - if (taskManager) { - const taskInstance = await this.scheduleTask(taskManager); + if (taskManagerStart && this.usageCollection) { + const taskInstance = await this.scheduleTask(taskManagerStart); if (taskInstance) { this.logger.debug(`Task ${taskInstance.id} scheduled.`); } + + // Create and register usage collector for logs data 
telemetry + registerLogsDataUsageCollector(this.usageCollection, this.getCollectorOptions()); } } @@ -111,34 +139,43 @@ export class DataTelemetryService { this.isStopped = true; } + public resume() { + this.isStopped = false; + } + private registerTask(taskManager: TaskManagerSetupContract) { const service = this; taskManager.registerTaskDefinitions({ [LOGS_DATA_TELEMETRY_TASK_TYPE]: { title: 'Logs Data Telemetry', description: - 'This task collects data telemetry for logs data and sends it to the telemetry service.', + 'This task collects data telemetry for logs data and sends it to the telemetry service via usage collector plugin.', timeout: `${TELEMETRY_TASK_TIMEOUT}m`, maxAttempts: 1, // Do not retry - createTaskRunner: () => { + createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => { return { // Perform the work of the task. The return value should fit the TaskResult interface. async run() { - service.logger.debug(`[Logs Data Telemetry] Running task`); + const { state } = taskInstance; + let data = state?.data ?? 
null; try { - service.run$.pipe(take(1)).subscribe({ - complete: () => { - service.logger.debug(`[Logs Data Telemetry] Task completed`); - }, - }); + data = await firstValueFrom(service.run$); } catch (e) { - service.logger.error(e); + if (e.message === SKIP_COLLECTION) { + data = null; // Collection is skipped, skip reporting + } else { + service.logger.error(e); + } } + + return { + state: { ran: true, data }, + }; }, async cancel() { - service.logger.debug(`[Logs Data Telemetry] Task cancelled`); + service.logger.warn(`[Logs Data Telemetry] Task cancelled`); }, }; }, @@ -182,7 +219,7 @@ export class DataTelemetryService { return this.isOptedIn === true; } - private collectAndSend() { + private collectTelemetryData() { // Gather data streams and indices related to each stream of log if (this.esClient) { return getAllIndices({ @@ -196,8 +233,10 @@ export class DataTelemetryService { this.logger.debug( `[Logs Data Telemetry] Number of data streams exceeds ${MAX_STREAMS_TO_REPORT}. Skipping telemetry collection.` ); - return EMPTY; + + return this.throwSkipCollection(); } + return of(dataStreamsAndIndicesInfo); }), delay(BREATHE_DELAY_MEDIUM), @@ -234,10 +273,6 @@ export class DataTelemetryService { }), map((statsByPattern) => { return indexStatsToTelemetryEvents(statsByPattern); - }), - delay(BREATHE_DELAY_SHORT), - switchMap((dataTelemetryEvents) => { - return from(this.reportEvents(dataTelemetryEvents)); }) ); } else { @@ -246,16 +281,59 @@ export class DataTelemetryService { for stream of logs` ); - return EMPTY; + return this.throwSkipCollection(); } } - private async reportEvents(events: DataTelemetryEvent[]) { - // TODO: Implement reporting events via analytics service - return Promise.resolve(events); + private getCollectorOptions() { + return { + fetch: async () => { + // Retrieve the latest telemetry data from task manager + const taskState = await this.getLatestTaskState(); + + return { data: taskState.data ?? 
[] }; + }, + isReady: async () => { + const taskState = await this.getLatestTaskState(); + + return !this.isStopped && !this.isInProgress && taskState.ran && taskState.data !== null; + }, + }; + } + + private async getLatestTaskState() { + const defaultState: TelemetryTaskState = { + data: null, + ran: false, + }; + + if (this.taskManagerStart) { + try { + const fetchResult = await this.taskManagerStart.fetch({ + query: { bool: { filter: { term: { _id: `task:${LOGS_DATA_TELEMETRY_TASK_ID}` } } } }, + }); + + return (fetchResult.docs[0]?.state ?? defaultState) as TelemetryTaskState; + } catch (err) { + const errMessage = err && err.message ? err.message : err.toString(); + if (!errMessage.includes('NotInitialized')) { + throw err; + } + } + } else { + this.logger.error( + `[Logs Data Telemetry] Task manager is not available: cannot retrieve latest task state` + ); + } + + return defaultState; } private logTelemetryNotOptedIn() { this.logger.debug(`[Logs Data Telemetry] Telemetry is not opted-in.`); } + + private throwSkipCollection() { + return throwError(() => new Error(SKIP_COLLECTION)); + } } diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/register_collector.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/register_collector.ts new file mode 100644 index 0000000000000..2278bf48605db --- /dev/null +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/register_collector.ts @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { MakeSchemaFrom, UsageCollectionSetup } from '@kbn/usage-collection-plugin/server'; +import { DataTelemetryObject } from './types'; + +const structureLevelSchema: MakeSchemaFrom< + DataTelemetryObject, + true +>['data']['items']['structure_level'] = { + '0': { + type: 'long', + _meta: { + description: 'Total docs at structure level 0', + }, + }, + '1': { + type: 'long', + _meta: { + description: 'Total docs at structure level 1', + }, + }, + '2': { + type: 'long', + _meta: { + description: 'Total docs at structure level 2', + }, + }, + '3': { + type: 'long', + _meta: { + description: 'Total docs at structure level 3', + }, + }, + '4': { + type: 'long', + _meta: { + description: 'Total docs at structure level 4', + }, + }, + '5': { + type: 'long', + _meta: { + description: 'Total docs at structure level 5', + }, + }, + '6': { + type: 'long', + _meta: { + description: 'Total docs at structure level 6', + }, + }, +}; + +export function registerLogsDataUsageCollector( + usageCollection: UsageCollectionSetup, + collectorOptions: { + isReady: () => Promise; + fetch: () => Promise; + } +) { + const logsUsageCollector = usageCollection.makeUsageCollector({ + type: 'logs_data', + isReady: collectorOptions.isReady, + fetch: collectorOptions.fetch, + schema: { + data: { + type: 'array', + items: { + pattern_name: { + type: 'keyword', + _meta: { description: 'Logs pattern name representing the stream of logs' }, + }, + shipper: { + type: 'keyword', + _meta: { description: 'Shipper if present, sending the logs' }, + }, + doc_count: { + type: 'long', + _meta: { description: 'Total number of documents in the steam of logs' }, + }, + structure_level: structureLevelSchema, + failure_store_doc_count: { + type: 'long', + _meta: { + description: 'Total number of documents in the failure store in the stream of logs', + }, + }, + index_count: { + type: 'long', + _meta: { + description: 'Total number of indices in the stream of logs', + }, + }, + namespace_count: { + 
type: 'long', + _meta: { + description: 'Total number of namespaces in the stream of logs', + }, + }, + field_count: { + type: 'long', + _meta: { + description: 'Total number of fields in mappings of indices of the stream of logs', + }, + }, + field_existence: { + DYNAMIC_KEY: { + type: 'long', + _meta: { + description: 'Count of documents having the field represented by the key', + }, + }, + }, + size_in_bytes: { + type: 'long', + _meta: { + description: 'Total size in bytes of the stream of logs', + }, + }, + managed_by: { + type: 'array', + items: { + type: 'keyword', + _meta: { + description: 'Value captured in _meta.managed_by', + }, + }, + }, + package_name: { + type: 'array', + items: { + type: 'keyword', + _meta: { + description: 'Value captured in _meta.package.name', + }, + }, + }, + beat: { + type: 'array', + items: { type: 'keyword', _meta: { description: 'Value captured in _meta.beat.name' } }, + }, + }, + }, + }, + }); + + usageCollection.registerCollector(logsUsageCollector); +} diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/types.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/types.ts index 0ccae5e83f1be..4df825bf58966 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/types.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_telemetry/types.ts @@ -85,3 +85,12 @@ export interface DataTelemetryEvent { package_name: string[]; beat: string[]; } + +export interface DataTelemetryObject { + data: DataTelemetryEvent[]; +} + +export interface TelemetryTaskState { + data: DataTelemetryEvent[] | null; + ran: boolean; +} diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/types.ts b/x-pack/plugins/observability_solution/dataset_quality/server/types.ts index 0ba17c5bb22c6..e0bfb9704b7af 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/types.ts 
+++ b/x-pack/plugins/observability_solution/dataset_quality/server/types.ts @@ -6,26 +6,26 @@ */ import { CustomRequestHandlerContext } from '@kbn/core/server'; -import type { AnalyticsServiceSetup, AnalyticsServiceStart } from '@kbn/core-analytics-server'; import type { FleetSetupContract, FleetStartContract } from '@kbn/fleet-plugin/server'; import { TaskManagerSetupContract, TaskManagerStartContract, } from '@kbn/task-manager-plugin/server'; import type { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server'; +import { UsageCollectionSetup, UsageCollectionStart } from '@kbn/usage-collection-plugin/server'; export interface DatasetQualityPluginSetupDependencies { fleet: FleetSetupContract; - analytics: AnalyticsServiceSetup; telemetry: TelemetryPluginSetup; taskManager: TaskManagerSetupContract; + usageCollection?: UsageCollectionSetup; } export interface DatasetQualityPluginStartDependencies { fleet: FleetStartContract; telemetry: TelemetryPluginStart; - analytics: AnalyticsServiceStart; taskManager: TaskManagerStartContract; + usageCollection?: UsageCollectionStart; } // eslint-disable-next-line @typescript-eslint/no-empty-interface diff --git a/x-pack/plugins/observability_solution/dataset_quality/tsconfig.json b/x-pack/plugins/observability_solution/dataset_quality/tsconfig.json index aab3038a9436f..934c0e434d9a5 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/tsconfig.json +++ b/x-pack/plugins/observability_solution/dataset_quality/tsconfig.json @@ -53,11 +53,11 @@ "@kbn/ebt-tools", "@kbn/fields-metadata-plugin", "@kbn/server-route-repository-utils", - "@kbn/core-analytics-server", "@kbn/core-analytics-browser", "@kbn/core-lifecycle-browser", "@kbn/core-notifications-browser", "@kbn/telemetry-plugin", + "@kbn/usage-collection-plugin", "@kbn/rison", "@kbn/task-manager-plugin" ], diff --git a/x-pack/plugins/telemetry_collection_xpack/schema/xpack_plugins.json 
b/x-pack/plugins/telemetry_collection_xpack/schema/xpack_plugins.json index ebc026d88d619..0de2cbd77db7b 100644 --- a/x-pack/plugins/telemetry_collection_xpack/schema/xpack_plugins.json +++ b/x-pack/plugins/telemetry_collection_xpack/schema/xpack_plugins.json @@ -10084,6 +10084,148 @@ } } }, + "logs_data": { + "properties": { + "data": { + "type": "array", + "items": { + "properties": { + "pattern_name": { + "type": "keyword", + "_meta": { + "description": "Logs pattern name representing the stream of logs" + } + }, + "shipper": { + "type": "keyword", + "_meta": { + "description": "Shipper if present, sending the logs" + } + }, + "doc_count": { + "type": "long", + "_meta": { + "description": "Total number of documents in the stream of logs" + } + }, + "structure_level": { + "properties": { + "0": { + "type": "long", + "_meta": { + "description": "Total docs at structure level 0" + } + }, + "1": { + "type": "long", + "_meta": { + "description": "Total docs at structure level 1" + } + }, + "2": { + "type": "long", + "_meta": { + "description": "Total docs at structure level 2" + } + }, + "3": { + "type": "long", + "_meta": { + "description": "Total docs at structure level 3" + } + }, + "4": { + "type": "long", + "_meta": { + "description": "Total docs at structure level 4" + } + }, + "5": { + "type": "long", + "_meta": { + "description": "Total docs at structure level 5" + } + }, + "6": { + "type": "long", + "_meta": { + "description": "Total docs at structure level 6" + } + } + } + }, + "failure_store_doc_count": { + "type": "long", + "_meta": { + "description": "Total number of documents in the failure store in the stream of logs" + } + }, + "index_count": { + "type": "long", + "_meta": { + "description": "Total number of indices in the stream of logs" + } + }, + "namespace_count": { + "type": "long", + "_meta": { + "description": "Total number of namespaces in the stream of logs" + } + }, + "field_count": { + "type": "long", + "_meta": { + "description": "Total
number of fields in mappings of indices of the stream of logs" + } + }, + "field_existence": { + "properties": { + "DYNAMIC_KEY": { + "type": "long", + "_meta": { + "description": "Count of documents having the field represented by the key" + } + } + } + }, + "size_in_bytes": { + "type": "long", + "_meta": { + "description": "Total size in bytes of the stream of logs" + } + }, + "managed_by": { + "type": "array", + "items": { + "type": "keyword", + "_meta": { + "description": "Value captured in _meta.managed_by" + } + } + }, + "package_name": { + "type": "array", + "items": { + "type": "keyword", + "_meta": { + "description": "Value captured in _meta.package.name" + } + } + }, + "beat": { + "type": "array", + "items": { + "type": "keyword", + "_meta": { + "description": "Value captured in _meta.beat.name" + } + } + } + } + } + } + } + }, "maps": { "properties": { "mapsTotalCount": { diff --git a/x-pack/test/api_integration/apis/telemetry/telemetry_local.ts b/x-pack/test/api_integration/apis/telemetry/telemetry_local.ts index ffd37240303fc..d155bfb614cbf 100644 --- a/x-pack/test/api_integration/apis/telemetry/telemetry_local.ts +++ b/x-pack/test/api_integration/apis/telemetry/telemetry_local.ts @@ -117,6 +117,13 @@ export default function ({ getService }: FtrProviderContext) { 'number' ); + // Logs data telemetry + const logsDataTelemetryData = + stats.stack_stats.kibana.plugins.usage_collector_stats.not_ready.names.includes('logs_data') + ? [] + : stats.stack_stats.kibana.plugins.logs_data?.data; + expect(logsDataTelemetryData).to.an('array'); + expect(stats.stack_stats.kibana.os.platforms[0].platform).to.be.a('string'); expect(stats.stack_stats.kibana.os.platforms[0].count).to.be(1); expect(stats.stack_stats.kibana.os.platformReleases[0].platformRelease).to.be.a('string');