Skip to content

Commit

Permalink
Avoid loading package saved objects into memory before deleting them
Browse files Browse the repository at this point in the history
  • Loading branch information
xcrzx committed Jul 11, 2024
1 parent 7950fb8 commit d70cfd6
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 36 deletions.
49 changes: 17 additions & 32 deletions x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { partition } from 'lodash';

import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import { KibanaAssetType, KibanaSavedObjectType } from '../../../../types';
import type { AssetReference, AssetParts, Installation, PackageSpecTags } from '../../../../types';
import type { AssetReference, Installation, PackageSpecTags } from '../../../../types';
import type { KibanaAssetReference, PackageInstallContext } from '../../../../../common/types';
import {
indexPatternTypes,
Expand Down Expand Up @@ -242,7 +242,8 @@ export async function installKibanaAssetsAndReferences({
}) {
const { savedObjectsImporter, savedObjectTagAssignmentService, savedObjectTagClient } =
getSpaceAwareSaveobjectsClients(spaceId);
const kibanaAssets = await getKibanaAssets(packageInstallContext);
// This is where the memory consumption is rising up in the first place
const kibanaAssets = getKibanaAssets(packageInstallContext);
if (installedPkg) {
await deleteKibanaSavedObjectsAssets({ savedObjectsClient, installedPkg, spaceId });
}
Expand Down Expand Up @@ -323,46 +324,30 @@ export async function deleteKibanaAssetsAndReferencesForSpace({
await saveKibanaAssetsRefs(savedObjectsClient, pkgName, [], true);
}

export async function getKibanaAssets(
export function getKibanaAssets(
packageInstallContext: PackageInstallContext
): Promise<Record<KibanaAssetType, ArchiveAsset[]>> {
): Record<KibanaAssetType, ArchiveAsset[]> {
const kibanaAssetTypes = Object.values(KibanaAssetType);
const isKibanaAssetType = (path: string) => {
const parts = getPathParts(path);

return parts.service === 'kibana' && (kibanaAssetTypes as string[]).includes(parts.type);
};

const filteredPaths = packageInstallContext.paths
.filter(isKibanaAssetType)
.map<[string, AssetParts]>((path) => [path, getPathParts(path)]);
const result = Object.fromEntries<ArchiveAsset[]>(
kibanaAssetTypes.map((type) => [type, []])
) as Record<KibanaAssetType, ArchiveAsset[]>;

const assetArrays: Array<Promise<ArchiveAsset[]>> = [];
for (const assetType of kibanaAssetTypes) {
const matching = filteredPaths.filter(([path, parts]) => parts.type === assetType);
packageInstallContext.paths.filter(isKibanaAssetType).forEach((path) => {
const buffer = getAssetFromAssetsMap(packageInstallContext.assetsMap, path);
const asset = JSON.parse(buffer.toString('utf8'));

assetArrays.push(
Promise.all(
matching.map(([path]) => {
const buffer = getAssetFromAssetsMap(packageInstallContext.assetsMap, path);

// cache values are buffers. convert to string / JSON
return JSON.parse(buffer.toString('utf8'));
})
)
);
}

const resolvedAssets = await Promise.all(assetArrays);

const result = {} as Record<KibanaAssetType, ArchiveAsset[]>;

for (const [index, assetType] of kibanaAssetTypes.entries()) {
const expectedType = KibanaSavedObjectTypeMapping[assetType];
const properlyTypedAssets = resolvedAssets[index].filter(({ type }) => type === expectedType);

result[assetType] = properlyTypedAssets;
}
const assetType = getPathParts(path).type as KibanaAssetType;
const soType = KibanaSavedObjectTypeMapping[assetType];
if (asset.type === soType) {
result[assetType].push(asset);
}
});

return result;
}
Expand Down
54 changes: 50 additions & 4 deletions x-pack/plugins/fleet/server/services/epm/packages/remove.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { SavedObjectsClient } from '@kbn/core/server';
import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants';

import { SavedObjectsUtils, SavedObjectsErrorHelpers } from '@kbn/core/server';
import minVersion from 'semver/ranges/min-version';

import { updateIndexSettings } from '../elasticsearch/index/update_settings';

Expand Down Expand Up @@ -43,6 +44,7 @@ import { auditLoggingService } from '../../audit_logging';
import { FleetError, PackageRemovalError } from '../../../errors';

import { populatePackagePolicyAssignedAgentsCount } from '../../package_policies/populate_package_policy_assigned_agents_count';
import * as Registry from '../registry';

import { getInstallation, kibanaSavedObjectTypes } from '.';

Expand Down Expand Up @@ -112,7 +114,14 @@ export async function removeInstallation(options: {
return installedAssets;
}

async function deleteKibanaAssets(
/**
* This method resolves saved objects before deleting them. It is needed when
* deleting assets that were installed in 7.x to mitigate the breaking change
* that occurred in 8.0. This is a memory-intensive operation as it requires
* loading all the saved objects into memory. It is generally better to delete
* assets directly if the package is known to be installed in 8.x or later.
*/
async function resolveAndDeleteKibanaAssets(
installedObjects: KibanaAssetReference[],
spaceId: string = DEFAULT_SPACE_ID
) {
Expand Down Expand Up @@ -152,6 +161,26 @@ async function deleteKibanaAssets(
return savedObjectsClient.bulkDelete(assetsToDelete, { namespace });
}

async function deleteKibanaAssets(
assetsToDelete: KibanaAssetReference[],
spaceId: string = DEFAULT_SPACE_ID
) {
const savedObjectsClient = new SavedObjectsClient(
appContextService.getSavedObjects().createInternalRepository()
);
const namespace = SavedObjectsUtils.namespaceStringToId(spaceId);

for (const asset of assetsToDelete) {
auditLoggingService.writeCustomSoAuditLog({
action: 'delete',
id: asset.id,
savedObjectType: asset.type,
});
}

return savedObjectsClient.bulkDelete(assetsToDelete, { namespace });
}

function deleteESAssets(
installedObjects: EsAssetReference[],
esClient: ElasticsearchClient
Expand Down Expand Up @@ -237,9 +266,9 @@ async function deleteAssets(
// then the other asset types
await Promise.all([
...deleteESAssets(otherAssets, esClient),
deleteKibanaAssets(installedKibana, spaceId),
resolveAndDeleteKibanaAssets(installedKibana, spaceId),
Object.entries(installedInAdditionalSpacesKibana).map(([additionalSpaceId, kibanaAssets]) =>
deleteKibanaAssets(kibanaAssets, additionalSpaceId)
resolveAndDeleteKibanaAssets(kibanaAssets, additionalSpaceId)
),
]);
} catch (err) {
Expand Down Expand Up @@ -299,8 +328,25 @@ export async function deleteKibanaSavedObjectsAssets({
.filter(({ type }) => kibanaSavedObjectTypes.includes(type))
.map(({ id, type }) => ({ id, type } as KibanaAssetReference));

const registryInfo = await Registry.fetchInfo(
installedPkg.attributes.name,
installedPkg.attributes.version
);

const minKibana = registryInfo.conditions?.kibana?.version
? minVersion(registryInfo.conditions.kibana.version)
: null;

try {
await deleteKibanaAssets(assetsToDelete, spaceIdToDelete);
// Compare Kibana versions to determine if the package could been installed
// only in 8.x or later. If so, we can skip SO resolution step altogether
// and delete the assets directly. Otherwise, we need to resolve the assets
// which might create high memory pressure if a package has a lot of assets.
if (minKibana && minKibana.major >= 8) {
await deleteKibanaAssets(assetsToDelete, spaceIdToDelete);
} else {
await resolveAndDeleteKibanaAssets(assetsToDelete, spaceIdToDelete);
}
} catch (err) {
// in the rollback case, partial installs are likely, so missing assets are not an error
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
Expand Down

0 comments on commit d70cfd6

Please sign in to comment.