Skip to content

Commit

Permalink
place nice with ES
Browse files Browse the repository at this point in the history
  • Loading branch information
flash1293 committed Nov 14, 2024
1 parent ed002ec commit 6e87efd
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
import { retryTransientEsErrors } from '../helpers/retry';

interface DataStreamManagementOptions {
Expand All @@ -23,6 +24,7 @@ interface DeleteDataStreamOptions {
interface RolloverDataStreamOptions {
esClient: ElasticsearchClient;
name: string;
mappings: MappingTypeMapping['properties'] | undefined;
logger: Logger;
}

Expand Down Expand Up @@ -56,38 +58,37 @@ export async function rolloverDataStreamIfNecessary({
esClient,
name,
logger,
mappings,
}: RolloverDataStreamOptions) {
const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` });
for (const dataStream of dataStreams.data_streams) {
const currentMappings =
Object.values(
await esClient.indices.getMapping({
index: dataStream.indices.at(-1)?.index_name,
})
)[0].mappings.properties || {};
const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name });
const simulatedMappings = simulatedIndex.template.mappings.properties || {};

// check whether the same fields and same types are listed (don't check for other mapping attributes)
const isDifferent =
Object.values(simulatedMappings).length !== Object.values(currentMappings).length ||
Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => {
const currentType = currentMappings[fieldName]?.type;
return currentType !== type;
});

if (!isDifferent) {
const writeIndex = dataStream.indices.at(-1);
if (!writeIndex) {
continue;
}

try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
await retryTransientEsErrors(
() => esClient.indices.putMapping({ index: writeIndex.index_name, properties: mappings }),
{
logger,
}
);
} catch (error: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
if (
typeof error.message !== 'string' ||
!error.message.includes('illegal_argument_exception')
) {
throw error;
}
try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
} catch (rolloverError: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
}
}
}
}
37 changes: 33 additions & 4 deletions x-pack/plugins/streams/server/lib/streams/stream_crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,30 @@ export async function listStreams({ scopedClusterClient }: ListStreamsParams) {
sort: [{ id: 'asc' }],
});
const definitions = response.hits.hits.map((hit) => hit.fields as { id: string[] });
return definitions;
const hasAccess = await Promise.all(
definitions.map((definition) => checkReadAccess({ id: definition.id[0], scopedClusterClient }))
);
return definitions.filter((_, index) => hasAccess[index]);
}

interface ReadStreamParams extends BaseParams {
id: string;
skipAccessCheck?: boolean;
}

export async function readStream({ id, scopedClusterClient }: ReadStreamParams) {
export async function readStream({ id, scopedClusterClient, skipAccessCheck }: ReadStreamParams) {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
index: STREAMS_INDEX,
});
const definition = response._source as StreamDefinition;
if (!skipAccessCheck) {
const hasAccess = await checkReadAccess({ id, scopedClusterClient });
if (!hasAccess) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
return {
definition,
};
Expand All @@ -130,7 +140,9 @@ export async function readAncestors({ id, scopedClusterClient }: ReadAncestorsPa
const ancestorIds = getAncestors(id);

return await Promise.all(
ancestorIds.map((ancestorId) => readStream({ scopedClusterClient, id: ancestorId }))
ancestorIds.map((ancestorId) =>
readStream({ scopedClusterClient, id: ancestorId, skipAccessCheck: true })
)
);
}

Expand Down Expand Up @@ -223,6 +235,21 @@ export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamP
}
}

interface CheckReadAccessParams extends BaseParams {
id: string;
}

export async function checkReadAccess({
id,
scopedClusterClient,
}: CheckReadAccessParams): Promise<boolean> {
try {
return await scopedClusterClient.asCurrentUser.indices.exists({ index: id });
} catch (e) {
return false;
}
}

interface SyncStreamParams {
scopedClusterClient: IScopedClusterClient;
definition: StreamDefinition;
Expand All @@ -236,10 +263,11 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
const componentTemplate = generateLayer(definition.id, definition);
await upsertComponent({
esClient: scopedClusterClient.asCurrentUser,
logger,
component: generateLayer(definition.id, definition),
component: componentTemplate,
});
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
Expand Down Expand Up @@ -282,5 +310,6 @@ export async function syncStream({
esClient: scopedClusterClient.asCurrentUser,
name: definition.id,
logger,
mappings: componentTemplate.template.mappings?.properties,
});
}
28 changes: 15 additions & 13 deletions x-pack/plugins/streams/server/routes/streams/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,22 @@ interface ListStreamDefinition {

function asTrees(definitions: Array<{ id: string[] }>) {
const trees: ListStreamDefinition[] = [];
definitions.forEach((definition) => {
const path = definition.id[0].split('.');
const ids = definitions.map((definition) => definition.id[0]);

ids.sort((a, b) => a.split('.').length - b.split('.').length);

ids.forEach((id) => {
let currentTree = trees;
path.forEach((_id, index) => {
const partialPath = path.slice(0, index + 1).join('.');
const existingNode = currentTree.find((node) => node.id === partialPath);
if (existingNode) {
currentTree = existingNode.children;
} else {
const newNode = { id: partialPath, children: [] };
currentTree.push(newNode);
currentTree = newNode.children;
}
});
let existingNode: ListStreamDefinition | undefined;
// traverse the tree following the prefix of the current id.
// once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth
while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) {
currentTree = existingNode.children;
}
if (!existingNode) {
const newNode = { id, children: [] };
currentTree.push(newNode);
}
});
return trees;
}

0 comments on commit 6e87efd

Please sign in to comment.