From 07f4828f9a979f99b56359002bf21ad2393150d6 Mon Sep 17 00:00:00 2001 From: Sandra G Date: Tue, 14 Sep 2021 15:28:08 -0400 Subject: [PATCH] [Stack Monitoring] convert server/logstash dir to typescript (#110706) * add types to logstash server dir * remove comment * fix type * remove unused type * add helper function to test * add issue to comment --- x-pack/plugins/monitoring/common/types/es.ts | 3 + .../lib/cluster/get_clusters_from_request.ts | 2 +- .../monitoring/server/lib/create_query.ts | 2 +- ...luster_status.js => get_cluster_status.ts} | 8 +- .../lib/logstash/get_logstash_for_clusters.ts | 2 +- .../server/lib/logstash/get_node_info.test.js | 177 -------------- .../server/lib/logstash/get_node_info.test.ts | 212 +++++++++++++++++ .../server/lib/logstash/get_node_info.ts | 3 - .../server/lib/logstash/get_nodes.ts | 4 - ...ipelines.js => get_paginated_pipelines.ts} | 217 ++++++++++++------ ..._pipeline.test.js => get_pipeline.test.ts} | 18 +- .../server/lib/logstash/get_pipeline.ts | 30 +-- .../server/lib/logstash/get_pipeline_ids.ts | 24 +- .../logstash/get_pipeline_state_document.ts | 27 +-- ...n.js => get_pipeline_stats_aggregation.ts} | 51 ++-- ...e_versions.js => get_pipeline_versions.ts} | 34 ++- .../lib/logstash/get_pipeline_vertex.ts | 32 +-- ... get_pipeline_vertex_stats_aggregation.ts} | 69 ++++-- .../{sort_pipelines.js => sort_pipelines.ts} | 8 +- .../lib/pagination/{filter.js => filter.ts} | 9 +- .../pagination/{paginate.js => paginate.ts} | 2 +- ....js => standalone_cluster_query_filter.ts} | 0 .../server/routes/api/v1/logstash/pipeline.js | 8 +- .../pipelines/cluster_pipeline_ids.js | 2 +- .../logstash/pipelines/cluster_pipelines.js | 11 +- .../v1/logstash/pipelines/node_pipelines.js | 11 +- x-pack/plugins/monitoring/server/types.ts | 74 ++++++ 27 files changed, 667 insertions(+), 373 deletions(-) rename x-pack/plugins/monitoring/server/lib/logstash/{get_cluster_status.js => get_cluster_status.ts} (87%) delete mode 100644 x-pack/plugins/monitoring/server/lib/logstash/get_node_info.test.js create mode 100644 x-pack/plugins/monitoring/server/lib/logstash/get_node_info.test.ts rename x-pack/plugins/monitoring/server/lib/logstash/{get_paginated_pipelines.js => get_paginated_pipelines.ts} (51%) rename x-pack/plugins/monitoring/server/lib/logstash/{get_pipeline.test.js => get_pipeline.test.ts} (94%) rename x-pack/plugins/monitoring/server/lib/logstash/{get_pipeline_stats_aggregation.js => get_pipeline_stats_aggregation.ts} (80%) rename x-pack/plugins/monitoring/server/lib/logstash/{get_pipeline_versions.js => get_pipeline_versions.ts} (78%) rename x-pack/plugins/monitoring/server/lib/logstash/{get_pipeline_vertex_stats_aggregation.js => get_pipeline_vertex_stats_aggregation.ts} (75%) rename x-pack/plugins/monitoring/server/lib/logstash/{sort_pipelines.js => sort_pipelines.ts} (56%) rename x-pack/plugins/monitoring/server/lib/pagination/{filter.js => filter.ts} (75%) rename x-pack/plugins/monitoring/server/lib/pagination/{paginate.js => paginate.ts} (77%) rename x-pack/plugins/monitoring/server/lib/standalone_clusters/{standalone_cluster_query_filter.js => standalone_cluster_query_filter.ts} (100%) diff --git a/x-pack/plugins/monitoring/common/types/es.ts b/x-pack/plugins/monitoring/common/types/es.ts index f324164b09302..a9d020813ce84 100644 --- a/x-pack/plugins/monitoring/common/types/es.ts +++ b/x-pack/plugins/monitoring/common/types/es.ts @@ -228,6 +228,7 @@ export interface ElasticsearchLegacySource { }; queue?: { type?: string; + events?: number; }; jvm?: { uptime_in_millis?: number; @@ -249,6 +250,8 @@ export interface ElasticsearchLegacySource { }; events?: { out?: number; + in?: number; + filtered?: number; }; reloads?: { failures?: number; diff --git a/x-pack/plugins/monitoring/server/lib/cluster/get_clusters_from_request.ts b/x-pack/plugins/monitoring/server/lib/cluster/get_clusters_from_request.ts index a2b3434e6b3f7..9016f1916542b 100644 --- a/x-pack/plugins/monitoring/server/lib/cluster/get_clusters_from_request.ts +++ b/x-pack/plugins/monitoring/server/lib/cluster/get_clusters_from_request.ts @@ -190,7 +190,7 @@ export async function getClustersFromRequest( // add logstash data if (isInCodePath(codePaths, [CODE_PATH_LOGSTASH])) { const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters); - const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, 1); + const pipelines = await getLogstashPipelineIds({ req, lsIndexPattern, clusterUuid, size: 1 }); logstashes.forEach((logstash) => { const clusterIndex = clusters.findIndex( (cluster) => diff --git a/x-pack/plugins/monitoring/server/lib/create_query.ts b/x-pack/plugins/monitoring/server/lib/create_query.ts index 83817280730f2..8dead521d24fb 100644 --- a/x-pack/plugins/monitoring/server/lib/create_query.ts +++ b/x-pack/plugins/monitoring/server/lib/create_query.ts @@ -78,7 +78,7 @@ export function createQuery(options: { const isFromStandaloneCluster = clusterUuid === STANDALONE_CLUSTER_CLUSTER_UUID; - let typeFilter; + let typeFilter: any; if (type) { typeFilter = { bool: { should: [{ term: { type } }, { term: { 'metricset.name': type } }] } }; } else if (types) { diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_cluster_status.js b/x-pack/plugins/monitoring/server/lib/logstash/get_cluster_status.ts similarity index 87% rename from x-pack/plugins/monitoring/server/lib/logstash/get_cluster_status.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_cluster_status.ts index 59ee4f9981bda..dfd1eaa155069 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_cluster_status.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_cluster_status.ts @@ -8,6 +8,7 @@ import { get } from 'lodash'; import { checkParam } from '../error_missing_required'; import { getLogstashForClusters } from './get_logstash_for_clusters'; +import { LegacyRequest } from '../../types'; /** * Get the cluster status for Logstash instances. @@ -19,9 +20,12 @@ import { getLogstashForClusters } from './get_logstash_for_clusters'; * @param {String} clusterUuid The cluster UUID for the associated Elasticsearch cluster. * @returns {Promise} The cluster status object. */ -export function getClusterStatus(req, lsIndexPattern, { clusterUuid }) { +export function getClusterStatus( + req: LegacyRequest, + lsIndexPattern: string, + { clusterUuid }: { clusterUuid: string } +) { checkParam(lsIndexPattern, 'lsIndexPattern in logstash/getClusterStatus'); - const clusters = [{ cluster_uuid: clusterUuid }]; return getLogstashForClusters(req, lsIndexPattern, clusters).then((clusterStatus) => get(clusterStatus, '[0].stats') diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.ts b/x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.ts index c0c29756818ee..480b7176b7aba 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.ts +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.ts @@ -40,7 +40,7 @@ const getQueueTypes = (queueBuckets: Array ) { checkParam(lsIndexPattern, 'lsIndexPattern in logstash/getLogstashForClusters'); diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.test.js b/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.test.js deleted file mode 100644 index e34defc43afc8..0000000000000 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.test.js +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import moment from 'moment'; -import { STANDALONE_CLUSTER_CLUSTER_UUID } from '../../../common/constants'; -import { handleResponse, getNodeInfo } from './get_node_info'; -import { standaloneClusterFilter } from '../standalone_clusters/standalone_cluster_query_filter'; - -describe('get_logstash_info', () => { - // TODO: test was not running before and is not up to date - it.skip('return undefined for empty response', () => { - const result = handleResponse({}); - expect(result).toBe(undefined); - }); - - it('return mapped data for result with hits, availability = true', () => { - const result = handleResponse({ - hits: { - hits: [ - { - _source: { - logstash_stats: { - timestamp: moment().format(), - logstash: { - host: 'myhost', - }, - events: { - in: 300, - filtered: 300, - out: 300, - }, - reloads: { - successes: 5, - failures: 2, - }, - queue: { - type: 'persisted', - events: 100, - }, - }, - }, - }, - ], - }, - }); - expect(result).toEqual({ - host: 'myhost', - availability: true, - events: { - filtered: 300, - in: 300, - out: 300, - }, - reloads: { - successes: 5, - failures: 2, - }, - queue_type: 'persisted', - }); - }); - - it('return mapped data for result with hits, availability = false', () => { - const result = handleResponse({ - hits: { - hits: [ - { - _source: { - logstash_stats: { - timestamp: moment().subtract(11, 'minutes').format(), - logstash: { - host: 'myhost', - }, - events: { - in: 300, - filtered: 300, - out: 300, - }, - reloads: { - successes: 5, - failures: 2, - }, - queue: { - type: 'persisted', - events: 100, - }, - }, - }, - }, - ], - }, - }); - expect(result).toEqual({ - host: 'myhost', - availability: false, - events: { - filtered: 300, - in: 300, - out: 300, - }, - reloads: { - successes: 5, - failures: 2, - }, - queue_type: 'persisted', - }); - }); - - it('default to no queue type if none specified', () => { - const result = handleResponse({ - hits: { - hits: [ - { - _source: { - logstash_stats: { - timestamp: moment().subtract(11, 'minutes').format(), - logstash: { - host: 'myhost', - }, - events: { - in: 300, - filtered: 300, - out: 300, - }, - reloads: { - successes: 5, - failures: 2, - }, - }, - }, - }, - ], - }, - }); - expect(result).toEqual({ - host: 'myhost', - availability: false, - events: { - filtered: 300, - in: 300, - out: 300, - }, - reloads: { - successes: 5, - failures: 2, - }, - }); - }); - - it('works with standalone cluster', async () => { - const callWithRequest = jest.fn().mockReturnValue({ - then: jest.fn(), - }); - const req = { - server: { - plugins: { - elasticsearch: { - getCluster: () => ({ - callWithRequest, - }), - }, - }, - }, - }; - await getNodeInfo(req, '.monitoring-logstash-*', { - clusterUuid: STANDALONE_CLUSTER_CLUSTER_UUID, - }); - expect(callWithRequest.mock.calls.length).toBe(1); - expect(callWithRequest.mock.calls[0].length).toBe(3); - expect(callWithRequest.mock.calls[0][2].body.query.bool.filter[0]).toBe( - standaloneClusterFilter - ); - }); -}); diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.test.ts b/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.test.ts new file mode 100644 index 0000000000000..ea2eac7febb6d --- /dev/null +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.test.ts @@ -0,0 +1,212 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import moment from 'moment'; +import { set, unset } from 'lodash'; +import { STANDALONE_CLUSTER_CLUSTER_UUID } from '../../../common/constants'; +import { handleResponse, getNodeInfo } from './get_node_info'; +import { LegacyRequest } from '../../types'; +import { ElasticsearchResponseHit } from '../../../common/types/es'; +import { standaloneClusterFilter } from '../standalone_clusters/standalone_cluster_query_filter'; + +interface HitParams { + path: string; + value?: string; +} + +// deletes, adds, or updates the properties based on a default object +function createResponseObjHit(params?: HitParams[]): ElasticsearchResponseHit { + const defaultResponseObj: ElasticsearchResponseHit = { + _index: 'index', + _source: { + cluster_uuid: '123', + timestamp: '2021-08-31T15:00:26.330Z', + logstash_stats: { + timestamp: moment().format(), + logstash: { + pipeline: { + batch_size: 2, + workers: 2, + }, + host: 'myhost', + uuid: 'd63b22f8-7f77-4a23-9aac-9813c760e0e0', + version: '8.0.0', + status: 'green', + name: 'desktop-192-168-162-170.local', + http_address: '127.0.0.1:9600', + }, + events: { + in: 300, + filtered: 300, + out: 300, + }, + reloads: { + successes: 5, + failures: 2, + }, + queue: { + type: 'persisted', + events: 100, + }, + }, + }, + }; + + if (!params) return defaultResponseObj; + return params.reduce((acc, change) => { + if (!change.value) { + // delete if no value provided + unset(acc, change.path); + return acc; + } + return set(acc, change.path, change.value); + }, defaultResponseObj); +} + +const createResponseFromHits = (hits: ElasticsearchResponseHit[]) => { + return { + hits: { + total: { + value: hits.length, + }, + hits, + }, + }; +}; + +describe('get_logstash_info', () => { + it('return mapped data for result with hits, availability = true', () => { + const hits = [createResponseObjHit()]; + const res = createResponseFromHits(hits); + const result = handleResponse(res); + expect(result).toEqual({ + host: 'myhost', + uuid: 'd63b22f8-7f77-4a23-9aac-9813c760e0e0', + version: '8.0.0', + status: 'green', + uptime: undefined, + name: 'desktop-192-168-162-170.local', + pipeline: { + batch_size: 2, + workers: 2, + }, + http_address: '127.0.0.1:9600', + availability: true, + events: { + filtered: 300, + in: 300, + out: 300, + }, + reloads: { + successes: 5, + failures: 2, + }, + queue_type: 'persisted', + }); + }); + + it('return mapped data for result with hits, availability = false', () => { + const hits = [ + createResponseObjHit([ + { + path: '_source.logstash_stats.timestamp', + value: moment().subtract(11, 'minutes').format(), + }, + ]), + ]; + const res = createResponseFromHits(hits); + + const result = handleResponse(res); + expect(result).toEqual({ + host: 'myhost', + pipeline: { + batch_size: 2, + workers: 2, + }, + uuid: 'd63b22f8-7f77-4a23-9aac-9813c760e0e0', + version: '8.0.0', + status: 'green', + name: 'desktop-192-168-162-170.local', + http_address: '127.0.0.1:9600', + availability: false, + events: { + filtered: 300, + in: 300, + out: 300, + }, + reloads: { + successes: 5, + failures: 2, + }, + queue_type: 'persisted', + }); + }); + + it('default to no queue type if none specified', () => { + const hits = [ + createResponseObjHit([ + { + path: '_source.logstash_stats.queue', // delete queue property + }, + { + path: '_source.logstash_stats.timestamp', // update the timestamp property + value: moment().subtract(11, 'minutes').format(), + }, + ]), + ]; + const res = createResponseFromHits(hits); + const result = handleResponse(res); + expect(result).toEqual({ + host: 'myhost', + pipeline: { + batch_size: 2, + workers: 2, + }, + uuid: 'd63b22f8-7f77-4a23-9aac-9813c760e0e0', + version: '8.0.0', + status: 'green', + name: 'desktop-192-168-162-170.local', + http_address: '127.0.0.1:9600', + availability: false, + events: { + filtered: 300, + in: 300, + out: 300, + }, + reloads: { + successes: 5, + failures: 2, + }, + }); + }); + + it('works with standalone cluster', async () => { + const callWithRequest = jest.fn().mockReturnValue({ + then: jest.fn(), + }); + const req = ({ + server: { + plugins: { + elasticsearch: { + getCluster: () => ({ + callWithRequest, + }), + }, + }, + }, + } as unknown) as LegacyRequest; + await getNodeInfo(req, '.monitoring-logstash-*', { + clusterUuid: STANDALONE_CLUSTER_CLUSTER_UUID, + logstashUuid: 'logstash_uuid', + }); + expect(callWithRequest.mock.calls.length).toBe(1); + expect(callWithRequest.mock.calls[0].length).toBe(3); + expect(callWithRequest.mock.calls[0][2].body.query.bool.filter[0]).toBe( + standaloneClusterFilter + ); + }); +}); diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.ts b/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.ts index 276b8b119bba3..ebd1128dce364 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.ts +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.ts @@ -6,14 +6,11 @@ */ import { merge } from 'lodash'; -// @ts-ignore import { checkParam, MissingRequiredError } from '../error_missing_required'; -// @ts-ignore import { calculateAvailability } from '../calculate_availability'; import { LegacyRequest } from '../../types'; import { ElasticsearchResponse } from '../../../common/types/es'; import { STANDALONE_CLUSTER_CLUSTER_UUID } from '../../../common/constants'; -// @ts-ignore import { standaloneClusterFilter } from '../standalone_clusters/standalone_cluster_query_filter'; export function handleResponse(resp: ElasticsearchResponse) { diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_nodes.ts b/x-pack/plugins/monitoring/server/lib/logstash/get_nodes.ts index 42d1b69aee5f3..153c2ece13830 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_nodes.ts +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_nodes.ts @@ -6,13 +6,9 @@ */ import moment from 'moment'; -// @ts-ignore import { checkParam } from '../error_missing_required'; -// @ts-ignore import { createQuery } from '../create_query'; -// @ts-ignore import { calculateAvailability } from '../calculate_availability'; -// @ts-ignore import { LogstashMetric } from '../metrics'; import { LegacyRequest } from '../../types'; import { ElasticsearchResponse } from '../../../common/types/es'; diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.js b/x-pack/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.ts similarity index 51% rename from x-pack/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.ts index a4645edda73d0..ee41e12ea322b 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.ts @@ -11,6 +11,16 @@ import { getLogstashPipelineIds } from './get_pipeline_ids'; import { sortPipelines } from './sort_pipelines'; import { paginate } from '../pagination/paginate'; import { getMetrics } from '../details/get_metrics'; +import { + LegacyRequest, + Pipeline, + PipelineMetricKey, + PipelineMetricsRes, + PipelineNodeCountMetricKey, + PipelinesResponse, + PipelineThroughputMetricKey, + PipelineWithMetrics, +} from '../../types'; /** * This function performs an optimization around the pipeline listing tables in the UI. To avoid @@ -22,80 +32,109 @@ import { getMetrics } from '../details/get_metrics'; * * @param {*} req - Server request object * @param {*} lsIndexPattern - The index pattern to search against (`.monitoring-logstash-*`) - * @param {*} uuids - The optional `clusterUuid` and `logstashUuid` to filter the results from - * @param {*} metricSet - The array of metrics that are sortable in the UI + * @param {*} clusterUuid - clusterUuid to filter the results from + * @param {*} logstashUuid - logstashUuid to filter the results from + * @param {*} metrics - The array of metrics that are sortable in the UI * @param {*} pagination - ({ index, size }) * @param {*} sort - ({ field, direction }) * @param {*} queryText - Text that will be used to filter out pipelines */ -export async function getPaginatedPipelines( + +interface GetPaginatedPipelinesParams { + req: LegacyRequest; + lsIndexPattern: string; + clusterUuid: string; + logstashUuid?: string; + metrics: { + throughputMetric: PipelineThroughputMetricKey; + nodesCountMetric: PipelineNodeCountMetricKey; + }; + pagination: { index: number; size: number }; + sort: { field: PipelineMetricKey | ''; direction: 'asc' | 'desc' }; + queryText: string; +} +export async function getPaginatedPipelines({ req, lsIndexPattern, - { clusterUuid, logstashUuid }, - { throughputMetric, nodesCountMetric }, + clusterUuid, + logstashUuid, + metrics, pagination, - sort = { field: null }, - queryText -) { + sort = { field: '', direction: 'desc' }, + queryText, +}: GetPaginatedPipelinesParams) { + const { throughputMetric, nodesCountMetric } = metrics; const sortField = sort.field; const config = req.server.config(); - const size = config.get('monitoring.ui.max_bucket_size'); - const pipelines = await getLogstashPipelineIds( + // TODO type config + const size = (config.get('monitoring.ui.max_bucket_size') as unknown) as number; + let pipelines = await getLogstashPipelineIds({ req, lsIndexPattern, - { clusterUuid, logstashUuid }, - size - ); - + clusterUuid, + logstashUuid, + size, + }); + // this is needed for sorting if (sortField === throughputMetric) { - await getPaginatedThroughputData(pipelines, req, lsIndexPattern, throughputMetric); + pipelines = await getPaginatedThroughputData(pipelines, req, lsIndexPattern, throughputMetric); } else if (sortField === nodesCountMetric) { - await getPaginatedNodesData(pipelines, req, lsIndexPattern, nodesCountMetric); + pipelines = await getPaginatedNodesData(pipelines, req, lsIndexPattern, nodesCountMetric); } - // Filtering const filteredPipelines = filter(pipelines, queryText, ['id']); // We only support filtering by id right now - // Sorting const sortedPipelines = sortPipelines(filteredPipelines, sort); - // Pagination const pageOfPipelines = paginate(pagination, sortedPipelines); const response = { - pipelines: await getPipelines( + pipelines: await getPipelines({ req, lsIndexPattern, - pageOfPipelines, + pipelines: pageOfPipelines, throughputMetric, - nodesCountMetric - ), + nodesCountMetric, + }), totalPipelineCount: filteredPipelines.length, }; return processPipelinesAPIResponse(response, throughputMetric, nodesCountMetric); } -function processPipelinesAPIResponse(response, throughputMetricKey, nodesCountMetricKey) { - // Clone to avoid mutating original response - const processedResponse = cloneDeep(response); - +function processPipelinesAPIResponse( + response: { pipelines: PipelineWithMetrics[]; totalPipelineCount: number }, + throughputMetricKey: PipelineThroughputMetricKey, + nodeCountMetricKey: PipelineNodeCountMetricKey +) { // Normalize metric names for shared component code // Calculate latest throughput and node count for each pipeline - processedResponse.pipelines.forEach((pipeline) => { - pipeline.metrics = { - throughput: pipeline.metrics[throughputMetricKey], - nodesCount: pipeline.metrics[nodesCountMetricKey], - }; + const processedResponse = response.pipelines.reduce( + (acc, pipeline) => { + acc.pipelines.push({ + ...pipeline, + metrics: { + throughput: pipeline.metrics[throughputMetricKey], + nodesCount: pipeline.metrics[nodeCountMetricKey], + }, + latestThroughput: (last(pipeline.metrics[throughputMetricKey]?.data) || [])[1], + latestNodesCount: (last(pipeline.metrics[nodeCountMetricKey]?.data) || [])[1], + }); + return acc; + }, + { totalPipelineCount: response.totalPipelineCount, pipelines: [] } + ); - pipeline.latestThroughput = (last(pipeline.metrics.throughput.data) || [])[1]; - pipeline.latestNodesCount = (last(pipeline.metrics.nodesCount.data) || [])[1]; - }); return processedResponse; } -async function getPaginatedThroughputData(pipelines, req, lsIndexPattern, throughputMetric) { - const metricSeriesData = Object.values( +async function getPaginatedThroughputData( + pipelines: Pipeline[], + req: LegacyRequest, + lsIndexPattern: string, + throughputMetric: PipelineThroughputMetricKey +): Promise { + const metricSeriesData: any = Object.values( await Promise.all( pipelines.map((pipeline) => { return new Promise(async (resolve, reject) => { @@ -135,21 +174,33 @@ async function getPaginatedThroughputData(pipelines, req, lsIndexPattern, throug }) ) ); - - for (const pipelineAggregationData of metricSeriesData) { - for (const pipeline of pipelines) { - if (pipelineAggregationData.id === pipeline.id) { - const dataSeries = get(pipelineAggregationData, `metrics.${throughputMetric}.data`, [[]]); - if (dataSeries.length === 0) { - continue; - } - pipeline[throughputMetric] = dataSeries.pop()[1]; + return pipelines.reduce((acc, pipeline) => { + const match = metricSeriesData.find((metric: { id: string }) => metric.id === pipeline.id); + if (match) { + const dataSeries = get(match, `metrics.${throughputMetric}.data`, [[]]); + if (dataSeries.length) { + const newPipeline = { + ...pipeline, + [throughputMetric]: dataSeries.pop()[1], + }; + acc.push(newPipeline); + } else { + acc.push(pipeline); } + } else { + acc.push(pipeline); } - } + return acc; + }, []); } -async function getPaginatedNodesData(pipelines, req, lsIndexPattern, nodesCountMetric) { +async function getPaginatedNodesData( + pipelines: Pipeline[], + req: LegacyRequest, + lsIndexPattern: string, + nodesCountMetric: PipelineNodeCountMetricKey +): Promise { + const pipelineWithMetrics = cloneDeep(pipelines); const metricSeriesData = await getMetrics( req, lsIndexPattern, @@ -161,39 +212,71 @@ async function getPaginatedNodesData(pipelines, req, lsIndexPattern, nodesCountM }, }, ], - { pageOfPipelines: pipelines }, + { pageOfPipelines: pipelineWithMetrics }, 2 ); const { data } = metricSeriesData[nodesCountMetric][0] || [[]]; const pipelinesMap = (data.pop() || [])[1] || {}; if (!Object.keys(pipelinesMap).length) { - return; + return pipelineWithMetrics; } - pipelines.forEach((pipeline) => void (pipeline[nodesCountMetric] = pipelinesMap[pipeline.id])); + return pipelineWithMetrics.map((pipeline) => ({ + ...pipeline, + [nodesCountMetric]: pipelinesMap[pipeline.id], + })); } -async function getPipelines(req, lsIndexPattern, pipelines, throughputMetric, nodesCountMetric) { +async function getPipelines({ + req, + lsIndexPattern, + pipelines, + throughputMetric, + nodesCountMetric, +}: { + req: LegacyRequest; + lsIndexPattern: string; + pipelines: Pipeline[]; + throughputMetric: PipelineThroughputMetricKey; + nodesCountMetric: PipelineNodeCountMetricKey; +}): Promise { const throughputPipelines = await getThroughputPipelines( req, lsIndexPattern, pipelines, throughputMetric ); - const nodePipelines = await getNodePipelines(req, lsIndexPattern, pipelines, nodesCountMetric); + const nodeCountPipelines = await getNodePipelines( + req, + lsIndexPattern, + pipelines, + nodesCountMetric + ); const finalPipelines = pipelines.map(({ id }) => { - const pipeline = { + const matchThroughputPipeline = throughputPipelines.find((p) => p.id === id); + const matchNodesCountPipeline = nodeCountPipelines.find((p) => p.id === id); + return { id, metrics: { - [throughputMetric]: throughputPipelines.find((p) => p.id === id).metrics[throughputMetric], - [nodesCountMetric]: nodePipelines.find((p) => p.id === id).metrics[nodesCountMetric], + [throughputMetric]: + matchThroughputPipeline && throughputMetric in matchThroughputPipeline.metrics + ? matchThroughputPipeline.metrics[throughputMetric] + : undefined, + [nodesCountMetric]: + matchNodesCountPipeline && nodesCountMetric in matchNodesCountPipeline.metrics + ? matchNodesCountPipeline.metrics[nodesCountMetric] + : undefined, }, }; - return pipeline; }); return finalPipelines; } -async function getThroughputPipelines(req, lsIndexPattern, pipelines, throughputMetric) { +async function getThroughputPipelines( + req: LegacyRequest, + lsIndexPattern: string, + pipelines: Pipeline[], + throughputMetric: string +): Promise { const metricsResponse = await Promise.all( pipelines.map((pipeline) => { return new Promise(async (resolve, reject) => { @@ -231,11 +314,15 @@ async function getThroughputPipelines(req, lsIndexPattern, pipelines, throughput }); }) ); - - return Object.values(metricsResponse); + return Object.values(metricsResponse) as PipelineWithMetrics[]; } -async function getNodePipelines(req, lsIndexPattern, pipelines, nodesCountMetric) { +async function getNodePipelines( + req: LegacyRequest, + lsIndexPattern: string, + pipelines: Pipeline[], + nodesCountMetric: string +): Promise { const metricData = await getMetrics( req, lsIndexPattern, @@ -252,7 +339,7 @@ async function getNodePipelines(req, lsIndexPattern, pipelines, nodesCountMetric } ); - const metricObject = metricData[nodesCountMetric][0]; + const metricObject = metricData[nodesCountMetric][0] as PipelineMetricsRes; const pipelinesData = pipelines.map(({ id }) => { return { id, @@ -268,10 +355,10 @@ async function getNodePipelines(req, lsIndexPattern, pipelines, nodesCountMetric return pipelinesData; } -function reduceData({ id }, data) { +function reduceData(pipeline: Pipeline, data: any) { return { - id, - metrics: Object.keys(data).reduce((accum, metricName) => { + id: pipeline.id, + metrics: Object.keys(data).reduce((accum, metricName) => { accum[metricName] = data[metricName][0]; return accum; }, {}), diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.test.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.test.ts similarity index 94% rename from x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.test.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.test.ts index cb329db9a3855..d71c67dba524c 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.test.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.test.ts @@ -5,17 +5,19 @@ * 2.0. */ +import { ElasticsearchSourceLogstashPipelineVertex } from '../../../common/types/es'; import { _vertexStats, _enrichStateWithStatsAggregation } from './get_pipeline'; describe('get_pipeline', () => { describe('_vertexStats function', () => { - let vertex; - let vertexStatsBucket; - let totalProcessorsDurationInMillis; - let timeseriesIntervalInSeconds; + let vertex: ElasticsearchSourceLogstashPipelineVertex; + let vertexStatsBucket: any; + let totalProcessorsDurationInMillis: number; + let timeseriesIntervalInSeconds: number; beforeEach(() => { vertex = { + id: 'test', plugin_type: 'input', }; @@ -47,6 +49,7 @@ describe('get_pipeline', () => { describe('vertex represents filter plugin', () => { beforeEach(() => { vertex = { + id: 'test', plugin_type: 'filter', }; }); @@ -70,6 +73,7 @@ describe('get_pipeline', () => { describe('vertex represents output plugin', () => { beforeEach(() => { vertex = { + id: 'test', plugin_type: 'output', }; }); @@ -92,9 +96,9 @@ describe('get_pipeline', () => { }); describe('_enrichStateWithStatsAggregation function', () => { - let stateDocument; - let statsAggregation; - let timeseriesInterval; + let stateDocument: any; + let statsAggregation: object; + let timeseriesInterval: number; beforeEach(() => { stateDocument = { diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts index d8bfd91a4aec8..7882256f83d22 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts @@ -7,14 +7,11 @@ import boom from '@hapi/boom'; import { get } from 'lodash'; -// @ts-ignore import { checkParam } from '../error_missing_required'; import { getPipelineStateDocument } from './get_pipeline_state_document'; -// @ts-ignore import { getPipelineStatsAggregation } from './get_pipeline_stats_aggregation'; -// @ts-ignore import { calculateTimeseriesInterval } from '../calculate_timeseries_interval'; -import { LegacyRequest } from '../../types'; +import { LegacyRequest, PipelineVersion } from '../../types'; import { ElasticsearchSource, ElasticsearchSourceLogstashPipelineVertex, @@ -119,16 +116,10 @@ export async function getPipeline( lsIndexPattern: string, clusterUuid: string, pipelineId: string, - version: { firstSeen: string; lastSeen: string; hash: string } + version: PipelineVersion ) { checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline'); - const options: any = { - clusterUuid, - pipelineId, - version, - }; - // Determine metrics' timeseries interval based on version's timespan const minIntervalSeconds = config.get('monitoring.ui.min_interval_seconds'); const timeseriesInterval = calculateTimeseriesInterval( @@ -138,8 +129,21 @@ export async function getPipeline( ); const [stateDocument, statsAggregation] = await Promise.all([ - getPipelineStateDocument(req, lsIndexPattern, options), - getPipelineStatsAggregation(req, lsIndexPattern, timeseriesInterval, options), + getPipelineStateDocument({ + req, + logstashIndexPattern: lsIndexPattern, + clusterUuid, + pipelineId, + version, + }), + getPipelineStatsAggregation({ + req, + logstashIndexPattern: lsIndexPattern, + timeseriesInterval, + clusterUuid, + pipelineId, + version, + }), ]); if (stateDocument === null || !statsAggregation) { diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_ids.ts b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_ids.ts index 1a5595d45ffbb..c9b7a3adfc18e 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_ids.ts +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_ids.ts @@ -7,16 +7,24 @@ import moment from 'moment'; import { get } from 'lodash'; -import { LegacyRequest, Bucket } from '../../types'; +import { LegacyRequest, Bucket, Pipeline } from '../../types'; import { createQuery } from '../create_query'; import { LogstashMetric } from '../metrics'; -export async function getLogstashPipelineIds( - req: LegacyRequest, - logstashIndexPattern: string, - { clusterUuid, logstashUuid }: { clusterUuid: string; logstashUuid?: string }, - size: number -) { +interface GetLogstashPipelineIdsParams { + req: LegacyRequest; + lsIndexPattern: string; + clusterUuid: string; + size: number; + logstashUuid?: string; +} +export async function getLogstashPipelineIds({ + req, + lsIndexPattern, + clusterUuid, + logstashUuid, + size, +}: GetLogstashPipelineIdsParams): Promise { const start = moment.utc(req.payload.timeRange.min).valueOf(); const end = moment.utc(req.payload.timeRange.max).valueOf(); @@ -26,7 +34,7 @@ export async function getLogstashPipelineIds( } const params = { - index: logstashIndexPattern, + index: lsIndexPattern, size: 0, ignore_unavailable: true, filter_path: ['aggregations.nest.id.buckets', 'aggregations.nest_mb.id.buckets'], diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.ts b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.ts index 61c99c3a069b3..8558e117fb2f8 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.ts +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.ts @@ -5,22 +5,24 @@ * 2.0. */ -// @ts-ignore import { createQuery } from '../create_query'; -// @ts-ignore import { LogstashMetric } from '../metrics'; -import { LegacyRequest } from '../../types'; +import { LegacyRequest, PipelineVersion } from '../../types'; import { ElasticsearchResponse } from '../../../common/types/es'; -export async function getPipelineStateDocument( - req: LegacyRequest, - logstashIndexPattern: string, - { - clusterUuid, - pipelineId, - version, - }: { clusterUuid: string; pipelineId: string; version: { hash: string } } -) { +export async function getPipelineStateDocument({ + req, + logstashIndexPattern, + clusterUuid, + pipelineId, + version, +}: { + req: LegacyRequest; + logstashIndexPattern: string; + clusterUuid: string; + pipelineId: string; + version: PipelineVersion; +}) { const { callWithRequest } = req.server.plugins?.elasticsearch.getCluster('monitoring'); const filters = [ { term: { 'logstash_state.pipeline.id': pipelineId } }, @@ -52,7 +54,6 @@ export async function getPipelineStateDocument( }; const resp = (await callWithRequest(req, 'search', params)) as ElasticsearchResponse; - // Return null if doc not found return resp.hits?.hits[0]?._source ?? null; } diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_stats_aggregation.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_stats_aggregation.ts similarity index 80% rename from x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_stats_aggregation.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_stats_aggregation.ts index 4d9d2a720a162..0205ce21c59b9 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_stats_aggregation.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_stats_aggregation.ts @@ -5,16 +5,22 @@ * 2.0. */ +import { LegacyRequest, PipelineVersion } from '../../types'; import { createQuery } from '../create_query'; import { LogstashMetric } from '../metrics'; -function scalarCounterAggregation(field, fieldPath, ephemeralIdField, maxBucketSize) { +function scalarCounterAggregation( + field: string, + fieldPath: string, + ephemeralIdField: string, + maxBucketSize: string +) { const fullPath = `${fieldPath}.${field}`; const byEphemeralIdName = `${field}_temp_by_ephemeral_id`; const sumName = `${field}_total`; - const aggs = {}; + const aggs: { [key: string]: any } = {}; aggs[byEphemeralIdName] = { terms: { @@ -46,7 +52,7 @@ function scalarCounterAggregation(field, fieldPath, ephemeralIdField, maxBucketS return aggs; } -function nestedVertices(maxBucketSize) { +function nestedVertices(maxBucketSize: string) { const fieldPath = 'logstash_stats.pipelines.vertices'; const ephemeralIdField = 'logstash_stats.pipelines.vertices.pipeline_ephemeral_id'; @@ -73,7 +79,7 @@ function nestedVertices(maxBucketSize) { }; } -function createScopedAgg(pipelineId, pipelineHash, agg) { +function createScopedAgg(pipelineId: string, pipelineHash: string, agg: { [key: string]: any }) { return { pipelines: { nested: { path: 'logstash_stats.pipelines' }, @@ -95,13 +101,13 @@ function createScopedAgg(pipelineId, pipelineHash, agg) { } function fetchPipelineLatestStats( - query, - logstashIndexPattern, - pipelineId, - version, - maxBucketSize, - callWithRequest, - req + query: object, + logstashIndexPattern: string, + pipelineId: string, + version: PipelineVersion, + maxBucketSize: string, + callWithRequest: any, + req: LegacyRequest ) { const params = { index: logstashIndexPattern, @@ -115,7 +121,7 @@ function fetchPipelineLatestStats( 'aggregations.pipelines.scoped.total_processor_duration_stats', ], body: { - query: query, + query, aggs: createScopedAgg(pipelineId, version.hash, { vertices: nestedVertices(maxBucketSize), total_processor_duration_stats: { @@ -130,12 +136,21 @@ function fetchPipelineLatestStats( return callWithRequest(req, 'search', params); } -export function getPipelineStatsAggregation( +export function getPipelineStatsAggregation({ req, logstashIndexPattern, timeseriesInterval, - { clusterUuid, start, end, pipelineId, version } -) { + clusterUuid, + pipelineId, + version, +}: { + req: LegacyRequest; + logstashIndexPattern: string; + timeseriesInterval: number; + clusterUuid: string; + pipelineId: string; + version: PipelineVersion; +}) { const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring'); const filters = [ { @@ -153,8 +168,8 @@ export function getPipelineStatsAggregation( }, ]; - start = version.lastSeen - timeseriesInterval * 1000; - end = version.lastSeen; + const start = version.lastSeen - timeseriesInterval * 1000; + const end = version.lastSeen; const query = createQuery({ types: ['stats', 'logstash_stats'], @@ -172,6 +187,8 @@ export function getPipelineStatsAggregation( logstashIndexPattern, pipelineId, version, + // @ts-ignore not undefined, need to get correct config + // https://github.com/elastic/kibana/issues/112146 config.get('monitoring.ui.max_bucket_size'), callWithRequest, req diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_versions.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_versions.ts similarity index 78% rename from x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_versions.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_versions.ts index c52d41a363055..18330a83185ca 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_versions.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_versions.ts @@ -5,14 +5,25 @@ * 2.0. */ +import { get } from 'lodash'; import { createQuery } from '../create_query'; import { LogstashMetric } from '../metrics'; -import { get } from 'lodash'; import { checkParam } from '../error_missing_required'; +import { LegacyRequest } from '../../types'; -function fetchPipelineVersions(...args) { - const [req, config, logstashIndexPattern, clusterUuid, pipelineId] = args; - checkParam(logstashIndexPattern, 'logstashIndexPattern in getPipelineVersions'); +function fetchPipelineVersions({ + req, + lsIndexPattern, + clusterUuid, + pipelineId, +}: { + req: LegacyRequest; + lsIndexPattern: string; + clusterUuid: string; + pipelineId: string; +}) { + const config = req.server.config(); + checkParam(lsIndexPattern, 'logstashIndexPattern in getPipelineVersions'); const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring'); const filters = [ @@ -80,7 +91,7 @@ function fetchPipelineVersions(...args) { }; const params = { - index: logstashIndexPattern, + index: lsIndexPattern, size: 0, ignore_unavailable: true, body: { @@ -93,20 +104,25 @@ function fetchPipelineVersions(...args) { return callWithRequest(req, 'search', params); } -export function _handleResponse(response) { +export function _handleResponse(response: any) { const pipelineHashes = get( response, 'aggregations.pipelines.scoped.by_pipeline_hash.buckets', [] ); - return pipelineHashes.map((pipelineHash) => ({ + return pipelineHashes.map((pipelineHash: any) => ({ hash: pipelineHash.key, firstSeen: get(pipelineHash, 'path_to_root.first_seen.value'), lastSeen: get(pipelineHash, 'path_to_root.last_seen.value'), })); } -export async function getPipelineVersions(...args) { - const response = await fetchPipelineVersions(...args); +export async function getPipelineVersions(args: { + req: LegacyRequest; + lsIndexPattern: string; + clusterUuid: string; + pipelineId: string; +}) { + const response = await fetchPipelineVersions(args); return _handleResponse(response); } diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.ts b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.ts index e41eea0bce64a..b75bf13dafd62 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.ts +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.ts @@ -7,14 +7,11 @@ import boom from '@hapi/boom'; import { get } from 'lodash'; -// @ts-ignore import { checkParam } from '../error_missing_required'; import { getPipelineStateDocument } from './get_pipeline_state_document'; -// @ts-ignore import { getPipelineVertexStatsAggregation } from './get_pipeline_vertex_stats_aggregation'; -// @ts-ignore import { calculateTimeseriesInterval } from '../calculate_timeseries_interval'; -import { LegacyRequest } from '../../types'; +import { LegacyRequest, PipelineVersion } from '../../types'; import { ElasticsearchSource, ElasticsearchSourceLogstashPipelineVertex, @@ -135,18 +132,11 @@ export async function getPipelineVertex( lsIndexPattern: string, clusterUuid: string, pipelineId: string, - version: { hash: string; firstSeen: string; lastSeen: string }, + version: PipelineVersion, vertexId: string ) { checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline'); - const options = { - clusterUuid, - pipelineId, - version, - vertexId, - }; - // Determine metrics' timeseries interval based on version's timespan const minIntervalSeconds = config.get('monitoring.ui.min_interval_seconds'); const timeseriesInterval = calculateTimeseriesInterval( @@ -156,8 +146,22 @@ export async function getPipelineVertex( ); const [stateDocument, statsAggregation] = await Promise.all([ - getPipelineStateDocument(req, lsIndexPattern, options), - getPipelineVertexStatsAggregation(req, lsIndexPattern, timeseriesInterval, options), + getPipelineStateDocument({ + req, + logstashIndexPattern: lsIndexPattern, + clusterUuid, + pipelineId, + version, + }), + getPipelineVertexStatsAggregation({ + req, + logstashIndexPattern: lsIndexPattern, + timeSeriesIntervalInSeconds: timeseriesInterval, + clusterUuid, + pipelineId, + version, + vertexId, + }), ]); if (stateDocument === null || !statsAggregation) { diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.ts similarity index 75% rename from x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.ts index 97a8c463a2259..875d6ef962981 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.ts @@ -5,16 +5,22 @@ * 2.0. */ +import { LegacyRequest, PipelineVersion } from '../../types'; import { createQuery } from '../create_query'; import { LogstashMetric } from '../metrics'; -function scalarCounterAggregation(field, fieldPath, ephemeralIdField, maxBucketSize) { +function scalarCounterAggregation( + field: string, + fieldPath: string, + ephemeralIdField: string, + maxBucketSize: number +) { const fullPath = `${fieldPath}.${field}`; const byEphemeralIdName = `${field}_temp_by_ephemeral_id`; const sumName = `${field}_total`; - const aggs = {}; + const aggs: any = {}; aggs[byEphemeralIdName] = { terms: { @@ -46,11 +52,11 @@ function scalarCounterAggregation(field, fieldPath, ephemeralIdField, maxBucketS return aggs; } -function createAggsObjectFromAggsList(aggsList) { - return aggsList.reduce((aggsSoFar, agg) => ({ ...aggsSoFar, ...agg }), {}); +function createAggsObjectFromAggsList(aggsList: any) { + return aggsList.reduce((aggsSoFar: object, agg: object) => ({ ...aggsSoFar, ...agg }), {}); } -function createNestedVertexAgg(vertexId, maxBucketSize) { +function createNestedVertexAgg(vertexId: string, maxBucketSize: number) { const fieldPath = 'logstash_stats.pipelines.vertices'; const ephemeralIdField = 'logstash_stats.pipelines.vertices.pipeline_ephemeral_id'; @@ -96,7 +102,7 @@ function createTotalProcessorDurationStatsAgg() { }; } -function createScopedAgg(pipelineId, pipelineHash, ...aggsList) { +function createScopedAgg(pipelineId: string, pipelineHash: string, ...aggsList: object[]) { return { pipelines: { nested: { path: 'logstash_stats.pipelines' }, @@ -117,7 +123,7 @@ function createScopedAgg(pipelineId, pipelineHash, ...aggsList) { }; } -function createTimeSeriesAgg(timeSeriesIntervalInSeconds, ...aggsList) { +function createTimeSeriesAgg(timeSeriesIntervalInSeconds: number, ...aggsList: object[]) { return { timeseries: { date_histogram: { @@ -129,7 +135,7 @@ function createTimeSeriesAgg(timeSeriesIntervalInSeconds, ...aggsList) { }; } -function fetchPipelineVertexTimeSeriesStats( +function fetchPipelineVertexTimeSeriesStats({ query, logstashIndexPattern, pipelineId, @@ -138,8 +144,18 @@ function fetchPipelineVertexTimeSeriesStats( timeSeriesIntervalInSeconds, maxBucketSize, callWithRequest, - req -) { + req, +}: { + query: object; + logstashIndexPattern: string; + pipelineId: string; + version: PipelineVersion; + vertexId: string; + timeSeriesIntervalInSeconds: number; + maxBucketSize: number; + callWithRequest: (req: any, endpoint: string, params: any) => Promise; + req: LegacyRequest; +}) { const aggs = { ...createTimeSeriesAgg( timeSeriesIntervalInSeconds, @@ -165,7 +181,7 @@ function fetchPipelineVertexTimeSeriesStats( 'aggregations.timeseries.buckets.pipelines.scoped.total_processor_duration_stats', ], body: { - query: query, + query, aggs, }, }; @@ -173,12 +189,23 @@ function fetchPipelineVertexTimeSeriesStats( return callWithRequest(req, 'search', params); } -export function getPipelineVertexStatsAggregation( +export function getPipelineVertexStatsAggregation({ req, logstashIndexPattern, timeSeriesIntervalInSeconds, - { clusterUuid, start, end, pipelineId, version, vertexId } -) { + clusterUuid, + pipelineId, + version, + vertexId, +}: { + req: LegacyRequest; + logstashIndexPattern: string; + timeSeriesIntervalInSeconds: number; + clusterUuid: string; + pipelineId: string; + version: PipelineVersion; + vertexId: string; +}) { const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring'); const filters = [ { @@ -196,8 +223,8 @@ export function getPipelineVertexStatsAggregation( }, ]; - start = version.firstSeen; - end = version.lastSeen; + const start = version.firstSeen; + const end = version.lastSeen; const query = createQuery({ types: ['stats', 'logstash_stats'], @@ -210,15 +237,17 @@ export function getPipelineVertexStatsAggregation( const config = req.server.config(); - return fetchPipelineVertexTimeSeriesStats( + return fetchPipelineVertexTimeSeriesStats({ query, logstashIndexPattern, pipelineId, version, vertexId, timeSeriesIntervalInSeconds, - config.get('monitoring.ui.max_bucket_size'), + // @ts-ignore not undefined, need to get correct config + // https://github.com/elastic/kibana/issues/112146 + maxBucketSize: config.get('monitoring.ui.max_bucket_size'), callWithRequest, - req - ); + req, + }); } diff --git a/x-pack/plugins/monitoring/server/lib/logstash/sort_pipelines.js b/x-pack/plugins/monitoring/server/lib/logstash/sort_pipelines.ts similarity index 56% rename from x-pack/plugins/monitoring/server/lib/logstash/sort_pipelines.js rename to x-pack/plugins/monitoring/server/lib/logstash/sort_pipelines.ts index 19e0a78865635..5ecd4f6632c48 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/sort_pipelines.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/sort_pipelines.ts @@ -6,11 +6,15 @@ */ import { orderBy } from 'lodash'; +import { Pipeline, PipelineMetricKey } from '../../types'; -export function sortPipelines(pipelines, sort) { +export function sortPipelines( + pipelines: Pipeline[], + sort: { field: PipelineMetricKey | ''; direction: 'asc' | 'desc' } +): Pipeline[] { if (!sort) { return pipelines; } - return orderBy(pipelines, (pipeline) => pipeline[sort.field], sort.direction); + return orderBy(pipelines, [sort.field], [sort.direction]); } diff --git a/x-pack/plugins/monitoring/server/lib/pagination/filter.js b/x-pack/plugins/monitoring/server/lib/pagination/filter.ts similarity index 75% rename from x-pack/plugins/monitoring/server/lib/pagination/filter.js rename to x-pack/plugins/monitoring/server/lib/pagination/filter.ts index 96c4fda34eb84..d6d5f55dcdd3c 100644 --- a/x-pack/plugins/monitoring/server/lib/pagination/filter.js +++ b/x-pack/plugins/monitoring/server/lib/pagination/filter.ts @@ -7,14 +7,19 @@ import { get } from 'lodash'; -function defaultFilterFn(value, query) { +function defaultFilterFn(value: string, query: string) { if (value.toLowerCase().includes(query.toLowerCase())) { return true; } return false; } -export function filter(data, queryText, fields, filterFn = defaultFilterFn) { +export function filter( + data: T[], + queryText: string, + fields: string[], + filterFn = defaultFilterFn +): T[] { return data.filter((item) => { for (const field of fields) { if (filterFn(get(item, field, ''), queryText)) { diff --git a/x-pack/plugins/monitoring/server/lib/pagination/paginate.js b/x-pack/plugins/monitoring/server/lib/pagination/paginate.ts similarity index 77% rename from x-pack/plugins/monitoring/server/lib/pagination/paginate.js rename to x-pack/plugins/monitoring/server/lib/pagination/paginate.ts index 2d0e9e83ea901..15d3cb085787e 100644 --- a/x-pack/plugins/monitoring/server/lib/pagination/paginate.js +++ b/x-pack/plugins/monitoring/server/lib/pagination/paginate.ts @@ -5,7 +5,7 @@ * 2.0. */ -export function paginate({ size, index }, data) { +export function paginate({ size, index }: { size: number; index: number }, data: T[]): T[] { const start = index * size; return data.slice(start, start + size); } diff --git a/x-pack/plugins/monitoring/server/lib/standalone_clusters/standalone_cluster_query_filter.js b/x-pack/plugins/monitoring/server/lib/standalone_clusters/standalone_cluster_query_filter.ts similarity index 100% rename from x-pack/plugins/monitoring/server/lib/standalone_clusters/standalone_cluster_query_filter.js rename to x-pack/plugins/monitoring/server/lib/standalone_clusters/standalone_cluster_query_filter.ts diff --git a/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipeline.js b/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipeline.js index 8b8e5cdcccdb4..b68a708b1f208 100644 --- a/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipeline.js +++ b/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipeline.js @@ -61,7 +61,13 @@ export function logstashPipelineRoute(server) { // Figure out which version of the pipeline we want to show let versions; try { - versions = await getPipelineVersions(req, config, lsIndexPattern, clusterUuid, pipelineId); + versions = await getPipelineVersions({ + req, + config, + lsIndexPattern, + clusterUuid, + pipelineId, + }); } catch (err) { return handleError(err, req); } diff --git a/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipeline_ids.js b/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipeline_ids.js index 5be8b9b965d95..c881ff7b3d23c 100644 --- a/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipeline_ids.js +++ b/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipeline_ids.js @@ -40,7 +40,7 @@ export function logstashClusterPipelineIdsRoute(server) { const size = config.get('monitoring.ui.max_bucket_size'); try { - const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, size); + const pipelines = await getLogstashPipelineIds({ req, lsIndexPattern, clusterUuid, size }); return pipelines; } catch (err) { throw handleError(err, req); diff --git a/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js b/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js index 646cd047d8b41..1f7a5e1d436b1 100644 --- a/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js +++ b/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js @@ -61,17 +61,16 @@ export function logstashClusterPipelinesRoute(server) { if (sort) { sort.field = sortMetricSetMap[sort.field] || sort.field; } - try { - const response = await getPaginatedPipelines( + const response = await getPaginatedPipelines({ req, lsIndexPattern, - { clusterUuid }, - { throughputMetric, nodesCountMetric }, + clusterUuid, + metrics: { throughputMetric, nodesCountMetric }, pagination, sort, - queryText - ); + queryText, + }); return { ...response, diff --git a/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js b/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js index c2af754af4563..47b8fd81a4d44 100644 --- a/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js +++ b/x-pack/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js @@ -64,15 +64,16 @@ export function logstashNodePipelinesRoute(server) { } try { - const response = await getPaginatedPipelines( + const response = await getPaginatedPipelines({ req, lsIndexPattern, - { clusterUuid, logstashUuid }, - { throughputMetric, nodesCountMetric }, + clusterUuid, + logstashUuid, + metrics: { throughputMetric, nodesCountMetric }, pagination, sort, - queryText - ); + queryText, + }); return { ...response, diff --git a/x-pack/plugins/monitoring/server/types.ts b/x-pack/plugins/monitoring/server/types.ts index e1010a7a5fd98..425c7f239138a 100644 --- a/x-pack/plugins/monitoring/server/types.ts +++ b/x-pack/plugins/monitoring/server/types.ts @@ -183,3 +183,77 @@ export interface ClusterSettingsReasonResponse { } export type ErrorTypes = Error | Boom.Boom | ResponseError | ElasticsearchClientError; + +export type Pipeline = { + id: string; + nodeIds: string[]; +} & { + [key in PipelineMetricKey]?: number; +}; + +export type PipelineMetricKey = + | 'logstash_cluster_pipeline_throughput' + | 'logstash_cluster_pipeline_node_count' + | 'logstash_node_pipeline_node_count' + | 'logstash_node_pipeline_throughput'; + +export type PipelineThroughputMetricKey = + | 'logstash_cluster_pipeline_throughput' + | 'logstash_node_pipeline_throughput'; + +export type PipelineNodeCountMetricKey = + | 'logstash_cluster_pipeline_node_count' + | 'logstash_node_pipeline_node_count'; + +export interface PipelineWithMetrics { + id: string; + metrics: { + logstash_cluster_pipeline_throughput?: PipelineMetricsProcessed; + logstash_cluster_pipeline_node_count?: PipelineMetricsProcessed; + logstash_node_pipeline_throughput?: PipelineMetricsProcessed; + logstash_node_pipeline_node_count?: PipelineMetricsProcessed; + }; +} + +export interface PipelineResponse { + id: string; + latestThroughput: number | null; + latestNodesCount: number | null; + metrics: { + nodesCount?: PipelineMetricsProcessed; + throughput?: PipelineMetricsProcessed; + }; +} +export interface PipelinesResponse { + pipelines: PipelineResponse[]; + totalPipelineCount: number; +} +export interface PipelineMetrics { + bucket_size: string; + timeRange: { + min: number; + max: number; + }; + metric: { + app: string; + field: string; + label: string; + description: string; + units: string; + format: string; + hasCalculation: boolean; + isDerivative: boolean; + }; +} +export type PipelineMetricsRes = PipelineMetrics & { + data: Array<[number, { [key: string]: number }]>; +}; +export type PipelineMetricsProcessed = PipelineMetrics & { + data: Array>; +}; + +export interface PipelineVersion { + firstSeen: number; + lastSeen: number; + hash: string; +}