diff --git a/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/graph_visualization.tsx b/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/graph_visualization.tsx index 08b986602caf3..f074b0cffb932 100644 --- a/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/graph_visualization.tsx +++ b/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/graph_visualization.tsx @@ -7,10 +7,10 @@ import { css } from '@emotion/react'; import { useActor } from '@xstate/react'; -import cytoscape, { CytoscapeOptions, NodeSingular } from 'cytoscape'; +import cytoscape, { CytoscapeOptions, EdgeSingular, NodeSingular } from 'cytoscape'; import dagre from 'cytoscape-dagre'; import React, { useEffect, useState } from 'react'; -import { useIngestPathwaysPageStateContext } from '../../state_machines/ingest_pathways'; +import { Agent, useIngestPathwaysPageStateContext } from '../../state_machines/ingest_pathways'; export const ConnectedGraphVisualization = React.memo(() => { const [state] = useActor(useIngestPathwaysPageStateContext()); @@ -64,6 +64,8 @@ const initialGraphOptions: CytoscapeOptions & { layout: Record } = }, 'text-wrap': 'wrap', shape: 'ellipse', + 'text-valign': 'center', + 'text-halign': 'left', }, }, { @@ -71,6 +73,15 @@ const initialGraphOptions: CytoscapeOptions & { layout: Record } = style: { label: 'data(dataStream.id)', shape: 'hexagon', + 'text-valign': 'center', + 'text-halign': 'right', + }, + }, + { + selector: 'node.ingestPipeline', + style: { + label: 'data(ingestPipeline.id)', + shape: 'diamond', }, }, { @@ -81,18 +92,33 @@ const initialGraphOptions: CytoscapeOptions & { layout: Record } = }, }, { - selector: 'edge.agentShipsTo', + selector: 'edge.shipsTo', style: { - label: 'data(relation.signalCount)', 'target-arrow-shape': 'chevron', 'target-arrow-fill': 'filled', }, }, + { + selector: 'edge.agentShipsTo', + style: { + // label: 'data(shipsTo.signalCount)', + width: (edge: EdgeSingular) => { + const agent: Agent = edge.data('agent'); + const totalSignalCount = agent.shipsTo.reduce( + (accumulatedSignalCount, { signalCount }) => accumulatedSignalCount + signalCount, + 0 + ); + return 1 + (9.0 / totalSignalCount) * edge.data('shipsTo').signalCount; + }, + }, + }, ], layout: { name: 'dagre', rankDir: 'LR', rankSep: 300, + nodeSep: 30, + ranker: 'longest-path', }, }; diff --git a/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/ingest_pathways.tsx b/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/ingest_pathways.tsx index da1cbe11b438c..a614c4741501a 100644 --- a/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/ingest_pathways.tsx +++ b/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/ingest_pathways.tsx @@ -10,6 +10,7 @@ import React, { useMemo } from 'react'; import { ObservabilityLogExplorerPageTemplate } from '../page_template'; import { ConnectedGraphVisualization } from './graph_visualization'; import { ConnectedLoadingIndicator } from './loading_indicator'; +import { ConnectedReloadButton } from './reload_button'; export const IngestPathways = React.memo(() => { const pageHeaderProps = useMemo( @@ -17,7 +18,7 @@ export const IngestPathways = React.memo(() => { alignItems: 'center', bottomBorder: 'extended', pageTitle: 'Ingest Pathways', - rightSideItems: [], + rightSideItems: [, ], }), [] ); diff --git a/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/reload_button.tsx b/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/reload_button.tsx new file mode 100644 index 0000000000000..f6f580c174582 --- /dev/null +++ b/x-pack/plugins/observability_log_explorer/public/components/ingest_pathways/reload_button.tsx @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { EuiButton } from '@elastic/eui'; +import { useActor } from '@xstate/react'; +import React from 'react'; +import { useIngestPathwaysPageStateContext } from '../../state_machines/ingest_pathways'; + +export const ConnectedReloadButton = React.memo(() => { + const [, send] = useActor(useIngestPathwaysPageStateContext()); + + return ( + { + send({ + type: 'load', + }); + }} + > + Reload + + ); +}); diff --git a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/graph/calculate_graph.ts b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/graph/calculate_graph.ts index 6a4ddc368fa02..5038cf31ab111 100644 --- a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/graph/calculate_graph.ts +++ b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/graph/calculate_graph.ts @@ -6,25 +6,34 @@ */ import { ElementDefinition } from 'cytoscape'; -import { Agent, DataStream, IngestPathwaysData, Relation } from '../types'; +import { + Agent, + DataStreamEntry, + IndexTemplate, + IngestPathwaysData, + IngestPipelineEntry, +} from '../types'; export const calculateGraph = ({ agents, dataStreams, - relations, + indexTemplates, + ingestPipelines, }: IngestPathwaysData): { elements: ElementDefinition[] } => { const dataStreamElements = Object.values(dataStreams).flatMap(convertDataStreamToGraphElements); - const agentElements = Object.values(agents).flatMap(convertAgentToGraphElements); - const relationElements = relations.flatMap( - convertRelationToGraphElements({ agents, dataStreams }) + const agentElements = Object.values(agents).flatMap( + convertAgentToGraphElements({ dataStreams, indexTemplates }) + ); + const ingestPipelineElements = Object.values(ingestPipelines).flatMap( + convertIngestPipelineToGraphElements ); return { - elements: [...dataStreamElements, ...agentElements, ...relationElements], + elements: [...dataStreamElements, ...agentElements, ...ingestPipelineElements], }; }; -const convertDataStreamToGraphElements = (dataStream: DataStream): ElementDefinition[] => [ +const convertDataStreamToGraphElements = (dataStream: DataStreamEntry): ElementDefinition[] => [ { group: 'nodes', classes: 'dataStream', @@ -35,49 +44,106 @@ const convertDataStreamToGraphElements = (dataStream: DataStream): ElementDefini }, ]; -const convertAgentToGraphElements = (agent: Agent): ElementDefinition[] => [ - { - group: 'nodes', - classes: 'agent', - data: { - id: getAgentElementId(agent), - agent, - }, - }, -]; - -const convertRelationToGraphElements = +const convertAgentToGraphElements = ({ - agents, dataStreams, + indexTemplates, }: { - agents: Record; - dataStreams: Record; + dataStreams: Record; + indexTemplates: Record; }) => - (relation: Relation): ElementDefinition[] => { - if (relation.type === 'agent-ships-to-data-stream') { - const agent = agents[relation.agentId]; - const dataStream = dataStreams[relation.dataStreamId]; - - return [ - { + (agent: Agent): ElementDefinition[] => + [ + { + group: 'nodes', + classes: 'agent', + data: { + id: getAgentElementId(agent), + agent, + }, + }, + ...agent.shipsTo.flatMap((shipsTo): ElementDefinition[] => { + const dataStream = dataStreams[shipsTo.dataStreamId]; + const source = getAgentElementId(agent); + const target = getDataStreamElementId({ + type: 'dataStreamStub', + id: shipsTo.dataStreamId, + }); + const agentDataStreamEdge: ElementDefinition = { group: 'edges', - classes: 'agentShipsTo', + classes: 'shipsTo agentShipsTo', data: { - id: `relation-agent-${relation.agentId}-ships-to-dataStream-${relation.dataStreamId}`, - source: getAgentElementId(agent), - target: getDataStreamElementId(dataStream), - relation, + id: `relation-${source}-ships-to-${target}`, + source, + target, agent, - dataStream, + shipsTo, }, - }, - ]; - } else { - return []; - } - }; + }; + + if (dataStream.type === 'dataStream') { + const indexTemplate = indexTemplates[dataStream.indexTemplateId]; -const getDataStreamElementId = ({ id }: DataStream) => `dataStream-${id}`; + return indexTemplate.ingestPipelineIds.reduce( + (edges, ingestPipelineId, ingestPipelineIndex, ingestPipelineIds) => { + const lastEdge = edges[edges.length - 1]; + const leadingEdges = edges.slice(0, -1); + + const ingestPipelineElementId = getIngestPipelineElementId({ + type: 'ingestPipelineStub', + id: ingestPipelineId, + }); + + const splitEdges: ElementDefinition[] = [ + { + group: 'edges', + classes: lastEdge.classes, + data: { + id: `relation-${lastEdge.data.source}-ships-to-${ingestPipelineElementId}`, + source: lastEdge.data.source, + target: ingestPipelineElementId, + shipsTo: lastEdge.data.shipsTo, + agent, + }, + }, + { + group: 'edges', + classes: 'shipsTo', + data: { + id: `relation-${ingestPipelineElementId}-ships-to-${lastEdge.data.target}`, + source: ingestPipelineElementId, + target: lastEdge.data.target, + shipsTo: {}, + agent, + }, + }, + ]; + + return [...leadingEdges, ...splitEdges]; + }, + [agentDataStreamEdge] + ); + } else { + return [agentDataStreamEdge]; + } + }), + ]; + +const convertIngestPipelineToGraphElements = ( + ingestPipeline: IngestPipelineEntry +): ElementDefinition[] => [ + { + group: 'nodes', + classes: 'ingestPipeline', + data: { + id: getIngestPipelineElementId(ingestPipeline), + ingestPipeline, + }, + }, +]; + +const getDataStreamElementId = ({ id }: DataStreamEntry) => `dataStream-${id}`; const getAgentElementId = ({ id }: Agent) => `agent-${id}`; + +const getIngestPipelineElementId = ({ id }: IngestPipelineEntry) => `ingestPipeline-${id}`; diff --git a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/ingest_pathways_state_machine.ts b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/ingest_pathways_state_machine.ts index 04f28695c1094..80641f6abc238 100644 --- a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/ingest_pathways_state_machine.ts +++ b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/ingest_pathways_state_machine.ts @@ -16,18 +16,21 @@ import { loadDataStreams } from './services/load_data_streams'; import { loadIndexTemplates } from './services/load_index_templates'; import { loadSignalData } from './services/load_signal_data'; import { + Agent, DataStream, + DataStreamStub, IndexTemplate, IngestPathwaysData, IngestPathwaysParameters, IngestPipeline, + IngestPipelineStub, } from './types'; import { mergeIngestPathwaysData } from './utils'; export const createPureIngestPathwaysStateMachine = (initialContext: IngestPathwaysContext) => createMachine( { - /** @xstate-layout N4IgpgJg5mDOIC5QEkB2NYBcAKBDTAFgO64CesAdAK6oCWdmtuANrQF6QDEA2gAwC6iUAAcA9rFqNRqISAAeiAGwB2CgA4VARjXKArABoQpRJoAspigE5r15crUaATGs2OAvm8NoMOfMTKUzKK4EPRQ3nA4tMJgrKhwnBDSYBT0AG6iANYpQSERWNjRsfRwfIJIIGISUjIVCgi86pa8io4GRiaOvADMFKY2lnYOrWruniD5voQk5BS5oegAyrRQqCwAIvi4icmpqBnZc8EQy6sbW2WyVZK00rL1Zn263a7txgjdpppWA0NOLh4vOhInhpgEjiEwmgIGA5AAVMAAW2EzHwCSS8T2Bxyx2hsIRyNRmFKAiu4hudzqiAAtI1FKY1KZunpDO9LG0KMp+pZGVzXJpeJZARNgQU-DNAscwptMLhFpgAE5gXCI2A7THpLI4kIyuWK5Wqy4Va41e6IVTKSyKHRvJSPLnWUyKXSWUyObpqboecaoUQw+AVSag-zkMnVW61UD1amuSxWZm2hDUxQUXhptOORyaTTM7T2XTCoPi8E0eg3FjsSBhimR+SIN2sxAuPq-ez-VyF0VTEOSkJV43k01UhC6b6WMwtRzcgYaRsIHO6VQDVovUeKDvjItg2bzKFdwoxOJwatDqM0+zqLQ2ucCjQ-Gx-EZjIE+YMSiELKCnNbMXUniNmkmo6cm6eaJpm3zToMbYjJonavsWO5SugeLwkiKJogGIiDgBw7Us2lgJnOehqBQU7WLyXxZoK8Egohvafrq8pKiqWGVDhlJnkmzqproTrXh0CD9I05HQdm2iaPS3puEAA */ + /** @xstate-layout N4IgpgJg5mDOIC5QEkB2NYBcAKBDTAFgO64CesAdAK6oCWdmtuANrQF6QDEA2gAwC6iUAAcA9rFqNRqISAAeiAGwB2CgA4VARjXKArABoQpRJoAspigE5r15crUaATGs2OAvm8NoMOfMTKUzKK4EPRQ3nA4tMJgrKhwnBDSYBT0AG6iANYpQSERWNjRsfRwfIJIIGISUjIVCgi86pa8io4GRiaOvADMFKY2lnYOrWruniD5voQk5BS5oegAyrRQqCwAIvi4icmpqBnZc8EQy6sbW2WyVZK00rL1Zn263a7txgjdpppWA0NOLh4vOhInhpgEjiEwmgIGA5AAVMAAW2EzHwCSS8T2Bxyx2hsIRyNRmFKAiu4hudzqiAAtI1FKY1KZunpDO9LG0KMp+pZGVzXJpeJZARNgQU-DNAscwptMLhFpgAE5gXCI2A7THpLI4kIyuWK5Wqy4Va41e6IVTKSyKHRvJSPLnWUyKXSWUyObpqbrCyag-yzebSrbypUqtXzI0icmmqkIZzfRyOJ2J3QC3S6OyKVmIFO9PS8fOmFrOLljIE+X0SiELcKoGHwpEotFh44RypR261UD1F6WCiOTRpj3u7qWMyOLMIezfB2Wbou3h2bqlkXl8XggPoH1FOIJcOk43tyldxBztQUbqKAdtS1MvRqCeaR+NXT53iJ5Qj626NTe0VTP2SiEXB7uUkbVB2ZofFafSmDmajsqMXwXg+c6qAMrQvCml5jOMqCiDC8AVD6a7kGS4FHvINKuL2s4sh0CDUooFCvrwT7dC0Ni6MuxFgrMND0DcLDsJAZEUp2lEIG6E4uH0vz2P8ri-quvGATCECidGx4ICmVhmEW3IDBoKHpj81gYQOmjYUpIIkapUJ-oUMQ7oRYFiZB1L2OoWg2g+vAaKZtjySM3EObZVZhKcazMLqGkQTG1I6SW2h0e8CbfAZgxBXG1liip4WbrW+INkScCxRR9TUjJtG2pO359tyvJfP2go5f+lYblAurBgaLltuR4kVc6zG6E6Pn0f0jSJoFj7aJZpgeB4QA */ context: initialContext, predictableActionArguments: true, id: 'IngestPathways', @@ -42,7 +45,11 @@ export const createPureIngestPathwaysStateMachine = (initialContext: IngestPathw always: 'loadingSignalData', }, - loaded: {}, + loaded: { + on: { + load: 'loadingSignalData', + }, + }, loadingIngestPipelines: { invoke: { @@ -55,6 +62,10 @@ export const createPureIngestPathwaysStateMachine = (initialContext: IngestPathw id: 'loadIngestPipelines', }, + + on: { + load: 'loadingSignalData', + }, }, loadingSignalData: { @@ -74,22 +85,32 @@ export const createPureIngestPathwaysStateMachine = (initialContext: IngestPathw invoke: { src: 'loadIndexTemplates', id: 'loadIndexTemplates', + onDone: { target: 'loadingIngestPipelines', actions: ['storeIndexTemplates', 'updateGraph'], }, }, + + on: { + load: 'loadingSignalData', + }, }, loadingDataStreams: { invoke: { src: 'loadDataStreams', id: 'loadDataStreams', + onDone: { target: 'loadingIndexTemplates', actions: 'storeDataStreams', }, }, + + on: { + load: 'loadingSignalData', + }, }, }, }, @@ -122,7 +143,8 @@ export const createPureIngestPathwaysStateMachine = (initialContext: IngestPathw } return mergeIngestPathwaysData(context.data, { - indexTemplates: event.data, + indexTemplates: event.data.indexTemplates, + ingestPipelines: event.data.ingestPipelines, }); }, }), @@ -176,7 +198,6 @@ export const createIngestPathwaysStateMachine = ({ agents: {}, indexTemplates: {}, ingestPipelines: {}, - relations: [], }, graph: { elements: [], @@ -203,13 +224,19 @@ export interface IngestPathwaysServices { data: any; }; loadSignalData: { - data: IngestPathwaysData; + data: { + agents: Record; + dataStreams: Record; + }; }; loadDataStreams: { data: Record; }; loadIndexTemplates: { - data: Record; + data: { + indexTemplates: Record; + ingestPipelines: Record; + }; }; loadIngestPipelines: { data: Record; diff --git a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_data_streams.ts b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_data_streams.ts index 17e0aaff4789c..c5167ba4d37f7 100644 --- a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_data_streams.ts +++ b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_data_streams.ts @@ -12,11 +12,11 @@ import { IngestPathwaysContext, IngestPathwaysServices } from '../ingest_pathway import { DataStream } from '../types'; import { INDEX_MANAGEMENT_PREFIX } from '../utils'; +type LoadDataStreamsResult = IngestPathwaysServices['loadDataStreams']['data']; + export const loadDataStreams = ({ http }: { http: HttpStart }) => - async ({ - data: { dataStreams }, - }: IngestPathwaysContext): Promise => { + async ({ data: { dataStreams } }: IngestPathwaysContext): Promise => { const updatedDataStreams: Record = Object.fromEntries( await Promise.all( Object.entries(dataStreams).map(async ([, dataStream]) => { @@ -25,13 +25,13 @@ export const loadDataStreams = ); const response = decodeOrThrow(dataStreamResponseRT)(rawResponse); - return [ - dataStream.id, - { - ...dataStream, - indexTemplateId: response.indexTemplateName, - }, - ]; + const newDataStream: DataStream = { + type: 'dataStream', + id: dataStream.id, + indexTemplateId: response.indexTemplateName, + }; + + return [dataStream.id, newDataStream] as const; }) ) ); diff --git a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_index_templates.ts b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_index_templates.ts index 409c5de730b2b..b78144c47a29f 100644 --- a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_index_templates.ts +++ b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_index_templates.ts @@ -9,27 +9,30 @@ import { HttpStart } from '@kbn/core-http-browser'; import { decodeOrThrow } from '@kbn/io-ts-utils'; import * as rt from 'io-ts'; import { IngestPathwaysContext, IngestPathwaysServices } from '../ingest_pathways_state_machine'; -import { IndexTemplate } from '../types'; +import { IndexTemplate, IngestPipelineStub } from '../types'; import { INDEX_MANAGEMENT_PREFIX } from '../utils'; +type LoadIndexTemplatesResult = IngestPathwaysServices['loadIndexTemplates']['data']; + export const loadIndexTemplates = ({ http }: { http: HttpStart }) => - async ({ - data: { dataStreams }, - }: IngestPathwaysContext): Promise => { + async ({ data: { dataStreams } }: IngestPathwaysContext): Promise => { // load main index templates const mentionedIndexTemplateNames = new Set( - Object.values(dataStreams).flatMap(({ indexTemplateId }) => - indexTemplateId != null ? [indexTemplateId] : [] + Object.values(dataStreams).flatMap((dataStream) => + dataStream.type === 'dataStream' ? [dataStream.indexTemplateId] : [] ) ); const indexTemplateInfos = Object.fromEntries( await Promise.all( Array.from(mentionedIndexTemplateNames).map(async (indexTemplateName) => { - const rawResponse = await http.get( - `${INDEX_MANAGEMENT_PREFIX}/index_templates/${indexTemplateName}` - ); + const rawResponse = await http + .get(`${INDEX_MANAGEMENT_PREFIX}/index_templates/${indexTemplateName}`) + .catch((err) => ({ + composedOf: [], + template: {}, + })); return [indexTemplateName, decodeOrThrow(indexTemplateResponseRT)(rawResponse)] as const; }) ) @@ -40,12 +43,14 @@ export const loadIndexTemplates = Object.values(indexTemplateInfos).flatMap(({ composedOf }) => composedOf) ); - const componentTemplateInfos = Object.fromEntries( + const componentTemplateInfos: Record = Object.fromEntries( await Promise.all( Array.from(mentionedComponentTemplateNames).map(async (componentTemplateName) => { - const rawResponse = await http.get( - `${INDEX_MANAGEMENT_PREFIX}/component_templates/${componentTemplateName}` - ); + const rawResponse = await http + .get(`${INDEX_MANAGEMENT_PREFIX}/component_templates/${componentTemplateName}`) + .catch((err) => ({ + template: {}, + })); return [ componentTemplateName, decodeOrThrow(componentTemplateResponseRT)(rawResponse), @@ -57,43 +62,91 @@ export const loadIndexTemplates = // combine the index and component templates const indexTemplates: Record = Object.fromEntries( Object.entries(indexTemplateInfos).map(([indexTemplateId, indexTemplateInfo]) => { - const defaultPipelines: string[] = [ - // indexTemplateInfo.template.settings.index.default_pipeline, - ]; - const finalPipelines: string[] = []; + const defaultPipeline = reduceToLastTemplateName( + indexTemplateInfo, + componentTemplateInfos, + 'default_pipeline' + ); + const finalPipeline = reduceToLastTemplateName( + indexTemplateInfo, + componentTemplateInfos, + 'final_pipeline' + ); return [ indexTemplateId, { id: indexTemplateId, - ingestPipelineIds: [...defaultPipelines, ...finalPipelines], + ingestPipelineIds: [ + ...(defaultPipeline != null ? [defaultPipeline] : []), + ...(finalPipeline != null ? [finalPipeline] : []), + ], }, - ] as const; + ]; }) ); - return {}; + // derive ingest pipeline stubs + const ingestPipelines: Record = Object.fromEntries( + Array.from( + new Set(Object.values(indexTemplates).flatMap(({ ingestPipelineIds }) => ingestPipelineIds)) + ).map((ingestPipelineId) => [ + ingestPipelineId, + { + type: 'ingestPipelineStub', + id: ingestPipelineId, + }, + ]) + ); + + return { + indexTemplates, + ingestPipelines, + }; }; +const reduceToLastTemplateName = ( + mainIndexTemplateInfo: IndexTemplateInfo, + availableComponentTemplateInfos: Record, + templateType: 'default_pipeline' | 'final_pipeline' +): string | undefined => { + return [ + mainIndexTemplateInfo.template.settings?.index?.[templateType], + ...mainIndexTemplateInfo.composedOf.map( + (composedOfTemplateName) => + availableComponentTemplateInfos[composedOfTemplateName].template.settings?.index?.[ + templateType + ] + ), + ].reduce((lastPipelineName, currentPipelineName) => { + return currentPipelineName ?? lastPipelineName; + }, undefined); +}; + +const templateSettingsRT = rt.exact( + rt.partial({ + settings: rt.exact( + rt.partial({ + index: rt.exact( + rt.partial({ + default_pipeline: rt.string, + final_pipeline: rt.string, + }) + ), + }) + ), + }) +); + const indexTemplateResponseRT = rt.strict({ composedOf: rt.array(rt.string), - template: rt.strict({ - settings: rt.strict({ - index: rt.partial({ - default_pipeline: rt.string, - final_pipeline: rt.string, - }), - }), - }), + template: templateSettingsRT, }); +type IndexTemplateInfo = rt.TypeOf; + const componentTemplateResponseRT = rt.strict({ - template: rt.strict({ - settings: rt.strict({ - index: rt.partial({ - default_pipeline: rt.string, - final_pipeline: rt.string, - }), - }), - }), + template: templateSettingsRT, }); + +type ComponentTemplateInfo = rt.TypeOf; diff --git a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_ingest_pipelines.ts b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_ingest_pipelines.ts new file mode 100644 index 0000000000000..d968047f8eb66 --- /dev/null +++ b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_ingest_pipelines.ts @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { HttpStart } from '@kbn/core-http-browser'; +import { decodeOrThrow } from '@kbn/io-ts-utils'; +import * as rt from 'io-ts'; +import { IngestPathwaysContext, IngestPathwaysServices } from '../ingest_pathways_state_machine'; +import { IndexTemplate, IngestPipeline } from '../types'; +import { INDEX_MANAGEMENT_PREFIX } from '../utils'; + +export const loadIndexTemplates = + ({ http }: { http: HttpStart }) => + async ({ + data: { dataStreams }, + }: IngestPathwaysContext): Promise => { + // load main index templates + const mentionedIndexTemplateNames = new Set( + Object.values(dataStreams).flatMap(({ indexTemplateId }) => + indexTemplateId != null ? [indexTemplateId] : [] + ) + ); + + const indexTemplateInfos = Object.fromEntries( + await Promise.all( + Array.from(mentionedIndexTemplateNames).map(async (indexTemplateName) => { + const rawResponse = await http + .get(`${INDEX_MANAGEMENT_PREFIX}/index_templates/${indexTemplateName}`) + .catch((err) => ({ + composedOf: [], + template: {}, + })); + return [indexTemplateName, decodeOrThrow(indexTemplateResponseRT)(rawResponse)] as const; + }) + ) + ); + + // load component templates + const mentionedComponentTemplateNames = new Set( + Object.values(indexTemplateInfos).flatMap(({ composedOf }) => composedOf) + ); + + const componentTemplateInfos: Record = Object.fromEntries( + await Promise.all( + Array.from(mentionedComponentTemplateNames).map(async (componentTemplateName) => { + const rawResponse = await http + .get(`${INDEX_MANAGEMENT_PREFIX}/component_templates/${componentTemplateName}`) + .catch((err) => ({ + template: {}, + })); + return [ + componentTemplateName, + decodeOrThrow(componentTemplateResponseRT)(rawResponse), + ] as const; + }) + ) + ); + + // combine the index and component templates + const indexTemplates: Record = Object.fromEntries( + Object.entries(indexTemplateInfos).map(([indexTemplateId, indexTemplateInfo]) => { + const defaultPipeline = reduceToLastTemplateName( + indexTemplateInfo, + componentTemplateInfos, + 'default_pipeline' + ); + const finalPipeline = reduceToLastTemplateName( + indexTemplateInfo, + componentTemplateInfos, + 'final_pipeline' + ); + + return [ + indexTemplateId, + { + id: indexTemplateId, + ingestPipelineIds: [ + ...(defaultPipeline != null ? [defaultPipeline] : []), + ...(finalPipeline != null ? [finalPipeline] : []), + ], + }, + ]; + }) + ); + + return { + indexTemplates, + ingestPipelines, + }; + }; + +const reduceToLastTemplateName = ( + mainIndexTemplateInfo: IndexTemplateInfo, + availableComponentTemplateInfos: Record, + templateType: 'default_pipeline' | 'final_pipeline' +): string | undefined => { + return [ + mainIndexTemplateInfo.template.settings?.index?.[templateType], + ...mainIndexTemplateInfo.composedOf.map( + (composedOfTemplateName) => + availableComponentTemplateInfos[composedOfTemplateName].template.settings?.index?.[ + templateType + ] + ), + ].reduce((lastPipelineName, currentPipelineName) => { + return currentPipelineName ?? lastPipelineName; + }, undefined); +}; + +const templateSettingsRT = rt.exact( + rt.partial({ + settings: rt.exact( + rt.partial({ + index: rt.exact( + rt.partial({ + default_pipeline: rt.string, + final_pipeline: rt.string, + }) + ), + }) + ), + }) +); + +const indexTemplateResponseRT = rt.strict({ + composedOf: rt.array(rt.string), + template: templateSettingsRT, +}); + +type IndexTemplateInfo = rt.TypeOf; + +const componentTemplateResponseRT = rt.strict({ + template: templateSettingsRT, +}); + +type ComponentTemplateInfo = rt.TypeOf; diff --git a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_signal_data.ts b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_signal_data.ts index 806effb9bdb99..779fc21f72e91 100644 --- a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_signal_data.ts +++ b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/services/load_signal_data.ts @@ -10,8 +10,9 @@ import { IDataStreamsStatsClient } from '@kbn/dataset-quality-plugin/public'; import { decodeOrThrow } from '@kbn/io-ts-utils'; import * as rt from 'io-ts'; import { lastValueFrom } from 'rxjs'; -import { IngestPathwaysContext } from '../ingest_pathways_state_machine'; -import { IngestPathwaysData } from '../types'; +import { IngestPathwaysContext, IngestPathwaysServices } from '../ingest_pathways_state_machine'; + +type LoadSignalDataResult = IngestPathwaysServices['loadSignalData']['data']; export const loadSignalData = ({ @@ -23,7 +24,7 @@ export const loadSignalData = }) => async ({ parameters: { dataStreamPattern, timeRange }, - }: IngestPathwaysContext): Promise => { + }: IngestPathwaysContext): Promise => { const request: IEsSearchRequest = { params: { index: dataStreamPattern, @@ -115,11 +116,11 @@ export const loadSignalData = const response = decodeOrThrow(signalResponseRT)(rawResponse); - return response.aggregations.relations.buckets.reduce( + return response.aggregations.relations.buckets.reduce( ( currentData, { - key: { agentId, dataStreamType, dataStreamDataset, dataStreamNamespace }, + key: { agentId: unsafeAgentId, dataStreamType, dataStreamDataset, dataStreamNamespace }, doc_count: signalCount, agent, } @@ -127,36 +128,47 @@ export const loadSignalData = const dataStreamId = `${dataStreamType}-${dataStreamDataset}-${dataStreamNamespace}`; if (currentData.dataStreams[dataStreamId] == null) { currentData.dataStreams[dataStreamId] = { + type: 'dataStreamStub', id: dataStreamId, }; } - if (currentData.agents[agentId] == null) { - const agentMetadata = agent.top[0]?.metrics; + const agentMetadata = agent.top[0]?.metrics; + const agentId = `${agentMetadata['agent.type'] ?? 'unknown'}-${unsafeAgentId}`; + const previousAgent = currentData.agents[agentId]; + + if (previousAgent == null) { currentData.agents[agentId] = { id: agentId, - type: agentMetadata['agent.type'], - name: agentMetadata['agent.name'], - version: agentMetadata['agent.version'], + type: agentMetadata['agent.type'] ?? 'unknown', + name: agentMetadata['agent.name'] ?? agentId, + version: agentMetadata['agent.version'] ?? 'unknown', + shipsTo: [ + { + dataStreamId, + signalCount, + }, + ], + }; + } else { + currentData.agents[agentId] = { + ...previousAgent, + shipsTo: [ + ...previousAgent.shipsTo, + { + dataStreamId, + signalCount, + }, + ], }; } - currentData.relations.push({ - type: 'agent-ships-to-data-stream', - agentId, - dataStreamId, - signalCount, - }); - return currentData; }, { dataStreams: {}, agents: {}, - relations: [], - indexTemplates: {}, - ingestPipelines: {}, - } as IngestPathwaysData + } ); }; @@ -176,9 +188,9 @@ const signalResponseRT = rt.strict({ top: rt.array( rt.strict({ metrics: rt.strict({ - 'agent.name': rt.string, - 'agent.type': rt.string, - 'agent.version': rt.string, + 'agent.name': rt.union([rt.string, rt.null]), + 'agent.type': rt.union([rt.string, rt.null]), + 'agent.version': rt.union([rt.string, rt.null]), }), }) ), diff --git a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/types.ts b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/types.ts index 0b3cafe466245..c75fca996f5db 100644 --- a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/types.ts +++ b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/types.ts @@ -11,11 +11,10 @@ export interface IngestPathwaysParameters { } export interface IngestPathwaysData { - dataStreams: Record; + dataStreams: Record; agents: Record; indexTemplates: Record; - ingestPipelines: Record; - relations: Relation[]; + ingestPipelines: Record; } export interface TimeRange { @@ -23,34 +22,43 @@ export interface TimeRange { to: string; } +export interface DataStreamStub { + type: 'dataStreamStub'; + id: string; +} + export interface DataStream { + type: 'dataStream'; id: string; - indexTemplateId?: string; + indexTemplateId: string; } +export type DataStreamEntry = DataStream | DataStreamStub; + export interface IndexTemplate { id: string; ingestPipelineIds: string[]; } +export interface IngestPipelineStub { + type: 'ingestPipelineStub'; + id: string; +} + export interface IngestPipeline { + type: 'ingestPipeline'; id: string; } +export type IngestPipelineEntry = IngestPipeline | IngestPipelineStub; + export interface Agent { id: string; type: string; name: string; version: string; + shipsTo: Array<{ + dataStreamId: string; + signalCount: number; + }>; } - -export type Relation = - | { - type: 'agent-ships-to-data-stream'; - agentId: string; - dataStreamId: string; - signalCount: number; - } - | { - type: 'unknown'; - }; diff --git a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/utils.ts b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/utils.ts index 20ef5abf27741..c51e6e6424764 100644 --- a/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/utils.ts +++ b/x-pack/plugins/observability_log_explorer/public/state_machines/ingest_pathways/utils.ts @@ -17,5 +17,4 @@ export const mergeIngestPathwaysData = ( dataStreams: { ...firstData.dataStreams, ...(secondData.dataStreams ?? {}) }, indexTemplates: { ...firstData.indexTemplates, ...(secondData.indexTemplates ?? {}) }, ingestPipelines: { ...firstData.ingestPipelines, ...(secondData.ingestPipelines ?? {}) }, - relations: [...firstData.relations, ...(secondData.relations ?? [])], });