Skip to content

Commit

Permalink
[EDR Workflows] Osquery Live Query PIT Search (#171556)
Browse files Browse the repository at this point in the history
Closes #169726

Current implementation do not use Point In Time search but tries to
paginate through the responses to fetch all available agents with a page
size of 9000. This is causing the error reported in the linked ticket
when we try to fetch results from 9,000 to 18,000


![277793068-f777179c-7eab-4349-8b5b-2afdf9318769](https://github.com/elastic/kibana/assets/29123534/4a75d685-5042-4387-ba5e-dd0585904f85)


This PR adds Point In Time search functionality when trying to fetch:
1. All agents
2. All agents of a policy
3. All agents of a platform

Since I believe querying 10k+ agents plus isn't done often I always
start with no PIT search and if pages total is greater than 1 I refetch,
this time, with PIT and searchAfter params.

Tested for packs and single query live queries.




https://github.com/elastic/kibana/assets/29123534/b0bb831c-9633-4fbb-b0a8-291889f68543



https://github.com/elastic/kibana/assets/29123534/49393627-e3c5-4346-ab74-287fdde9ecf8



https://github.com/elastic/kibana/assets/29123534/62063fa4-b2cd-4b6d-9976-23c89501652f
  • Loading branch information
szwarckonrad authored Nov 30, 2023
1 parent 2eba909 commit 25177aa
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ export const createActionHandler = async (

const { soClient, metadata, alertData, error } = options;
const savedObjectsClient = soClient ?? coreStartServices.savedObjects.createInternalRepository();

const elasticsearchClient = coreStartServices.elasticsearch.client.asInternalUser;
// eslint-disable-next-line @typescript-eslint/naming-convention
const { agent_all, agent_ids, agent_platforms, agent_policy_ids } = params;
const selectedAgents = await parseAgentSelection(internalSavedObjectsClient, osqueryContext, {
agents: agent_ids,
allAgentsSelected: !!agent_all,
platformsSelected: agent_platforms,
policiesSelected: agent_policy_ids,
});
const selectedAgents = await parseAgentSelection(
internalSavedObjectsClient,
elasticsearchClient,
osqueryContext,
{
agents: agent_ids,
allAgentsSelected: !!agent_all,
platformsSelected: agent_platforms,
policiesSelected: agent_policy_ids,
}
);

if (!selectedAgents.length) {
throw new CustomHttpRequestError('No agents found for selection', 400);
Expand Down
79 changes: 79 additions & 0 deletions x-pack/plugins/osquery/server/lib/parse_agent_groups.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { aggregateResults } from './parse_agent_groups';
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import type { OsqueryAppContext } from './osquery_app_context_services';

const mockOpenPointInTime = jest.fn().mockResolvedValue({ id: 'mockedPitId' });
const mockClosePointInTime = jest.fn();

const mockElasticsearchClient = {
openPointInTime: mockOpenPointInTime,
closePointInTime: mockClosePointInTime,
} as unknown as ElasticsearchClientMock;

const mockContext = {} as unknown as OsqueryAppContext;

describe('aggregateResults', () => {
it('should handle one page of results', async () => {
const generatorMock = jest.fn().mockResolvedValue({
results: ['result1', 'result2'],
total: 2,
});

const result = await aggregateResults(generatorMock, mockElasticsearchClient, mockContext);

expect(generatorMock).toHaveBeenCalledWith(1, expect.any(Number)); // 1st page, PER_PAGE
expect(mockOpenPointInTime).not.toHaveBeenCalled();
expect(mockClosePointInTime).not.toHaveBeenCalled();

expect(result).toEqual(['result1', 'result2']);
});

it('should handle multiple pages of results', async () => {
const generateResults = (run = 1, length = 9000) =>
Array.from({ length }, (_, index) => `result_${index + 1 + (run - 1) * length}`);

const generatorMock = jest
.fn()
.mockResolvedValueOnce({
results: generateResults(),
total: 18001,
})
.mockResolvedValueOnce({
results: generateResults(),
total: 18001,
searchAfter: ['firstSort'],
})
.mockResolvedValueOnce({
results: generateResults(2),
total: 18001,
searchAfter: ['secondSort'],
})
.mockResolvedValueOnce({
results: ['result_18001'],
total: 18001,
searchAfter: ['thirdSort'],
});

const result = await aggregateResults(generatorMock, mockElasticsearchClient, mockContext);
expect(generatorMock).toHaveBeenCalledWith(1, expect.any(Number));
expect(generatorMock).toHaveBeenCalledWith(1, expect.any(Number), undefined, 'mockedPitId');
expect(generatorMock).toHaveBeenCalledWith(2, expect.any(Number), ['firstSort'], 'mockedPitId');
expect(generatorMock).toHaveBeenCalledWith(
3,
expect.any(Number),
['secondSort'],
'mockedPitId'
);
expect(mockOpenPointInTime).toHaveBeenCalledTimes(1);
expect(mockClosePointInTime).toHaveBeenCalledTimes(1);
expect(mockClosePointInTime).toHaveBeenCalledWith({ id: 'mockedPitId' });
expect(result.length).toEqual(18001);
});
});
136 changes: 101 additions & 35 deletions x-pack/plugins/osquery/server/lib/parse_agent_groups.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
*/

import { uniq } from 'lodash';
import type { SavedObjectsClientContract } from '@kbn/core/server';
import { PACKAGE_POLICY_SAVED_OBJECT_TYPE } from '@kbn/fleet-plugin/common';
import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server';
import { AGENTS_INDEX, PACKAGE_POLICY_SAVED_OBJECT_TYPE } from '@kbn/fleet-plugin/common';
import { OSQUERY_INTEGRATION_NAME } from '../../common';
import type { OsqueryAppContext } from './osquery_app_context_services';

Expand All @@ -20,22 +20,64 @@ export interface AgentSelection {

const PER_PAGE = 9000;

const aggregateResults = async (
generator: (page: number, perPage: number) => Promise<{ results: string[]; total: number }>
export const aggregateResults = async (
generator: (
page: number,
perPage: number,
searchAfter?: unknown[],
pitId?: string
) => Promise<{ results: string[]; total: number; searchAfter?: unknown[] }>,
esClient: ElasticsearchClient,
context: OsqueryAppContext
) => {
const { results, total } = await generator(1, PER_PAGE);
let results: string[];
const { results: initialResults, total } = await generator(1, PER_PAGE);
const totalPages = Math.ceil(total / PER_PAGE);
let currPage = 2;
while (currPage <= totalPages) {
const { results: additionalResults } = await generator(currPage++, PER_PAGE);
results.push(...additionalResults);
if (totalPages === 1) {
// One page only, no need for PIT
results = initialResults;
} else {
const { id: pitId } = await esClient.openPointInTime({
index: AGENTS_INDEX,
keep_alive: '10m',
});
let currentSort: unknown[] | undefined;
// Refetch first page with PIT
const { results: pitInitialResults, searchAfter } = await generator(
1,
PER_PAGE,
currentSort, // No searchAfter for first page, its built based on first page results
pitId
);
results = pitInitialResults;
currentSort = searchAfter;
let currPage = 2;
while (currPage <= totalPages) {
const { results: additionalResults, searchAfter: additionalSearchAfter } = await generator(
currPage++,
PER_PAGE,
currentSort,
pitId
);
results.push(...additionalResults);
currentSort = additionalSearchAfter;
}

try {
await esClient.closePointInTime({ id: pitId });
} catch (error) {
context.logFactory
.get()
.warn(`Error closing point in time with id: ${pitId}. Error: ${error.message}`);
}
}

return uniq<string>(results);
};

export const parseAgentSelection = async (
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
context: OsqueryAppContext,
agentSelection: AgentSelection
) => {
Expand All @@ -52,28 +94,42 @@ export const parseAgentSelection = async (
const kueryFragments = ['status:online'];

if (agentService && packagePolicyService) {
const osqueryPolicies = await aggregateResults(async (page, perPage) => {
const { items, total } = await packagePolicyService.list(soClient, {
kuery: `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name:${OSQUERY_INTEGRATION_NAME}`,
perPage,
page,
});
const osqueryPolicies = await aggregateResults(
async (page, perPage) => {
const { items, total } = await packagePolicyService.list(soClient, {
kuery: `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name:${OSQUERY_INTEGRATION_NAME}`,
perPage,
page,
});

return { results: items.map((it) => it.policy_id), total };
});
return { results: items.map((it) => it.policy_id), total };
},
esClient,
context
);
kueryFragments.push(`policy_id:(${uniq(osqueryPolicies).join(' or ')})`);
if (allAgentsSelected) {
const kuery = kueryFragments.join(' and ');
const fetchedAgents = await aggregateResults(async (page, perPage) => {
const res = await agentService.listAgents({
perPage,
page,
kuery,
showInactive: false,
});
const fetchedAgents = await aggregateResults(
async (page, perPage, searchAfter?: unknown[], pitId?: string) => {
const res = await agentService.listAgents({
...(searchAfter ? { searchAfter } : {}),
...(pitId ? { pitId } : {}),
perPage,
page,
kuery,
showInactive: false,
});

return { results: res.agents.map((agent) => agent.id), total: res.total };
});
return {
results: res.agents.map((agent) => agent.id),
total: res.total,
searchAfter: res.agents[res.agents.length - 1].sort,
};
},
esClient,
context
);
fetchedAgents.forEach(addAgent);
} else {
if (platformsSelected.length > 0 || policiesSelected.length > 0) {
Expand All @@ -88,16 +144,26 @@ export const parseAgentSelection = async (

kueryFragments.push(`(${groupFragments.join(' or ')})`);
const kuery = kueryFragments.join(' and ');
const fetchedAgents = await aggregateResults(async (page, perPage) => {
const res = await agentService.listAgents({
perPage,
page,
kuery,
showInactive: false,
});
const fetchedAgents = await aggregateResults(
async (page, perPage, searchAfter?: unknown[], pitId?: string) => {
const res = await agentService.listAgents({
...(searchAfter ? { searchAfter } : {}),
...(pitId ? { pitId } : {}),
perPage,
page,
kuery,
showInactive: false,
});

return { results: res.agents.map((agent) => agent.id), total: res.total };
});
return {
results: res.agents.map((agent) => agent.id),
total: res.total,
searchAfter: res.agents[res.agents.length - 1].sort,
};
},
esClient,
context
);
fetchedAgents.forEach(addAgent);
}
}
Expand Down

0 comments on commit 25177aa

Please sign in to comment.