diff --git a/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts index 812739db56c73..a9b667906fdf3 100644 --- a/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts +++ b/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -6,6 +6,7 @@ */ import { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types'; import { retryTransientEsErrors } from '../helpers/retry'; interface DataStreamManagementOptions { @@ -23,6 +24,7 @@ interface DeleteDataStreamOptions { interface RolloverDataStreamOptions { esClient: ElasticsearchClient; name: string; + mappings: MappingTypeMapping['properties'] | undefined; logger: Logger; } @@ -56,38 +58,37 @@ export async function rolloverDataStreamIfNecessary({ esClient, name, logger, + mappings, }: RolloverDataStreamOptions) { const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` }); for (const dataStream of dataStreams.data_streams) { - const currentMappings = - Object.values( - await esClient.indices.getMapping({ - index: dataStream.indices.at(-1)?.index_name, - }) - )[0].mappings.properties || {}; - const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name }); - const simulatedMappings = simulatedIndex.template.mappings.properties || {}; - - // check whether the same fields and same types are listed (don't check for other mapping attributes) - const isDifferent = - Object.values(simulatedMappings).length !== Object.values(currentMappings).length || - Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => { - const currentType = currentMappings[fieldName]?.type; - return currentType !== type; - }); - - if (!isDifferent) { + const writeIndex = dataStream.indices.at(-1); + if (!writeIndex) { continue; } - try { - await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), { - logger, - }); - logger.debug(() => `Rolled over data stream: ${dataStream.name}`); + await retryTransientEsErrors( + () => esClient.indices.putMapping({ index: writeIndex.index_name, properties: mappings }), + { + logger, + } + ); } catch (error: any) { - logger.error(`Error rolling over data stream: ${error.message}`); - throw error; + if ( + typeof error.message !== 'string' || + !error.message.includes('illegal_argument_exception') + ) { + throw error; + } + try { + await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), { + logger, + }); + logger.debug(() => `Rolled over data stream: ${dataStream.name}`); + } catch (rolloverError: any) { + logger.error(`Error rolling over data stream: ${error.message}`); + throw error; + } } } } diff --git a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts index 78a126905d9a4..c85d37df8267d 100644 --- a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts +++ b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts @@ -97,20 +97,30 @@ export async function listStreams({ scopedClusterClient }: ListStreamsParams) { sort: [{ id: 'asc' }], }); const definitions = response.hits.hits.map((hit) => hit.fields as { id: string[] }); - return definitions; + const hasAccess = await Promise.all( + definitions.map((definition) => checkReadAccess({ id: definition.id[0], scopedClusterClient })) + ); + return definitions.filter((_, index) => hasAccess[index]); } interface ReadStreamParams extends BaseParams { id: string; + skipAccessCheck?: boolean; } -export async function readStream({ id, scopedClusterClient }: ReadStreamParams) { +export async function readStream({ id, scopedClusterClient, skipAccessCheck }: ReadStreamParams) { try { const response = await scopedClusterClient.asInternalUser.get({ id, index: STREAMS_INDEX, }); const definition = response._source as StreamDefinition; + if (!skipAccessCheck) { + const hasAccess = await checkReadAccess({ id, scopedClusterClient }); + if (!hasAccess) { + throw new DefinitionNotFound(`Stream definition for ${id} not found.`); + } + } return { definition, }; @@ -130,7 +140,9 @@ export async function readAncestors({ id, scopedClusterClient }: ReadAncestorsPa const ancestorIds = getAncestors(id); return await Promise.all( - ancestorIds.map((ancestorId) => readStream({ scopedClusterClient, id: ancestorId })) + ancestorIds.map((ancestorId) => + readStream({ scopedClusterClient, id: ancestorId, skipAccessCheck: true }) + ) ); } @@ -223,6 +235,21 @@ export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamP } } +interface CheckReadAccessParams extends BaseParams { + id: string; +} + +export async function checkReadAccess({ + id, + scopedClusterClient, +}: CheckReadAccessParams): Promise { + try { + return await scopedClusterClient.asCurrentUser.indices.exists({ index: id }); + } catch (e) { + return false; + } +} + interface SyncStreamParams { scopedClusterClient: IScopedClusterClient; definition: StreamDefinition; @@ -236,10 +263,11 @@ export async function syncStream({ rootDefinition, logger, }: SyncStreamParams) { + const componentTemplate = generateLayer(definition.id, definition); await upsertComponent({ esClient: scopedClusterClient.asCurrentUser, logger, - component: generateLayer(definition.id, definition), + component: componentTemplate, }); await upsertIngestPipeline({ esClient: scopedClusterClient.asCurrentUser, @@ -282,5 +310,6 @@ export async function syncStream({ esClient: scopedClusterClient.asCurrentUser, name: definition.id, logger, + mappings: componentTemplate.template.mappings?.properties, }); } diff --git a/x-pack/plugins/streams/server/routes/streams/list.ts b/x-pack/plugins/streams/server/routes/streams/list.ts index 2e4f13a89bb41..6de57b4d25581 100644 --- a/x-pack/plugins/streams/server/routes/streams/list.ts +++ b/x-pack/plugins/streams/server/routes/streams/list.ts @@ -51,20 +51,22 @@ interface ListStreamDefinition { function asTrees(definitions: Array<{ id: string[] }>) { const trees: ListStreamDefinition[] = []; - definitions.forEach((definition) => { - const path = definition.id[0].split('.'); + const ids = definitions.map((definition) => definition.id[0]); + + ids.sort((a, b) => a.split('.').length - b.split('.').length); + + ids.forEach((id) => { let currentTree = trees; - path.forEach((_id, index) => { - const partialPath = path.slice(0, index + 1).join('.'); - const existingNode = currentTree.find((node) => node.id === partialPath); - if (existingNode) { - currentTree = existingNode.children; - } else { - const newNode = { id: partialPath, children: [] }; - currentTree.push(newNode); - currentTree = newNode.children; - } - }); + let existingNode: ListStreamDefinition | undefined; + // traverse the tree following the prefix of the current id. + // once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth + while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) { + currentTree = existingNode.children; + } + if (!existingNode) { + const newNode = { id, children: [] }; + currentTree.push(newNode); + } }); return trees; }