Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌊 Play nice with ES #200253

Merged
merged 11 commits into from
Nov 26, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
import { retryTransientEsErrors } from '../helpers/retry';

interface DataStreamManagementOptions {
Expand All @@ -23,6 +24,7 @@ interface DeleteDataStreamOptions {
interface RolloverDataStreamOptions {
esClient: ElasticsearchClient;
name: string;
mappings: MappingTypeMapping['properties'] | undefined;
logger: Logger;
}

Expand Down Expand Up @@ -56,38 +58,37 @@ export async function rolloverDataStreamIfNecessary({
esClient,
name,
logger,
mappings,
}: RolloverDataStreamOptions) {
const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` });
for (const dataStream of dataStreams.data_streams) {
const currentMappings =
Object.values(
await esClient.indices.getMapping({
index: dataStream.indices.at(-1)?.index_name,
})
)[0].mappings.properties || {};
const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name });
const simulatedMappings = simulatedIndex.template.mappings.properties || {};

// check whether the same fields and same types are listed (don't check for other mapping attributes)
const isDifferent =
Object.values(simulatedMappings).length !== Object.values(currentMappings).length ||
Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => {
const currentType = currentMappings[fieldName]?.type;
return currentType !== type;
});

if (!isDifferent) {
const writeIndex = dataStream.indices.at(-1);
if (!writeIndex) {
continue;
}

try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
await retryTransientEsErrors(
() => esClient.indices.putMapping({ index: writeIndex.index_name, properties: mappings }),
{
logger,
}
);
} catch (error: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
if (
typeof error.message !== 'string' ||
!error.message.includes('illegal_argument_exception')
) {
throw error;
}
try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
} catch (rolloverError: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
}
}
}
}
37 changes: 33 additions & 4 deletions x-pack/plugins/streams/server/lib/streams/stream_crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,30 @@ export async function listStreams({ scopedClusterClient }: ListStreamsParams) {
sort: [{ id: 'asc' }],
});
const definitions = response.hits.hits.map((hit) => hit.fields as { id: string[] });
return definitions;
const hasAccess = await Promise.all(
definitions.map((definition) => checkReadAccess({ id: definition.id[0], scopedClusterClient }))
);
return definitions.filter((_, index) => hasAccess[index]);
}

interface ReadStreamParams extends BaseParams {
id: string;
skipAccessCheck?: boolean;
}

export async function readStream({ id, scopedClusterClient }: ReadStreamParams) {
export async function readStream({ id, scopedClusterClient, skipAccessCheck }: ReadStreamParams) {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
index: STREAMS_INDEX,
});
const definition = response._source as StreamDefinition;
if (!skipAccessCheck) {
const hasAccess = await checkReadAccess({ id, scopedClusterClient });
if (!hasAccess) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
return {
definition,
};
Expand All @@ -130,7 +140,9 @@ export async function readAncestors({ id, scopedClusterClient }: ReadAncestorsPa
const ancestorIds = getAncestors(id);

return await Promise.all(
ancestorIds.map((ancestorId) => readStream({ scopedClusterClient, id: ancestorId }))
ancestorIds.map((ancestorId) =>
readStream({ scopedClusterClient, id: ancestorId, skipAccessCheck: true })
)
);
}

Expand Down Expand Up @@ -223,6 +235,21 @@ export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamP
}
}

interface CheckReadAccessParams extends BaseParams {
id: string;
}

export async function checkReadAccess({
id,
scopedClusterClient,
}: CheckReadAccessParams): Promise<boolean> {
try {
return await scopedClusterClient.asCurrentUser.indices.exists({ index: id });
} catch (e) {
return false;
}
}

interface SyncStreamParams {
scopedClusterClient: IScopedClusterClient;
definition: StreamDefinition;
Expand All @@ -236,10 +263,11 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
const componentTemplate = generateLayer(definition.id, definition);
await upsertComponent({
esClient: scopedClusterClient.asCurrentUser,
logger,
component: generateLayer(definition.id, definition),
component: componentTemplate,
});
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
Expand Down Expand Up @@ -282,5 +310,6 @@ export async function syncStream({
esClient: scopedClusterClient.asCurrentUser,
name: definition.id,
logger,
mappings: componentTemplate.template.mappings?.properties,
});
}
28 changes: 15 additions & 13 deletions x-pack/plugins/streams/server/routes/streams/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,22 @@ interface ListStreamDefinition {

function asTrees(definitions: Array<{ id: string[] }>) {
const trees: ListStreamDefinition[] = [];
definitions.forEach((definition) => {
const path = definition.id[0].split('.');
const ids = definitions.map((definition) => definition.id[0]);

ids.sort((a, b) => a.split('.').length - b.split('.').length);

ids.forEach((id) => {
let currentTree = trees;
path.forEach((_id, index) => {
const partialPath = path.slice(0, index + 1).join('.');
const existingNode = currentTree.find((node) => node.id === partialPath);
if (existingNode) {
currentTree = existingNode.children;
} else {
const newNode = { id: partialPath, children: [] };
currentTree.push(newNode);
currentTree = newNode.children;
}
});
let existingNode: ListStreamDefinition | undefined;
// traverse the tree following the prefix of the current id.
// once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth
while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) {
currentTree = existingNode.children;
}
if (!existingNode) {
const newNode = { id, children: [] };
currentTree.push(newNode);
}
});
return trees;
}