From 8e036abf88d77e170c64ac57c63601b956c67fc4 Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Tue, 3 Dec 2024 04:32:16 +1100 Subject: [PATCH] =?UTF-8?q?[8.x]=20=F0=9F=8C=8A=20Play=20nice=20with=20ES?= =?UTF-8?q?=20(#200253)=20(#202516)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Backport This will backport the following commits from `main` to `8.x`: - [🌊 Play nice with ES (#200253)](https://github.com/elastic/kibana/pull/200253) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) Co-authored-by: Joe Reuter --- .../data_streams/manage_data_streams.ts | 51 ++++++++++--------- .../streams/server/lib/streams/stream_crud.ts | 27 +++++++++- .../streams/server/routes/streams/list.ts | 37 ++++++-------- 3 files changed, 67 insertions(+), 48 deletions(-) diff --git a/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts index 812739db56c73..a9b667906fdf3 100644 --- a/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts +++ b/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -6,6 +6,7 @@ */ import { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types'; import { retryTransientEsErrors } from '../helpers/retry'; interface DataStreamManagementOptions { @@ -23,6 +24,7 @@ interface DeleteDataStreamOptions { interface RolloverDataStreamOptions { esClient: ElasticsearchClient; name: string; + mappings: MappingTypeMapping['properties'] | undefined; logger: Logger; } @@ -56,38 +58,37 @@ export async function rolloverDataStreamIfNecessary({ esClient, name, logger, + mappings, }: RolloverDataStreamOptions) { const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` }); for (const dataStream of dataStreams.data_streams) { - const currentMappings = - Object.values( - await esClient.indices.getMapping({ - index: dataStream.indices.at(-1)?.index_name, - }) - )[0].mappings.properties || {}; - const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name }); - const simulatedMappings = simulatedIndex.template.mappings.properties || {}; - - // check whether the same fields and same types are listed (don't check for other mapping attributes) - const isDifferent = - Object.values(simulatedMappings).length !== Object.values(currentMappings).length || - Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => { - const currentType = currentMappings[fieldName]?.type; - return currentType !== type; - }); - - if (!isDifferent) { + const writeIndex = dataStream.indices.at(-1); + if (!writeIndex) { continue; } - try { - await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), { - logger, - }); - logger.debug(() => `Rolled over data stream: ${dataStream.name}`); + await retryTransientEsErrors( + () => esClient.indices.putMapping({ index: writeIndex.index_name, properties: mappings }), + { + logger, + } + ); } catch (error: any) { - logger.error(`Error rolling over data stream: ${error.message}`); - throw error; + if ( + typeof error.message !== 'string' || + !error.message.includes('illegal_argument_exception') + ) { + throw error; + } + try { + await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), { + logger, + }); + logger.debug(() => `Rolled over data stream: ${dataStream.name}`); + } catch (rolloverError: any) { + logger.error(`Error rolling over data stream: ${error.message}`); + throw error; + } } } } diff --git a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts index a74540cdcc62a..da5f74d3e69ed 100644 --- a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts +++ b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts @@ -112,6 +112,7 @@ export async function listStreams({ interface ReadStreamParams extends BaseParams { id: string; + skipAccessCheck?: boolean; } export interface ReadStreamResponse { @@ -121,6 +122,7 @@ export interface ReadStreamResponse { export async function readStream({ id, scopedClusterClient, + skipAccessCheck, }: ReadStreamParams): Promise { try { const response = await scopedClusterClient.asInternalUser.get({ @@ -128,6 +130,12 @@ export async function readStream({ index: STREAMS_INDEX, }); const definition = response._source as StreamDefinition; + if (!skipAccessCheck) { + const hasAccess = await checkReadAccess({ id, scopedClusterClient }); + if (!hasAccess) { + throw new DefinitionNotFound(`Stream definition for ${id} not found.`); + } + } return { definition, }; @@ -249,6 +257,21 @@ export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamP } } +interface CheckReadAccessParams extends BaseParams { + id: string; +} + +export async function checkReadAccess({ + id, + scopedClusterClient, +}: CheckReadAccessParams): Promise { + try { + return await scopedClusterClient.asCurrentUser.indices.exists({ index: id }); + } catch (e) { + return false; + } +} + interface SyncStreamParams { scopedClusterClient: IScopedClusterClient; definition: StreamDefinition; @@ -262,10 +285,11 @@ export async function syncStream({ rootDefinition, logger, }: SyncStreamParams) { + const componentTemplate = generateLayer(definition.id, definition); await upsertComponent({ esClient: scopedClusterClient.asCurrentUser, logger, - component: generateLayer(definition.id, definition), + component: componentTemplate, }); await upsertIngestPipeline({ esClient: scopedClusterClient.asCurrentUser, @@ -308,5 +332,6 @@ export async function syncStream({ esClient: scopedClusterClient.asCurrentUser, name: definition.id, logger, + mappings: componentTemplate.template.mappings?.properties, }); } diff --git a/x-pack/plugins/streams/server/routes/streams/list.ts b/x-pack/plugins/streams/server/routes/streams/list.ts index bd6a5200fe9ba..d3b88ffc36a45 100644 --- a/x-pack/plugins/streams/server/routes/streams/list.ts +++ b/x-pack/plugins/streams/server/routes/streams/list.ts @@ -52,32 +52,25 @@ export interface StreamTree { children: StreamTree[]; } -function asTrees(definitions: StreamDefinition[]): StreamTree[] { - const nodes = new Map(); +function asTrees(definitions: Array<{ id: string }>) { + const trees: StreamTree[] = []; + const ids = definitions.map((definition) => definition.id); - const rootNodes = new Set(); + ids.sort((a, b) => a.split('.').length - b.split('.').length); - function getNode(id: string) { - let node = nodes.get(id); - if (!node) { - node = { id, children: [] }; - nodes.set(id, node); + ids.forEach((id) => { + let currentTree = trees; + let existingNode: StreamTree | undefined; + // traverse the tree following the prefix of the current id. + // once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth + while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) { + currentTree = existingNode.children; } - return node; - } - - definitions.forEach((definition) => { - const path = definition.id.split('.'); - const parentId = path.slice(0, path.length - 1).join('.'); - const parentNode = parentId.length ? getNode(parentId) : undefined; - const selfNode = getNode(definition.id); - - if (parentNode) { - parentNode.children.push(selfNode); - } else { - rootNodes.add(selfNode); + if (!existingNode) { + const newNode = { id, children: [] }; + currentTree.push(newNode); } }); - return Array.from(rootNodes.values()); + return trees; }