Skip to content

Commit

Permalink
Avoid loading package saved objects into memory before deleting them
Browse files Browse the repository at this point in the history
  • Loading branch information
xcrzx committed Jul 10, 2024
1 parent 045aafc commit 70f2487
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
} from '../utils';
import type { ApiExecutionContext } from '../types';
import type { RepositoryEsClient } from '../../repository_es_client';
import { includedFields } from '../../utils';

const MAX_CONCURRENT_RESOLVE = 10;

Expand Down Expand Up @@ -140,7 +141,7 @@ export async function internalBulkResolve<T>(

const bulkGetResponse = docsToBulkGet.length
? await client.mget<SavedObjectsRawDocSource>(
{ body: { docs: docsToBulkGet } },
{ body: { docs: docsToBulkGet }, _source: includedFields(allowedTypes, options.fields) },
{ ignore: [404], meta: true }
)
: undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type { SavedObject } from '../..';
export interface SavedObjectsResolveOptions extends SavedObjectsBaseOptions {
/** {@link SavedObjectsRawDocParseOptions.migrationVersionCompatibility} */
migrationVersionCompatibility?: 'compatible' | 'raw';
fields?: string[];
}

/**
Expand Down
49 changes: 17 additions & 32 deletions x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { partition } from 'lodash';

import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import { KibanaAssetType, KibanaSavedObjectType } from '../../../../types';
import type { AssetReference, AssetParts, Installation, PackageSpecTags } from '../../../../types';
import type { AssetReference, Installation, PackageSpecTags } from '../../../../types';
import type { KibanaAssetReference, PackageInstallContext } from '../../../../../common/types';
import {
indexPatternTypes,
Expand Down Expand Up @@ -242,7 +242,8 @@ export async function installKibanaAssetsAndReferences({
}) {
const { savedObjectsImporter, savedObjectTagAssignmentService, savedObjectTagClient } =
getSpaceAwareSaveobjectsClients(spaceId);
const kibanaAssets = await getKibanaAssets(packageInstallContext);
// This is where the memory consumption is rising up in the first place
const kibanaAssets = getKibanaAssets(packageInstallContext);
if (installedPkg) {
await deleteKibanaSavedObjectsAssets({ savedObjectsClient, installedPkg, spaceId });
}
Expand Down Expand Up @@ -323,46 +324,30 @@ export async function deleteKibanaAssetsAndReferencesForSpace({
await saveKibanaAssetsRefs(savedObjectsClient, pkgName, [], true);
}

export async function getKibanaAssets(
export function getKibanaAssets(
packageInstallContext: PackageInstallContext
): Promise<Record<KibanaAssetType, ArchiveAsset[]>> {
): Record<KibanaAssetType, ArchiveAsset[]> {
const kibanaAssetTypes = Object.values(KibanaAssetType);
const isKibanaAssetType = (path: string) => {
const parts = getPathParts(path);

return parts.service === 'kibana' && (kibanaAssetTypes as string[]).includes(parts.type);
};

const filteredPaths = packageInstallContext.paths
.filter(isKibanaAssetType)
.map<[string, AssetParts]>((path) => [path, getPathParts(path)]);
const result = Object.fromEntries<ArchiveAsset[]>(
kibanaAssetTypes.map((type) => [type, []])
) as Record<KibanaAssetType, ArchiveAsset[]>;

const assetArrays: Array<Promise<ArchiveAsset[]>> = [];
for (const assetType of kibanaAssetTypes) {
const matching = filteredPaths.filter(([path, parts]) => parts.type === assetType);
packageInstallContext.paths.filter(isKibanaAssetType).forEach((path) => {
const buffer = getAssetFromAssetsMap(packageInstallContext.assetsMap, path);
const asset = JSON.parse(buffer.toString('utf8'));

assetArrays.push(
Promise.all(
matching.map(([path]) => {
const buffer = getAssetFromAssetsMap(packageInstallContext.assetsMap, path);

// cache values are buffers. convert to string / JSON
return JSON.parse(buffer.toString('utf8'));
})
)
);
}

const resolvedAssets = await Promise.all(assetArrays);

const result = {} as Record<KibanaAssetType, ArchiveAsset[]>;

for (const [index, assetType] of kibanaAssetTypes.entries()) {
const expectedType = KibanaSavedObjectTypeMapping[assetType];
const properlyTypedAssets = resolvedAssets[index].filter(({ type }) => type === expectedType);

result[assetType] = properlyTypedAssets;
}
const assetType = getPathParts(path).type as KibanaAssetType;
const soType = KibanaSavedObjectTypeMapping[assetType];
if (asset.type === soType) {
result[assetType].push(asset);
}
});

return result;
}
Expand Down
27 changes: 23 additions & 4 deletions x-pack/plugins/fleet/server/services/epm/packages/remove.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { SavedObjectsClient } from '@kbn/core/server';
import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants';

import { SavedObjectsUtils, SavedObjectsErrorHelpers } from '@kbn/core/server';
import minVersion from 'semver/ranges/min-version';

import { updateIndexSettings } from '../elasticsearch/index/update_settings';

Expand Down Expand Up @@ -43,6 +44,7 @@ import { auditLoggingService } from '../../audit_logging';
import { FleetError, PackageRemovalError } from '../../../errors';

import { populatePackagePolicyAssignedAgentsCount } from '../../package_policies/populate_package_policy_assigned_agents_count';
import * as Registry from '../registry';

import { getInstallation, kibanaSavedObjectTypes } from '.';

Expand Down Expand Up @@ -112,7 +114,7 @@ export async function removeInstallation(options: {
return installedAssets;
}

async function deleteKibanaAssets(
async function resolveAndDeleteKibanaAssets(
installedObjects: KibanaAssetReference[],
spaceId: string = DEFAULT_SPACE_ID
) {
Expand Down Expand Up @@ -237,9 +239,9 @@ async function deleteAssets(
// then the other asset types
await Promise.all([
...deleteESAssets(otherAssets, esClient),
deleteKibanaAssets(installedKibana, spaceId),
resolveAndDeleteKibanaAssets(installedKibana, spaceId),
Object.entries(installedInAdditionalSpacesKibana).map(([additionalSpaceId, kibanaAssets]) =>
deleteKibanaAssets(kibanaAssets, additionalSpaceId)
resolveAndDeleteKibanaAssets(kibanaAssets, additionalSpaceId)
),
]);
} catch (err) {
Expand Down Expand Up @@ -299,8 +301,25 @@ export async function deleteKibanaSavedObjectsAssets({
.filter(({ type }) => kibanaSavedObjectTypes.includes(type))
.map(({ id, type }) => ({ id, type } as KibanaAssetReference));

const registryInfo = await Registry.fetchInfo(
installedPkg.attributes.name,
installedPkg.attributes.version
);

const minKibana = registryInfo.conditions?.kibana?.version
? minVersion(registryInfo.conditions.kibana.version)
: null;

try {
await deleteKibanaAssets(assetsToDelete, spaceIdToDelete);
// Compare Kibana versions to determine if the package could have been
// installed in 7.x. If so, we need to resolve legacy SO references before
// deleting them.
if (minKibana && minKibana.major >= 8) {
const namespace = SavedObjectsUtils.namespaceStringToId(spaceIdToDelete ?? DEFAULT_SPACE_ID);
await savedObjectsClient.bulkDelete(assetsToDelete, { namespace });
} else {
await resolveAndDeleteKibanaAssets(assetsToDelete, spaceIdToDelete);
}
} catch (err) {
// in the rollback case, partial installs are likely, so missing assets are not an error
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
Expand Down

0 comments on commit 70f2487

Please sign in to comment.