From bb1b56d4299b4f0b15149032949cd7505cc392a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Mon, 16 Sep 2024 08:36:57 -0400 Subject: [PATCH] Make task manager code use the same logger (#192574) In this PR, I'm making the sub-loggers within task manager use the main logger so we can observe the logs under `log.logger:"plugin.taskManager"`. To preserve separation, I moved the sub-logger name within a tag so we can still filter the logs via `tags:"taskClaimer"`. The wrapped_logger.ts file is copied from `x-pack/plugins/alerting/server/task_runner/lib/task_runner_logger.ts`. --------- Co-authored-by: Elastic Machine (cherry picked from commit a0973d600212096ac9e530c179a87c14b7409db2) --- .../lib/setup_test_servers.ts | 8 -- .../server/lib/wrapped_logger.test.ts | 98 +++++++++++++++++++ .../task_manager/server/lib/wrapped_logger.ts | 74 ++++++++++++++ .../server/metrics/metrics_stream.ts | 3 +- .../server/queries/task_claiming.test.ts | 3 +- .../server/queries/task_claiming.ts | 3 +- .../task_manager/server/routes/metrics.ts | 11 +-- .../task_claimers/strategy_mget.test.ts | 33 ++++--- .../server/task_claimers/strategy_mget.ts | 22 ++--- .../server/task_running/task_runner.ts | 3 +- 10 files changed, 205 insertions(+), 53 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/lib/wrapped_logger.test.ts create mode 100644 x-pack/plugins/task_manager/server/lib/wrapped_logger.ts diff --git a/x-pack/plugins/task_manager/server/integration_tests/lib/setup_test_servers.ts b/x-pack/plugins/task_manager/server/integration_tests/lib/setup_test_servers.ts index 5ec8e724ae819..6abcac64c1d13 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/lib/setup_test_servers.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/lib/setup_test_servers.ts @@ -21,14 +21,6 @@ function createRoot(settings = {}) { name: 'plugins.taskManager', level: 'all', }, - { - name: 'plugins.taskManager.metrics-debugger', - level: 'warn', - }, - { - name: 'plugins.taskManager.metrics-subscribe-debugger', - level: 'warn', - }, ], }, }, diff --git a/x-pack/plugins/task_manager/server/lib/wrapped_logger.test.ts b/x-pack/plugins/task_manager/server/lib/wrapped_logger.test.ts new file mode 100644 index 0000000000000..12857c3fef845 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/wrapped_logger.test.ts @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { loggingSystemMock } from '@kbn/core/server/mocks'; +import { LogLevel, LogRecord } from '@kbn/logging'; +import { createWrappedLogger } from './wrapped_logger'; + +describe('createWrappedLogger', () => { + test('should inject baseline tags into log messages', () => { + const logger: ReturnType = + loggingSystemMock.createLogger(); + const taskRunnerLogger = createWrappedLogger({ logger, tags: ['tag-1', 'tag-2'] }); + + taskRunnerLogger.trace('test trace message', { tags: ['tag-3'] }); + taskRunnerLogger.debug('test debug message', { tags: ['tag-4'] }); + taskRunnerLogger.info('test info message', { tags: ['tag-5'] }); + taskRunnerLogger.warn('test warn message', { tags: ['tag-6'] }); + taskRunnerLogger.error('test error message', { tags: ['tag-7'] }); + taskRunnerLogger.fatal('test fatal message', { tags: ['tag-8'] }); + + expect(logger.trace).toHaveBeenCalledWith('test trace message', { + tags: ['tag-1', 'tag-2', 'tag-3'], + }); + expect(logger.debug).toHaveBeenCalledWith('test debug message', { + tags: ['tag-1', 'tag-2', 'tag-4'], + }); + expect(logger.info).toHaveBeenCalledWith('test info message', { + tags: ['tag-1', 'tag-2', 'tag-5'], + }); + expect(logger.warn).toHaveBeenCalledWith('test warn message', { + tags: ['tag-1', 'tag-2', 'tag-6'], + }); + expect(logger.error).toHaveBeenCalledWith('test error message', { + tags: ['tag-1', 'tag-2', 'tag-7'], + }); + expect(logger.fatal).toHaveBeenCalledWith('test fatal message', { + tags: ['tag-1', 'tag-2', 'tag-8'], + }); + }); + + test('should pass through other meta fields', () => { + const logger: ReturnType = + loggingSystemMock.createLogger(); + const taskRunnerLogger = createWrappedLogger({ logger, tags: ['tag-1', 'tag-2'] }); + + taskRunnerLogger.trace('test trace message', { labels: { foo: 'bar' } }); + taskRunnerLogger.debug('test debug message', { tags: ['tag-4'], host: { cpu: { usage: 3 } } }); + taskRunnerLogger.info('test info message'); + taskRunnerLogger.warn('test warn message', { user: { email: 'abc@124.com' } }); + taskRunnerLogger.error('test error message', { agent: { id: 'agent-1' } }); + taskRunnerLogger.fatal('test fatal message'); + + expect(logger.trace).toHaveBeenCalledWith('test trace message', { + tags: ['tag-1', 'tag-2'], + labels: { foo: 'bar' }, + }); + expect(logger.debug).toHaveBeenCalledWith('test debug message', { + tags: ['tag-1', 'tag-2', 'tag-4'], + host: { cpu: { usage: 3 } }, + }); + expect(logger.info).toHaveBeenCalledWith('test info message', { tags: ['tag-1', 'tag-2'] }); + expect(logger.warn).toHaveBeenCalledWith('test warn message', { + tags: ['tag-1', 'tag-2'], + user: { email: 'abc@124.com' }, + }); + expect(logger.error).toHaveBeenCalledWith('test error message', { + tags: ['tag-1', 'tag-2'], + agent: { id: 'agent-1' }, + }); + expect(logger.fatal).toHaveBeenCalledWith('test fatal message', { tags: ['tag-1', 'tag-2'] }); + }); + + test('should pass through other functions', () => { + const logger: ReturnType = + loggingSystemMock.createLogger(); + const taskRunnerLogger = createWrappedLogger({ logger, tags: ['tag-1', 'tag-2'] }); + + taskRunnerLogger.isLevelEnabled('debug'); + expect(logger.isLevelEnabled).toHaveBeenCalledWith('debug'); + + taskRunnerLogger.get('prefix', 'another'); + expect(logger.get).toHaveBeenCalledWith('prefix', 'another'); + + const logRecord: LogRecord = { + timestamp: new Date(), + level: LogLevel.Info, + context: 'context', + message: 'message', + pid: 1, + }; + taskRunnerLogger.log(logRecord); + expect(logger.log).toHaveBeenCalledWith(logRecord); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/wrapped_logger.ts b/x-pack/plugins/task_manager/server/lib/wrapped_logger.ts new file mode 100644 index 0000000000000..a1182924d8bee --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/wrapped_logger.ts @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger, LogMeta } from '@kbn/core/server'; +import { LogLevelId, LogMessageSource, LogRecord } from '@kbn/logging'; + +interface WrappedLoggerOpts { + logger: Logger; + tags: string[]; +} + +export function createWrappedLogger(opts: WrappedLoggerOpts): Logger { + return new WrappedLogger(opts); +} + +class WrappedLogger implements Logger { + private loggerMetaTags: string[] = []; + + constructor(private readonly opts: WrappedLoggerOpts) { + this.loggerMetaTags = opts.tags; + } + + trace(message: LogMessageSource, meta?: Meta) { + this.opts.logger.trace(message, { ...meta, tags: this.combineTags(meta?.tags) }); + } + + debug(message: LogMessageSource, meta?: Meta) { + this.opts.logger.debug(message, { ...meta, tags: this.combineTags(meta?.tags) }); + } + + info(message: LogMessageSource, meta?: Meta) { + this.opts.logger.info(message, { ...meta, tags: this.combineTags(meta?.tags) }); + } + + warn(errorOrMessage: LogMessageSource | Error, meta?: Meta) { + this.opts.logger.warn(errorOrMessage, { ...meta, tags: this.combineTags(meta?.tags) }); + } + + error(errorOrMessage: LogMessageSource | Error, meta?: Meta) { + this.opts.logger.error(errorOrMessage, { ...meta, tags: this.combineTags(meta?.tags) }); + } + + fatal(errorOrMessage: LogMessageSource | Error, meta?: Meta) { + this.opts.logger.fatal(errorOrMessage, { ...meta, tags: this.combineTags(meta?.tags) }); + } + + log(record: LogRecord) { + this.opts.logger.log(record); + } + + isLevelEnabled(level: LogLevelId): boolean { + return this.opts.logger.isLevelEnabled(level); + } + + get(...childContextPaths: string[]): Logger { + return this.opts.logger.get(...childContextPaths); + } + + private combineTags(tags?: string[] | string): string[] { + if (!tags) { + return this.loggerMetaTags; + } + + if (typeof tags === 'string') { + return [...new Set([...this.loggerMetaTags, tags])]; + } + + return [...new Set([...this.loggerMetaTags, ...tags])]; + } +} diff --git a/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts b/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts index 2c56e84104b1d..b9df16b95f2d7 100644 --- a/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts +++ b/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts @@ -12,6 +12,7 @@ import { Logger } from '@kbn/core/server'; import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle'; import { TaskManagerConfig } from '../config'; import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; +import { createWrappedLogger } from '../lib/wrapped_logger'; import { isTaskManagerStatEvent, isTaskManagerMetricEvent, @@ -52,7 +53,7 @@ export function createMetricsAggregators({ taskManagerMetricsCollector, }: CreateMetricsAggregatorsOpts): AggregatedStatProvider { const aggregators: AggregatedStatProvider[] = []; - const debugLogger = logger.get('metrics-debugger'); + const debugLogger = createWrappedLogger({ logger, tags: ['metrics-debugger'] }); if (taskPollingLifecycle) { aggregators.push( createAggregator({ diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts index c4b9b6dd5836e..437af8e007bdb 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts @@ -91,7 +91,8 @@ describe('TaskClaiming', () => { }); expect(taskManagerLogger.warn).toHaveBeenCalledWith( - 'Unknown task claiming strategy "non-default", falling back to update_by_query' + 'Unknown task claiming strategy "non-default", falling back to update_by_query', + { tags: ['taskClaiming'] } ); }); diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts index f5ef18452509b..5116c25c38f3f 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts @@ -28,6 +28,7 @@ import { getTaskClaimer, } from '../task_claimers'; import { TaskPartitioner } from '../lib/task_partitioner'; +import { createWrappedLogger } from '../lib/wrapped_logger'; export type { ClaimOwnershipResult } from '../task_claimers'; export interface TaskClaimingOpts { @@ -107,7 +108,7 @@ export class TaskClaiming { this.maxAttempts = opts.maxAttempts; this.taskStore = opts.taskStore; this.getAvailableCapacity = opts.getAvailableCapacity; - this.logger = opts.logger.get('taskClaiming'); + this.logger = createWrappedLogger({ logger: opts.logger, tags: ['taskClaiming'] }); this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions); this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions)); this.excludedTaskTypes = opts.excludedTaskTypes; diff --git a/x-pack/plugins/task_manager/server/routes/metrics.ts b/x-pack/plugins/task_manager/server/routes/metrics.ts index 490cb869ba109..808675f25818b 100644 --- a/x-pack/plugins/task_manager/server/routes/metrics.ts +++ b/x-pack/plugins/task_manager/server/routes/metrics.ts @@ -37,15 +37,12 @@ const QuerySchema = schema.object({ }); export function metricsRoute(params: MetricsRouteParams) { - const { router, logger, metrics$, resetMetrics$, taskManagerId } = params; + const { router, metrics$, resetMetrics$, taskManagerId } = params; - const debugLogger = logger.get(`metrics-debugger`); - const additionalDebugLogger = logger.get(`metrics-subscribe-debugger`); let lastMetrics: NodeMetrics | null = null; metrics$.subscribe((metrics) => { lastMetrics = { process_uuid: taskManagerId, timestamp: new Date().toISOString(), ...metrics }; - additionalDebugLogger.debug(() => `subscribed metrics ${JSON.stringify(metrics)}`); }); router.get( @@ -68,12 +65,6 @@ export function metricsRoute(params: MetricsRouteParams) { req: KibanaRequest, unknown>, res: KibanaResponseFactory ): Promise { - debugLogger.debug( - () => - `/api/task_manager/metrics route accessed with reset=${req.query.reset} - metrics ${ - lastMetrics ? JSON.stringify(lastMetrics) : 'not available' - }` - ); if (req.query.reset) { resetMetrics$.next(true); } diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index cd1433d2e4009..3919860d27061 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -387,7 +387,7 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -497,7 +497,7 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 2;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -604,11 +604,11 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.warn).toHaveBeenCalledWith( 'Error updating task id-2:task to mark as unrecognized during claim: {"type":"document_missing_exception","reason":"[5]: document missing","index_uuid":"aAsFqTI0Tc2W0LCWgPNrOA","shard":"0","index":".kibana_task_manager_8.16.0_001"}', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 1;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -701,11 +701,11 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.warn).toHaveBeenCalledWith( 'Error updating tasks to mark as unrecognized during claim: Error: Oh no', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -855,7 +855,7 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -948,7 +948,7 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -1041,7 +1041,7 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 2; stale: 1; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -1140,7 +1140,7 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -1271,11 +1271,11 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.error).toHaveBeenCalledWith( 'Error getting full task id-2:task during claim: Oh no', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -1511,11 +1511,11 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.error).toHaveBeenCalledWith( 'Error updating task id-2:task during claim: {"type":"document_missing_exception","reason":"[5]: document missing","index_uuid":"aAsFqTI0Tc2W0LCWgPNrOA","shard":"0","index":".kibana_task_manager_8.16.0_001"}', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ @@ -1644,7 +1644,7 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).toHaveBeenCalledWith( 'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', - { tags: ['claimAvailableTasksMget'] } + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.error).not.toHaveBeenCalled(); @@ -2000,7 +2000,8 @@ describe('TaskClaiming', () => { ] = claimedResults; expect(taskManagerLogger.warn).toHaveBeenCalledWith( - 'Background task node "test" has no assigned partitions, claiming against all partitions' + 'Background task node "test" has no assigned partitions, claiming against all partitions', + { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(query).toMatchInlineSnapshot(` Object { diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 1fd35d408a3ec..432d7f183ce39 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -15,6 +15,7 @@ import apm, { Logger } from 'elastic-apm-node'; import { Subject, Observable } from 'rxjs'; +import { createWrappedLogger } from '../lib/wrapped_logger'; import { TaskTypeDictionary } from '../task_type_dictionary'; import { @@ -105,9 +106,7 @@ async function claimAvailableTasksApm(opts: TaskClaimerOpts): Promise { const { getCapacity, claimOwnershipUntil, batches, events$, taskStore, taskPartitioner } = opts; const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts; - const { logger } = opts; - const loggerTag = claimAvailableTasksMget.name; - const logMeta = { tags: [loggerTag] }; + const logger = createWrappedLogger({ logger: opts.logger, tags: [claimAvailableTasksMget.name] }); const initialCapacity = getCapacity(); const stopTaskTimer = startTaskTimer(); @@ -227,10 +226,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise> { const { task } = this.instance; - const debugLogger = this.logger.get(`metrics-debugger`); + const debugLogger = createWrappedLogger({ logger: this.logger, tags: [`metrics-debugger`] }); const taskHasExpired = this.isExpired;