Skip to content

Commit

Permalink
[8.x] [SecuritySolution] Improve asset criticality bulk error when entities are duplicated (elastic#199651) (elastic#199968)
Browse files Browse the repository at this point in the history

# Backport

This will backport the following commits from `main` to `8.x`:
- [[SecuritySolution] Improve asset criticality bulk error when entities
are duplicated (elastic#199651)](elastic#199651)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Pablo
Machado","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-11-13T10:22:44Z","message":"[SecuritySolution]
Improve asset criticality bulk error when entities are duplicated
(elastic#199651)\n\n## Summary\r\n\r\n* Improve asset criticality bulk error
when entities are duplicated\r\n* It also fixes the server errors line
to be '1' based.\r\n\r\n![Screenshot 2024-11-11 at 11
46\r\n31](https://github.com/user-attachments/assets/3fbf35fb-cd27-417a-bf53-41a197d1bbe9)\r\n\r\n###
Performance\r\n\r\nTest parameters: file with +33k lines and ~1 MB
size.\r\n* Before 6.24 seconds\r\n* After 6.46 seconds\r\n\r\nExecution
time Increased ~0.22 seconds\r\n\r\n\r\n### Checklist\r\n\r\nDelete any
items that are not applicable to this PR.\r\n\r\n- [x] [Unit or
functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere
updated or added to match the most common
scenarios","sha":"02ac5fc90f08615d877befc05f9675838ade77b4","branchLabelMapping":{"^v9.0.0$":"main","^v8.17.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["bug","release_note:fix","v9.0.0","Team:
SecuritySolution","Theme: entity_analytics","Feature:Entity
Analytics","Team:Entity
Analytics","backport:version","v8.17.0"],"title":"[SecuritySolution]
Improve asset criticality bulk error when entities are
duplicated","number":199651,"url":"https://github.com/elastic/kibana/pull/199651","mergeCommit":{"message":"[SecuritySolution]
Improve asset criticality bulk error when entities are duplicated
(elastic#199651)\n\n## Summary\r\n\r\n* Improve asset criticality bulk error
when entities are duplicated\r\n* It also fixes the server errors line
to be '1' based.\r\n\r\n![Screenshot 2024-11-11 at 11
46\r\n31](https://github.com/user-attachments/assets/3fbf35fb-cd27-417a-bf53-41a197d1bbe9)\r\n\r\n###
Performance\r\n\r\nTest parameters: file with +33k lines and ~1 MB
size.\r\n* Before 6.24 seconds\r\n* After 6.46 seconds\r\n\r\nExecution
time Increased ~0.22 seconds\r\n\r\n\r\n### Checklist\r\n\r\nDelete any
items that are not applicable to this PR.\r\n\r\n- [x] [Unit or
functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere
updated or added to match the most common
scenarios","sha":"02ac5fc90f08615d877befc05f9675838ade77b4"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/199651","number":199651,"mergeCommit":{"message":"[SecuritySolution]
Improve asset criticality bulk error when entities are duplicated
(elastic#199651)\n\n## Summary\r\n\r\n* Improve asset criticality bulk error
when entities are duplicated\r\n* It also fixes the server errors line
to be '1' based.\r\n\r\n![Screenshot 2024-11-11 at 11
46\r\n31](https://github.com/user-attachments/assets/3fbf35fb-cd27-417a-bf53-41a197d1bbe9)\r\n\r\n###
Performance\r\n\r\nTest parameters: file with +33k lines and ~1 MB
size.\r\n* Before 6.24 seconds\r\n* After 6.46 seconds\r\n\r\nExecution
time Increased ~0.22 seconds\r\n\r\n\r\n### Checklist\r\n\r\nDelete any
items that are not applicable to this PR.\r\n\r\n- [x] [Unit or
functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere
updated or added to match the most common
scenarios","sha":"02ac5fc90f08615d877befc05f9675838ade77b4"}},{"branch":"8.x","label":"v8.17.0","branchLabelMappingKey":"^v8.17.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Pablo Machado <[email protected]>
  • Loading branch information
kibanamachine and machadoum authored Nov 13, 2024
1 parent a68248c commit 7fb5ec9
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
*/

import { loggingSystemMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { Readable } from 'stream';
import { AssetCriticalityDataClient } from './asset_criticality_data_client';
import { createOrUpdateIndex } from '../utils/create_or_update_index';
import { auditLoggerMock } from '@kbn/security-plugin/server/audit/mocks';
import type { AssetCriticalityUpsert } from '../../../../common/entity_analytics/asset_criticality/types';
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';

type MockInternalEsClient = ReturnType<
typeof elasticsearchServiceMock.createScopedClusterClient
Expand Down Expand Up @@ -264,4 +266,84 @@ describe('AssetCriticalityDataClient', () => {
);
});
});

describe('#bulkUpsertFromStream()', () => {
  let scopedEsClient: MockInternalEsClient;
  let logger: ReturnType<typeof loggingSystemMock.createLogger>;
  let dataClient: AssetCriticalityDataClient;

  beforeEach(() => {
    // Fresh mocks per test; the bulk helper is stubbed so no real ES calls happen.
    scopedEsClient = elasticsearchServiceMock.createScopedClusterClient().asInternalUser;
    scopedEsClient.helpers.bulk = mockEsBulk();
    logger = loggingSystemMock.createLogger();
    dataClient = new AssetCriticalityDataClient({
      esClient: scopedEsClient,
      logger,
      namespace: 'default',
      auditLogger: mockAuditLogger,
    });
  });

  it('returns valid stats', async () => {
    const records = [
      { idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
    ];

    const result = await dataClient.bulkUpsertFromStream({
      recordsStream: Readable.from(records),
      retries: 3,
      flushBytes: 1_000,
    });

    expect(result).toEqual({
      errors: [],
      stats: {
        failed: 0,
        successful: 1,
        total: 1,
      },
    });
  });

  it('returns error for duplicated entities', async () => {
    // Two identical host records: the second must be rejected, not upserted twice.
    const records = [
      { idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
      { idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
    ];

    // streamIndexStart shifts reported indices (e.g. to 1-based file line numbers).
    const result = await dataClient.bulkUpsertFromStream({
      recordsStream: Readable.from(records),
      retries: 3,
      flushBytes: 1_000,
      streamIndexStart: 9,
    });

    expect(result).toEqual({
      errors: [
        {
          index: 10,
          message: 'Duplicated entity',
        },
      ],
      stats: {
        failed: 1,
        successful: 1,
        total: 2,
      },
    });
  });
});
});

/**
 * Builds a jest mock of the ES bulk helper that simply drains the provided
 * datasource and reports every yielded document as successfully indexed.
 */
const mockEsBulk = () =>
  jest.fn().mockImplementation(async ({ datasource }) => {
    let successful = 0;
    for await (const _record of datasource) {
      successful += 1;
    }
    return { failed: 0, successful };
  }) as unknown as ElasticsearchClientMock['helpers']['bulk'];
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type AssetCriticalityIdParts = Pick<AssetCriticalityUpsert, 'idField' | 'idValue

type BulkUpsertFromStreamOptions = {
  /** Stream of records to upsert; elements may also be Errors (e.g. from parsing). */
  recordsStream: NodeJS.ReadableStream;
  /**
   * Index assigned to the first element read from the stream; subsequent
   * elements count up from it. Defaults to 0, so reported error indices are
   * zero-based. Set to 1 to make indices line up with 1-based file line
   * numbers (as the CSV upload route does).
   */
  streamIndexStart?: number;
} & Pick<Parameters<ElasticsearchClient['helpers']['bulk']>[0], 'flushBytes' | 'retries'>;

type StoredAssetCriticalityRecord = {
Expand Down Expand Up @@ -236,6 +240,7 @@ export class AssetCriticalityDataClient {
* @param recordsStream a stream of records to upsert, records may also be an error e.g if there was an error parsing
* @param flushBytes how big elasticsearch bulk requests should be before they are sent
* @param retries the number of times to retry a failed bulk request
* @param streamIndexStart By default the errors are zero-indexed. You can change it by setting this param to a value like `1`. It could be useful for file upload.
* @returns an object containing the number of records updated, created, errored, and the total number of records processed
* @throws an error if the stream emits an error
* @remarks
Expand All @@ -248,6 +253,7 @@ export class AssetCriticalityDataClient {
recordsStream,
flushBytes,
retries,
streamIndexStart = 0,
}: BulkUpsertFromStreamOptions): Promise<BulkUpsertAssetCriticalityRecordsResponse> => {
const errors: BulkUpsertAssetCriticalityRecordsResponse['errors'] = [];
const stats: BulkUpsertAssetCriticalityRecordsResponse['stats'] = {
Expand All @@ -256,10 +262,13 @@ export class AssetCriticalityDataClient {
total: 0,
};

let streamIndex = 0;
let streamIndex = streamIndexStart;
const recordGenerator = async function* () {
const processedEntities = new Set<string>();

for await (const untypedRecord of recordsStream) {
const record = untypedRecord as unknown as AssetCriticalityUpsert | Error;

stats.total++;
if (record instanceof Error) {
stats.failed++;
Expand All @@ -268,10 +277,20 @@ export class AssetCriticalityDataClient {
index: streamIndex,
});
} else {
yield {
record,
index: streamIndex,
};
const entityKey = `${record.idField}-${record.idValue}`;
if (processedEntities.has(entityKey)) {
errors.push({
message: 'Duplicated entity',
index: streamIndex,
});
stats.failed++;
} else {
processedEntities.add(entityKey);
yield {
record,
index: streamIndex,
};
}
}
streamIndex++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export const assetCriticalityPublicCSVUploadRoute = (
recordsStream,
retries: errorRetries,
flushBytes: maxBulkRequestBodySizeBytes,
streamIndexStart: 1, // It is the first line number
});
const end = new Date();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,36 +102,36 @@ export default ({ getService }: FtrProviderContext) => {

expect(body.errors).toEqual([
{
index: 0,
index: 1,
message:
'Invalid criticality level "invalid_criticality", expected one of extreme_impact, high_impact, medium_impact, low_impact',
},
{
index: 1,
index: 2,
message: 'Invalid entity type "invalid_entity", expected host or user',
},
{
index: 2,
index: 3,
message: 'Missing identifier',
},
{
index: 3,
index: 4,
message: 'Missing criticality level',
},
{
index: 4,
index: 5,
message: 'Missing entity type',
},
{
index: 5,
index: 6,
message: 'Expected 3 columns, got 2',
},
{
index: 6,
index: 7,
message: 'Expected 3 columns, got 4',
},
{
index: 7,
index: 8,
message: `Identifier is too long, expected less than 1000 characters, got 1001`,
},
]);
Expand All @@ -154,7 +154,7 @@ export default ({ getService }: FtrProviderContext) => {

expect(body.errors).toEqual([
{
index: 1,
index: 2,
message:
'Invalid criticality level "invalid_criticality", expected one of extreme_impact, high_impact, medium_impact, low_impact',
},
Expand Down

0 comments on commit 7fb5ec9

Please sign in to comment.