Skip to content

Commit

Permalink
[8.x] [Fleet] [Security Solution] Install prebuilt rules package usin…
Browse files Browse the repository at this point in the history
…g stream-based approach (#195888) (#198936)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Fleet] [Security Solution] Install prebuilt rules package using
stream-based approach
(#195888)](#195888)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Dmitrii
Shevchenko","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-11-05T12:11:47Z","message":"[Fleet]
[Security Solution] Install prebuilt rules package using stream-based
approach (#195888)\n\n**Resolves:
https://github.com/elastic/kibana/issues/192350**\r\n\r\n##
Summary\r\n\r\nImplemented stream-based installation of the detection
rules package.\r\n\r\n**Background**: The installation of the detection
rules package was\r\ncausing OOM (Out of Memory) errors in Serverless
environments where the\r\navailable memory is limited to 1GB. The root
cause of the errors was\r\nthat during installation, the package was
being read and unzipped\r\nentirely into memory. Given the large package
size, this led to OOMs. To\r\naddress these memory issues, the following
changes were made:\r\n\r\n1. Added a branching logic to the
`installPackageFromRegistry` and\r\n`installPackageByUpload` methods,
where based on the package name is\r\ndecided to use streaming or not.
Only one `security_detection_engine`\r\npackage is currently hardcoded
to use streaming.\r\n2. In the state machine then defined a separate set
of steps for the\r\nstream-based package installation. It is reduced to
cover only Kibana\r\nassets installation at this stage.\r\n3. A new
`stepInstallKibanaAssetsWithStreaming` step is added to handle\r\nassets
installation. While this method still reads the package archive\r\ninto
memory (since unzipping from a readable stream is [not possible
due\r\nto the design of the
.zip\r\nformat](https://github.com/thejoshwolfe/yauzl?tab=readme-ov-file#no-streaming-unzip-api)),\r\nthe
package is unzipped using streams after being read into a
buffer.\r\nThis allows only a small portion of the archive (100 saved
objects at a\r\ntime) to be unpacked into memory, reducing memory
usage.\r\n4. The new method also includes several optimizations, such as
only\r\nremoving previously installed assets if they are missing in the
new\r\npackage and using `savedObjectClient.bulkCreate` instead of the
less\r\nefficient `savedObjectClient.import`.\r\n\r\n### Test
environment\r\n\r\n1. Prebuilt detection rules package with ~20k saved
objects; 118MB\r\nzipped.\r\n5. Local package registry.\r\n6. Production
build of Kibana running locally with a 700MB max old space\r\nlimit,
pointed to that registry.\r\n\r\nSetting up a test environment is not
completely straightforward. Here's\r\na rough outline of the
steps:\r\n<details>\r\n<summary>\r\nHow to test this
PR\r\n</summary>\r\n\r\n1. Create a package containing a large number of
prebuilt rules.\r\n1. I used the `package-storage` repository to find
one of the previously\r\nreleased prebuilt rules packages.\r\n2.
Multiplied the number of assets in the package to 20k
historical\r\nversions.\r\n 4. Built the package using `elastic-package
build`.\r\n2. Start a local package registry serving the built package
using\r\n`elastic-package stack up --services package-registry`.\r\n4.
Create a production build of Kibana. To speed up the
process,\r\nunnecessary artifacts can be skipped:\r\n ```\r\nnode
scripts/build --skip-cdn-assets
--skip-docker-ubi\r\n--skip-docker-ubuntu --skip-docker-wolfi
--skip-docker-fips\r\n ```\r\n7. Provide the built Kibana with a config
pointing to the local\r\nregistry. The config is located
in\r\n`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64/config/kibana.yml`.\r\nYou
can use the following config:\r\n ```\r\n csp.strict:
false\r\nxpack.security.encryptionKey: 've4Vohnu oa0Fu9ae Eethee8c
oDieg4do\r\nNohrah1u ao9Hu2oh Aeb4Ieyi
Aew1aegi'\r\nxpack.encryptedSavedObjects.encryptionKey: 'Shah7nai
Eew6izai Eir7OoW0\r\nGewi2ief eiSh8woo shoogh7E Quae6hal
ce6Oumah'\r\n\r\n
xpack.fleet.internal.registry.kibanaVersionCheckEnabled: false\r\n
xpack.fleet.registryUrl: https://localhost:8080\r\n\r\n
elasticsearch:\r\n username: 'kibana_system'\r\n password:
'changeme'\r\n hosts: 'http://localhost:9200'\r\n ```\r\n8. Override the
Node options Kibana starts with to allow it to connect\r\nto the local
registry and set the memory limit. For this, you need to\r\nedit the
`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64/bin/kibana`\r\nfile:\r\n
```\r\nNODE_OPTIONS=\"--no-warnings
--max-http-header-size=65536\r\n--unhandled-rejections=warn
--dns-result-order=ipv4first\r\n--openssl-legacy-provider
--max_old_space_size=700
--inspect\"\r\nNODE_ENV=production\r\nNODE_EXTRA_CA_CERTS=~/.elastic-package/profiles/default/certs/ca-cert.pem\r\nexec
\"${NODE}\" \"${DIR}/src/cli/dist\" \"${@}\"\r\n ```\r\n9. Navigate to
the build
folder:\r\n`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64`.\r\n10.
Start Kibana using `./bin/kibana`.\r\n11. Kibana is now running in debug
mode, with the debugger started on\r\nport 9229. You can connect to it
using VS Code's debug config or\r\nChrome's DevTools.\r\n12. Now you can
install prebuilt detection rules by calling the
`POST\r\n/internal/detection_engine/prebuilt_rules/_bootstrap` endpoint,
which\r\nuses the new streaming installation under the
hood.\r\n\r\n</details>\r\n\r\n### Test results locally\r\n\r\n**Without
the streaming approach**\r\n\r\nGuaranteed OOM. Even smaller packages,
up to 10k rules, caused sporadic\r\nOOM errors. So for comparison,
tested the package installation without\r\nmemory
limits.\r\n\r\n![Screenshot 2024-10-14 at 14
15\r\n26](https://github.com/user-attachments/assets/131cb877-2404-4638-b619-b1370a53659f)\r\n\r\n1.
Heap memory usage spikes up to 2.5GB\r\n5. External memory consumes up
to 450 Mb, which is four times the\r\narchive size\r\n13. RSS (Resident
Set Size) exceeds 4.5GB\r\n\r\n**With the streaming approach**\r\n\r\nNo
OOM errors observed. The memory consumption chart looks like
the\r\nfollowing:\r\n\r\n![Screenshot 2024-10-14 at 11
15\r\n21](https://github.com/user-attachments/assets/b47ba8c9-2ba7-42de-b921-c33104d4481e)\r\n\r\n1.
Heap memory remains stable, around 450MB, without any spikes.\r\n2.
External memory jumps to around 250MB at the beginning of
the\r\ninstallation, then drops to around 120MB, which is roughly equal
to the\r\npackage archive size. I couldn't determine why the external
memory\r\nconsumption exceeds the package size by 2x when the
installation starts.\r\nI checked the code for places where the package
might be loaded into\r\nmemory twice but found nothing suspicious. This
might be worth\r\ninvestigating further.\r\n3. RSS remains stable,
peaking slightly above 1GB. I believe this is the\r\nupper limit for a
package that can be handled without errors in a\r\nServerless
environment, where the memory limit is dictated by pod-level\r\nsettings
rather than Node settings and is set to 1GB. I'll verify this\r\non a
real Serverless instance to confirm.\r\n\r\n### Test results on
Serverless\r\n\r\n![Screenshot 2024-10-31 at 12
31\r\n34](https://github.com/user-attachments/assets/d20d2860-fa96-4e56-be2b-7b3c0b5c7b77)","sha":"67cdb93f5b800caac80672c942d04afe4d7aa4d8","branchLabelMapping":{"^v9.0.0$":"main","^v8.17.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["performance","release_note:skip","Team:Fleet","v9.0.0","Team:Detections
and Resp","Team: SecuritySolution","Team:Detection Rule
Management","Feature:Prebuilt Detection
Rules","ci:project-deploy-security","backport:version","v8.17.0"],"title":"[Fleet]
[Security Solution] Install prebuilt rules package using stream-based
approach","number":195888,"url":"https://github.com/elastic/kibana/pull/195888","mergeCommit":{"message":"[Fleet]
[Security Solution] Install prebuilt rules package using stream-based
approach (#195888)\n\n**Resolves:
https://github.com/elastic/kibana/issues/192350**\r\n\r\n##
Summary\r\n\r\nImplemented stream-based installation of the detection
rules package.\r\n\r\n**Background**: The installation of the detection
rules package was\r\ncausing OOM (Out of Memory) errors in Serverless
environments where the\r\navailable memory is limited to 1GB. The root
cause of the errors was\r\nthat during installation, the package was
being read and unzipped\r\nentirely into memory. Given the large package
size, this led to OOMs. To\r\naddress these memory issues, the following
changes were made:\r\n\r\n1. Added a branching logic to the
`installPackageFromRegistry` and\r\n`installPackageByUpload` methods,
where based on the package name is\r\ndecided to use streaming or not.
Only one `security_detection_engine`\r\npackage is currently hardcoded
to use streaming.\r\n2. In the state machine then defined a separate set
of steps for the\r\nstream-based package installation. It is reduced to
cover only Kibana\r\nassets installation at this stage.\r\n3. A new
`stepInstallKibanaAssetsWithStreaming` step is added to handle\r\nassets
installation. While this method still reads the package archive\r\ninto
memory (since unzipping from a readable stream is [not possible
due\r\nto the design of the
.zip\r\nformat](https://github.com/thejoshwolfe/yauzl?tab=readme-ov-file#no-streaming-unzip-api)),\r\nthe
package is unzipped using streams after being read into a
buffer.\r\nThis allows only a small portion of the archive (100 saved
objects at a\r\ntime) to be unpacked into memory, reducing memory
usage.\r\n4. The new method also includes several optimizations, such as
only\r\nremoving previously installed assets if they are missing in the
new\r\npackage and using `savedObjectClient.bulkCreate` instead of the
less\r\nefficient `savedObjectClient.import`.\r\n\r\n### Test
environment\r\n\r\n1. Prebuilt detection rules package with ~20k saved
objects; 118MB\r\nzipped.\r\n5. Local package registry.\r\n6. Production
build of Kibana running locally with a 700MB max old space\r\nlimit,
pointed to that registry.\r\n\r\nSetting up a test environment is not
completely straightforward. Here's\r\na rough outline of the
steps:\r\n<details>\r\n<summary>\r\nHow to test this
PR\r\n</summary>\r\n\r\n1. Create a package containing a large number of
prebuilt rules.\r\n1. I used the `package-storage` repository to find
one of the previously\r\nreleased prebuilt rules packages.\r\n2.
Multiplied the number of assets in the package to 20k
historical\r\nversions.\r\n 4. Built the package using `elastic-package
build`.\r\n2. Start a local package registry serving the built package
using\r\n`elastic-package stack up --services package-registry`.\r\n4.
Create a production build of Kibana. To speed up the
process,\r\nunnecessary artifacts can be skipped:\r\n ```\r\nnode
scripts/build --skip-cdn-assets
--skip-docker-ubi\r\n--skip-docker-ubuntu --skip-docker-wolfi
--skip-docker-fips\r\n ```\r\n7. Provide the built Kibana with a config
pointing to the local\r\nregistry. The config is located
in\r\n`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64/config/kibana.yml`.\r\nYou
can use the following config:\r\n ```\r\n csp.strict:
false\r\nxpack.security.encryptionKey: 've4Vohnu oa0Fu9ae Eethee8c
oDieg4do\r\nNohrah1u ao9Hu2oh Aeb4Ieyi
Aew1aegi'\r\nxpack.encryptedSavedObjects.encryptionKey: 'Shah7nai
Eew6izai Eir7OoW0\r\nGewi2ief eiSh8woo shoogh7E Quae6hal
ce6Oumah'\r\n\r\n
xpack.fleet.internal.registry.kibanaVersionCheckEnabled: false\r\n
xpack.fleet.registryUrl: https://localhost:8080\r\n\r\n
elasticsearch:\r\n username: 'kibana_system'\r\n password:
'changeme'\r\n hosts: 'http://localhost:9200'\r\n ```\r\n8. Override the
Node options Kibana starts with to allow it to connect\r\nto the local
registry and set the memory limit. For this, you need to\r\nedit the
`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64/bin/kibana`\r\nfile:\r\n
```\r\nNODE_OPTIONS=\"--no-warnings
--max-http-header-size=65536\r\n--unhandled-rejections=warn
--dns-result-order=ipv4first\r\n--openssl-legacy-provider
--max_old_space_size=700
--inspect\"\r\nNODE_ENV=production\r\nNODE_EXTRA_CA_CERTS=~/.elastic-package/profiles/default/certs/ca-cert.pem\r\nexec
\"${NODE}\" \"${DIR}/src/cli/dist\" \"${@}\"\r\n ```\r\n9. Navigate to
the build
folder:\r\n`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64`.\r\n10.
Start Kibana using `./bin/kibana`.\r\n11. Kibana is now running in debug
mode, with the debugger started on\r\nport 9229. You can connect to it
using VS Code's debug config or\r\nChrome's DevTools.\r\n12. Now you can
install prebuilt detection rules by calling the
`POST\r\n/internal/detection_engine/prebuilt_rules/_bootstrap` endpoint,
which\r\nuses the new streaming installation under the
hood.\r\n\r\n</details>\r\n\r\n### Test results locally\r\n\r\n**Without
the streaming approach**\r\n\r\nGuaranteed OOM. Even smaller packages,
up to 10k rules, caused sporadic\r\nOOM errors. So for comparison,
tested the package installation without\r\nmemory
limits.\r\n\r\n![Screenshot 2024-10-14 at 14
15\r\n26](https://github.com/user-attachments/assets/131cb877-2404-4638-b619-b1370a53659f)\r\n\r\n1.
Heap memory usage spikes up to 2.5GB\r\n5. External memory consumes up
to 450 Mb, which is four times the\r\narchive size\r\n13. RSS (Resident
Set Size) exceeds 4.5GB\r\n\r\n**With the streaming approach**\r\n\r\nNo
OOM errors observed. The memory consumption chart looks like
the\r\nfollowing:\r\n\r\n![Screenshot 2024-10-14 at 11
15\r\n21](https://github.com/user-attachments/assets/b47ba8c9-2ba7-42de-b921-c33104d4481e)\r\n\r\n1.
Heap memory remains stable, around 450MB, without any spikes.\r\n2.
External memory jumps to around 250MB at the beginning of
the\r\ninstallation, then drops to around 120MB, which is roughly equal
to the\r\npackage archive size. I couldn't determine why the external
memory\r\nconsumption exceeds the package size by 2x when the
installation starts.\r\nI checked the code for places where the package
might be loaded into\r\nmemory twice but found nothing suspicious. This
might be worth\r\ninvestigating further.\r\n3. RSS remains stable,
peaking slightly above 1GB. I believe this is the\r\nupper limit for a
package that can be handled without errors in a\r\nServerless
environment, where the memory limit is dictated by pod-level\r\nsettings
rather than Node settings and is set to 1GB. I'll verify this\r\non a
real Serverless instance to confirm.\r\n\r\n### Test results on
Serverless\r\n\r\n![Screenshot 2024-10-31 at 12
31\r\n34](https://github.com/user-attachments/assets/d20d2860-fa96-4e56-be2b-7b3c0b5c7b77)","sha":"67cdb93f5b800caac80672c942d04afe4d7aa4d8"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/195888","number":195888,"mergeCommit":{"message":"[Fleet]
[Security Solution] Install prebuilt rules package using stream-based
approach (#195888)\n\n**Resolves:
https://github.com/elastic/kibana/issues/192350**\r\n\r\n##
Summary\r\n\r\nImplemented stream-based installation of the detection
rules package.\r\n\r\n**Background**: The installation of the detection
rules package was\r\ncausing OOM (Out of Memory) errors in Serverless
environments where the\r\navailable memory is limited to 1GB. The root
cause of the errors was\r\nthat during installation, the package was
being read and unzipped\r\nentirely into memory. Given the large package
size, this led to OOMs. To\r\naddress these memory issues, the following
changes were made:\r\n\r\n1. Added a branching logic to the
`installPackageFromRegistry` and\r\n`installPackageByUpload` methods,
where based on the package name is\r\ndecided to use streaming or not.
Only one `security_detection_engine`\r\npackage is currently hardcoded
to use streaming.\r\n2. In the state machine then defined a separate set
of steps for the\r\nstream-based package installation. It is reduced to
cover only Kibana\r\nassets installation at this stage.\r\n3. A new
`stepInstallKibanaAssetsWithStreaming` step is added to handle\r\nassets
installation. While this method still reads the package archive\r\ninto
memory (since unzipping from a readable stream is [not possible
due\r\nto the design of the
.zip\r\nformat](https://github.com/thejoshwolfe/yauzl?tab=readme-ov-file#no-streaming-unzip-api)),\r\nthe
package is unzipped using streams after being read into a
buffer.\r\nThis allows only a small portion of the archive (100 saved
objects at a\r\ntime) to be unpacked into memory, reducing memory
usage.\r\n4. The new method also includes several optimizations, such as
only\r\nremoving previously installed assets if they are missing in the
new\r\npackage and using `savedObjectClient.bulkCreate` instead of the
less\r\nefficient `savedObjectClient.import`.\r\n\r\n### Test
environment\r\n\r\n1. Prebuilt detection rules package with ~20k saved
objects; 118MB\r\nzipped.\r\n5. Local package registry.\r\n6. Production
build of Kibana running locally with a 700MB max old space\r\nlimit,
pointed to that registry.\r\n\r\nSetting up a test environment is not
completely straightforward. Here's\r\na rough outline of the
steps:\r\n<details>\r\n<summary>\r\nHow to test this
PR\r\n</summary>\r\n\r\n1. Create a package containing a large number of
prebuilt rules.\r\n1. I used the `package-storage` repository to find
one of the previously\r\nreleased prebuilt rules packages.\r\n2.
Multiplied the number of assets in the package to 20k
historical\r\nversions.\r\n 4. Built the package using `elastic-package
build`.\r\n2. Start a local package registry serving the built package
using\r\n`elastic-package stack up --services package-registry`.\r\n4.
Create a production build of Kibana. To speed up the
process,\r\nunnecessary artifacts can be skipped:\r\n ```\r\nnode
scripts/build --skip-cdn-assets
--skip-docker-ubi\r\n--skip-docker-ubuntu --skip-docker-wolfi
--skip-docker-fips\r\n ```\r\n7. Provide the built Kibana with a config
pointing to the local\r\nregistry. The config is located
in\r\n`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64/config/kibana.yml`.\r\nYou
can use the following config:\r\n ```\r\n csp.strict:
false\r\nxpack.security.encryptionKey: 've4Vohnu oa0Fu9ae Eethee8c
oDieg4do\r\nNohrah1u ao9Hu2oh Aeb4Ieyi
Aew1aegi'\r\nxpack.encryptedSavedObjects.encryptionKey: 'Shah7nai
Eew6izai Eir7OoW0\r\nGewi2ief eiSh8woo shoogh7E Quae6hal
ce6Oumah'\r\n\r\n
xpack.fleet.internal.registry.kibanaVersionCheckEnabled: false\r\n
xpack.fleet.registryUrl: https://localhost:8080\r\n\r\n
elasticsearch:\r\n username: 'kibana_system'\r\n password:
'changeme'\r\n hosts: 'http://localhost:9200'\r\n ```\r\n8. Override the
Node options Kibana starts with to allow it to connect\r\nto the local
registry and set the memory limit. For this, you need to\r\nedit the
`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64/bin/kibana`\r\nfile:\r\n
```\r\nNODE_OPTIONS=\"--no-warnings
--max-http-header-size=65536\r\n--unhandled-rejections=warn
--dns-result-order=ipv4first\r\n--openssl-legacy-provider
--max_old_space_size=700
--inspect\"\r\nNODE_ENV=production\r\nNODE_EXTRA_CA_CERTS=~/.elastic-package/profiles/default/certs/ca-cert.pem\r\nexec
\"${NODE}\" \"${DIR}/src/cli/dist\" \"${@}\"\r\n ```\r\n9. Navigate to
the build
folder:\r\n`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64`.\r\n10.
Start Kibana using `./bin/kibana`.\r\n11. Kibana is now running in debug
mode, with the debugger started on\r\nport 9229. You can connect to it
using VS Code's debug config or\r\nChrome's DevTools.\r\n12. Now you can
install prebuilt detection rules by calling the
`POST\r\n/internal/detection_engine/prebuilt_rules/_bootstrap` endpoint,
which\r\nuses the new streaming installation under the
hood.\r\n\r\n</details>\r\n\r\n### Test results locally\r\n\r\n**Without
the streaming approach**\r\n\r\nGuaranteed OOM. Even smaller packages,
up to 10k rules, caused sporadic\r\nOOM errors. So for comparison,
tested the package installation without\r\nmemory
limits.\r\n\r\n![Screenshot 2024-10-14 at 14
15\r\n26](https://github.com/user-attachments/assets/131cb877-2404-4638-b619-b1370a53659f)\r\n\r\n1.
Heap memory usage spikes up to 2.5GB\r\n5. External memory consumes up
to 450 Mb, which is four times the\r\narchive size\r\n13. RSS (Resident
Set Size) exceeds 4.5GB\r\n\r\n**With the streaming approach**\r\n\r\nNo
OOM errors observed. The memory consumption chart looks like
the\r\nfollowing:\r\n\r\n![Screenshot 2024-10-14 at 11
15\r\n21](https://github.com/user-attachments/assets/b47ba8c9-2ba7-42de-b921-c33104d4481e)\r\n\r\n1.
Heap memory remains stable, around 450MB, without any spikes.\r\n2.
External memory jumps to around 250MB at the beginning of
the\r\ninstallation, then drops to around 120MB, which is roughly equal
to the\r\npackage archive size. I couldn't determine why the external
memory\r\nconsumption exceeds the package size by 2x when the
installation starts.\r\nI checked the code for places where the package
might be loaded into\r\nmemory twice but found nothing suspicious. This
might be worth\r\ninvestigating further.\r\n3. RSS remains stable,
peaking slightly above 1GB. I believe this is the\r\nupper limit for a
package that can be handled without errors in a\r\nServerless
environment, where the memory limit is dictated by pod-level\r\nsettings
rather than Node settings and is set to 1GB. I'll verify this\r\non a
real Serverless instance to confirm.\r\n\r\n### Test results on
Serverless\r\n\r\n![Screenshot 2024-10-31 at 12
31\r\n34](https://github.com/user-attachments/assets/d20d2860-fa96-4e56-be2b-7b3c0b5c7b77)","sha":"67cdb93f5b800caac80672c942d04afe4d7aa4d8"}},{"branch":"8.x","label":"v8.17.0","branchLabelMappingKey":"^v8.17.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Dmitrii Shevchenko <[email protected]>
  • Loading branch information
kibanamachine and xcrzx authored Nov 5, 2024
1 parent c969b0e commit 687595a
Show file tree
Hide file tree
Showing 41 changed files with 768 additions and 83 deletions.
15 changes: 15 additions & 0 deletions x-pack/plugins/fleet/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,25 @@ export type InstallablePackage = RegistryPackage | ArchivePackage;

export type AssetsMap = Map<string, Buffer | undefined>;

export interface ArchiveEntry {
path: string;
buffer?: Buffer;
}

export interface ArchiveIterator {
traverseEntries: (onEntry: (entry: ArchiveEntry) => Promise<void>) => Promise<void>;
getPaths: () => Promise<string[]>;
}

export interface PackageInstallContext {
packageInfo: InstallablePackage;
/**
* @deprecated Use `archiveIterator` to access the package archive entries
* without loading them all into memory at once.
*/
assetsMap: AssetsMap;
paths: string[];
archiveIterator: ArchiveIterator;
}

export type ArchivePackage = PackageSpecManifest &
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/fleet/server/routes/epm/file_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { getBundledPackageByPkgKey } from '../../services/epm/packages/bundled_p
import { getFile, getInstallation } from '../../services/epm/packages/get';
import type { FleetRequestHandlerContext } from '../..';
import { appContextService } from '../../services';
import { unpackBufferEntries } from '../../services/epm/archive';
import { unpackArchiveEntriesIntoMemory } from '../../services/epm/archive';
import { getAsset } from '../../services/epm/archive/storage';

import { getFileHandler } from './file_handler';
Expand All @@ -29,7 +29,7 @@ jest.mock('../../services/epm/packages/get');
const mockedGetBundledPackageByPkgKey = jest.mocked(getBundledPackageByPkgKey);
const mockedGetInstallation = jest.mocked(getInstallation);
const mockedGetFile = jest.mocked(getFile);
const mockedUnpackBufferEntries = jest.mocked(unpackBufferEntries);
const mockedUnpackBufferEntries = jest.mocked(unpackArchiveEntriesIntoMemory);
const mockedGetAsset = jest.mocked(getAsset);

function mockContext() {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/fleet/server/routes/epm/file_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { defaultFleetErrorHandler } from '../../errors';
import { getAsset } from '../../services/epm/archive/storage';
import { getBundledPackageByPkgKey } from '../../services/epm/packages/bundled_packages';
import { pkgToPkgKey } from '../../services/epm/registry';
import { unpackBufferEntries } from '../../services/epm/archive';
import { unpackArchiveEntriesIntoMemory } from '../../services/epm/archive';

const CACHE_CONTROL_10_MINUTES_HEADER: HttpResponseOptions['headers'] = {
'cache-control': 'max-age=600',
Expand Down Expand Up @@ -69,7 +69,7 @@ export const getFileHandler: FleetRequestHandler<
pkgToPkgKey({ name: pkgName, version: pkgVersion })
);
if (bundledPackage) {
const bufferEntries = await unpackBufferEntries(
const bufferEntries = await unpackArchiveEntriesIntoMemory(
await bundledPackage.getBuffer(),
'application/zip'
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
FleetRequestHandler,
InstallKibanaAssetsRequestSchema,
} from '../../types';
import { createArchiveIteratorFromMap } from '../../services/epm/archive/archive_iterator';

export const installPackageKibanaAssetsHandler: FleetRequestHandler<
TypeOf<typeof InstallKibanaAssetsRequestSchema.params>,
Expand Down Expand Up @@ -69,6 +70,7 @@ export const installPackageKibanaAssetsHandler: FleetRequestHandler<
packageInfo,
paths: installedPkgWithAssets.paths,
assetsMap: installedPkgWithAssets.assetsMap,
archiveIterator: createArchiveIteratorFromMap(installedPkgWithAssets.assetsMap),
},
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { AssetsMap, ArchiveIterator, ArchiveEntry } from '../../../../common/types';

import { traverseArchiveEntries } from '.';

/**
* Creates an iterator for traversing and extracting paths from an archive
* buffer. This iterator is intended to be used for memory efficient traversal
* of archive contents without extracting the entire archive into memory.
*
* @param archiveBuffer - The buffer containing the archive data.
* @param contentType - The content type of the archive (e.g.,
* 'application/zip').
* @returns ArchiveIterator instance.
*
*/
export const createArchiveIterator = (
archiveBuffer: Buffer,
contentType: string
): ArchiveIterator => {
const paths: string[] = [];

const traverseEntries = async (
onEntry: (entry: ArchiveEntry) => Promise<void>
): Promise<void> => {
await traverseArchiveEntries(archiveBuffer, contentType, async (entry) => {
await onEntry(entry);
});
};

const getPaths = async (): Promise<string[]> => {
if (paths.length) {
return paths;
}

await traverseEntries(async (entry) => {
paths.push(entry.path);
});

return paths;
};

return {
traverseEntries,
getPaths,
};
};

/**
* Creates an archive iterator from the assetsMap. This is a stop-gap solution
* to provide a uniform interface for traversing assets while assetsMap is still
* in use. It works with a map of assets loaded into memory and is not intended
* for use with large archives.
*
* @param assetsMap - A map where the keys are asset paths and the values are
* asset buffers.
* @returns ArchiveIterator instance.
*
*/
export const createArchiveIteratorFromMap = (assetsMap: AssetsMap): ArchiveIterator => {
const traverseEntries = async (
onEntry: (entry: ArchiveEntry) => Promise<void>
): Promise<void> => {
for (const [path, buffer] of assetsMap) {
await onEntry({ path, buffer });
}
};

const getPaths = async (): Promise<string[]> => {
return [...assetsMap.keys()];
};

return {
traverseEntries,
getPaths,
};
};
16 changes: 9 additions & 7 deletions x-pack/plugins/fleet/server/services/epm/archive/extract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import * as tar from 'tar';
import yauzl from 'yauzl';

import { bufferToStream, streamToBuffer } from '../streams';

import type { ArchiveEntry } from '.';
import type { ArchiveEntry } from '../../../../common/types';

export async function untarBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
onEntry = async (entry: ArchiveEntry): Promise<void> => {}
) {
const deflatedStream = bufferToStream(buffer);
// use tar.list vs .extract to avoid writing to disk
Expand All @@ -37,17 +36,20 @@ export async function untarBuffer(
export async function unzipBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
onEntry = async (entry: ArchiveEntry): Promise<void> => {}
): Promise<unknown> {
const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true });
zipfile.readEntry();
zipfile.on('entry', async (entry: yauzl.Entry) => {
const path = entry.fileName;
if (!filter({ path })) return zipfile.readEntry();

const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
onEntry({ buffer: entryBuffer, path });
zipfile.readEntry();
try {
const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
await onEntry({ buffer: entryBuffer, path });
} finally {
zipfile.readEntry();
}
});
return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject));
}
Expand Down
100 changes: 63 additions & 37 deletions x-pack/plugins/fleet/server/services/epm/archive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
* 2.0.
*/

import type { AssetParts, AssetsMap } from '../../../../common/types';
import type {
ArchiveEntry,
ArchiveIterator,
AssetParts,
AssetsMap,
} from '../../../../common/types';
import {
PackageInvalidArchiveError,
PackageUnsupportedMediaTypeError,
PackageNotFoundError,
} from '../../../errors';

import { createArchiveIterator } from './archive_iterator';

import { deletePackageInfo } from './cache';
import type { SharedKey } from './cache';
import { getBufferExtractor } from './extract';
Expand All @@ -20,66 +27,85 @@ export * from './cache';
export { getBufferExtractor, untarBuffer, unzipBuffer } from './extract';
export { generatePackageInfoFromArchiveBuffer } from './parse';

export interface ArchiveEntry {
path: string;
buffer?: Buffer;
}

export async function unpackBufferToAssetsMap({
name,
version,
contentType,
archiveBuffer,
useStreaming,
}: {
name: string;
version: string;
contentType: string;
archiveBuffer: Buffer;
}): Promise<{ paths: string[]; assetsMap: AssetsMap }> {
const assetsMap = new Map<string, Buffer | undefined>();
const paths: string[] = [];
const entries = await unpackBufferEntries(archiveBuffer, contentType);

entries.forEach((entry) => {
const { path, buffer } = entry;
if (buffer) {
assetsMap.set(path, buffer);
paths.push(path);
}
});

return { assetsMap, paths };
useStreaming: boolean | undefined;
}): Promise<{ paths: string[]; assetsMap: AssetsMap; archiveIterator: ArchiveIterator }> {
const archiveIterator = createArchiveIterator(archiveBuffer, contentType);
let paths: string[] = [];
let assetsMap: AssetsMap = new Map();
if (useStreaming) {
paths = await archiveIterator.getPaths();
// We keep the assetsMap empty as we don't want to load all the assets in memory
assetsMap = new Map();
} else {
const entries = await unpackArchiveEntriesIntoMemory(archiveBuffer, contentType);

entries.forEach((entry) => {
const { path, buffer } = entry;
if (buffer) {
assetsMap.set(path, buffer);
paths.push(path);
}
});
}

return { paths, assetsMap, archiveIterator };
}

export async function unpackBufferEntries(
/**
* This function extracts all archive entries into memory.
*
* NOTE: This is potentially dangerous for large archives and can cause OOM
* errors. Use 'traverseArchiveEntries' instead to iterate over the entries
* without storing them all in memory at once.
*
* @param archiveBuffer
* @param contentType
* @returns All the entries in the archive buffer
*/
export async function unpackArchiveEntriesIntoMemory(
archiveBuffer: Buffer,
contentType: string
): Promise<ArchiveEntry[]> {
const entries: ArchiveEntry[] = [];
const addToEntries = async (entry: ArchiveEntry) => void entries.push(entry);
await traverseArchiveEntries(archiveBuffer, contentType, addToEntries);

// While unpacking a tar.gz file with unzipBuffer() will result in a thrown
// error, unpacking a zip file with untarBuffer() just results in nothing.
if (entries.length === 0) {
throw new PackageInvalidArchiveError(
`Archive seems empty. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}
return entries;
}

export async function traverseArchiveEntries(
archiveBuffer: Buffer,
contentType: string,
onEntry: (entry: ArchiveEntry) => Promise<void>
) {
const bufferExtractor = getBufferExtractor({ contentType });
if (!bufferExtractor) {
throw new PackageUnsupportedMediaTypeError(
`Unsupported media type ${contentType}. Please use 'application/gzip' or 'application/zip'`
);
}
const entries: ArchiveEntry[] = [];
try {
const onlyFiles = ({ path }: ArchiveEntry): boolean => !path.endsWith('/');
const addToEntries = (entry: ArchiveEntry) => entries.push(entry);
await bufferExtractor(archiveBuffer, onlyFiles, addToEntries);
await bufferExtractor(archiveBuffer, onlyFiles, onEntry);
} catch (error) {
throw new PackageInvalidArchiveError(
`Error during extraction of package: ${error}. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}

// While unpacking a tar.gz file with unzipBuffer() will result in a thrown error in the try-catch above,
// unpacking a zip file with untarBuffer() just results in nothing.
if (entries.length === 0) {
throw new PackageInvalidArchiveError(
`Archive seems empty. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}
return entries;
}

export const deletePackageCache = ({ name, version }: SharedKey) => {
Expand Down
5 changes: 2 additions & 3 deletions x-pack/plugins/fleet/server/services/epm/archive/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import {
import { PackageInvalidArchiveError } from '../../../errors';
import { pkgToPkgKey } from '../registry';

import { unpackBufferEntries } from '.';
import { traverseArchiveEntries } from '.';

const readFileAsync = promisify(readFile);
export const MANIFEST_NAME = 'manifest.yml';
Expand Down Expand Up @@ -160,9 +160,8 @@ export async function generatePackageInfoFromArchiveBuffer(
contentType: string
): Promise<{ paths: string[]; packageInfo: ArchivePackage }> {
const assetsMap: AssetsBufferMap = {};
const entries = await unpackBufferEntries(archiveBuffer, contentType);
const paths: string[] = [];
entries.forEach(({ path: bufferPath, buffer }) => {
await traverseArchiveEntries(archiveBuffer, contentType, async ({ path: bufferPath, buffer }) => {
paths.push(bufferPath);
if (buffer && filterAssetPathForParseAndVerifyArchive(bufferPath)) {
assetsMap[bufferPath] = buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';

import { ASSETS_SAVED_OBJECT_TYPE } from '../../../../common';
import type {
ArchiveEntry,
InstallablePackage,
InstallSource,
PackageAssetReference,
Expand All @@ -24,7 +25,6 @@ import { PackageInvalidArchiveError, PackageNotFoundError } from '../../../error
import { appContextService } from '../../app_context';

import { setPackageInfo } from '.';
import type { ArchiveEntry } from '.';
import { filterAssetPathForParseAndVerifyArchive, parseAndVerifyArchive } from './parse';

const ONE_BYTE = 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ import type {
PackageInfo,
} from '../../../../types';
import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import type { ArchiveEntry } from '../../archive';
import {
FLEET_FINAL_PIPELINE_CONTENT,
FLEET_FINAL_PIPELINE_ID,
FLEET_FINAL_PIPELINE_VERSION,
} from '../../../../constants';
import { getPipelineNameForDatastream } from '../../../../../common/services';
import type { PackageInstallContext } from '../../../../../common/types';
import type { ArchiveEntry, PackageInstallContext } from '../../../../../common/types';

import { appendMetadataToIngestPipeline } from '../meta';
import { retryTransientEsErrors } from '../retry';
Expand Down
Loading

0 comments on commit 687595a

Please sign in to comment.