Skip to content

Commit

Permalink
[SecuritySolution] Improve asset criticality bulk error when entities…
Browse files Browse the repository at this point in the history
… are duplicated (elastic#199651)

## Summary

* Improve asset criticality bulk error when entities are duplicated
* It also fixes the server errors line to be '1' based.

![Screenshot 2024-11-11 at 11 46
31](https://github.com/user-attachments/assets/3fbf35fb-cd27-417a-bf53-41a197d1bbe9)

### Performance

Test parameters: file with +33k lines and ~1 MB size.
* Before 6.24 seconds
* After 6.46 seconds

Execution time Increased  ~0.22 seconds

### Checklist

Delete any items that are not applicable to this PR.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

(cherry picked from commit 02ac5fc)
  • Loading branch information
machadoum committed Nov 13, 2024
1 parent 323eb99 commit 4414b8d
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
*/

import { loggingSystemMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { Readable } from 'stream';
import { AssetCriticalityDataClient } from './asset_criticality_data_client';
import { createOrUpdateIndex } from '../utils/create_or_update_index';
import { auditLoggerMock } from '@kbn/security-plugin/server/audit/mocks';
import type { AssetCriticalityUpsert } from '../../../../common/entity_analytics/asset_criticality/types';
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';

type MockInternalEsClient = ReturnType<
typeof elasticsearchServiceMock.createScopedClusterClient
Expand Down Expand Up @@ -264,4 +266,84 @@ describe('AssetCriticalityDataClient', () => {
);
});
});

describe('#bulkUpsertFromStream()', () => {
let esClientMock: MockInternalEsClient;
let loggerMock: ReturnType<typeof loggingSystemMock.createLogger>;
let subject: AssetCriticalityDataClient;

beforeEach(() => {
esClientMock = elasticsearchServiceMock.createScopedClusterClient().asInternalUser;

esClientMock.helpers.bulk = mockEsBulk();
loggerMock = loggingSystemMock.createLogger();
subject = new AssetCriticalityDataClient({
esClient: esClientMock,
logger: loggerMock,
namespace: 'default',
auditLogger: mockAuditLogger,
});
});

it('returns valid stats', async () => {
const recordsStream = [
{ idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
];

const result = await subject.bulkUpsertFromStream({
recordsStream: Readable.from(recordsStream),
retries: 3,
flushBytes: 1_000,
});

expect(result).toEqual({
errors: [],
stats: {
failed: 0,
successful: 1,
total: 1,
},
});
});

it('returns error for duplicated entities', async () => {
const recordsStream = [
{ idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
{ idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
];

const result = await subject.bulkUpsertFromStream({
recordsStream: Readable.from(recordsStream),
retries: 3,
flushBytes: 1_000,
streamIndexStart: 9,
});

expect(result).toEqual({
errors: [
{
index: 10,
message: 'Duplicated entity',
},
],
stats: {
failed: 1,
successful: 1,
total: 2,
},
});
});
});
});

const mockEsBulk = () =>
jest.fn().mockImplementation(async ({ datasource }) => {
let count = 0;
for await (const _ of datasource) {
count++;
}
return {
failed: 0,
successful: count,
};
}) as unknown as ElasticsearchClientMock['helpers']['bulk'];
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type AssetCriticalityIdParts = Pick<AssetCriticalityUpsert, 'idField' | 'idValue

type BulkUpsertFromStreamOptions = {
recordsStream: NodeJS.ReadableStream;
/**
* The index number for the first stream element. By default the errors are zero-indexed.
*/
streamIndexStart?: number;
} & Pick<Parameters<ElasticsearchClient['helpers']['bulk']>[0], 'flushBytes' | 'retries'>;

type StoredAssetCriticalityRecord = {
Expand Down Expand Up @@ -236,6 +240,7 @@ export class AssetCriticalityDataClient {
* @param recordsStream a stream of records to upsert, records may also be an error e.g if there was an error parsing
* @param flushBytes how big elasticsearch bulk requests should be before they are sent
* @param retries the number of times to retry a failed bulk request
* @param streamIndexStart By default the errors are zero-indexed. You can change it by setting this param to a value like `1`. It could be useful for file upload.
* @returns an object containing the number of records updated, created, errored, and the total number of records processed
* @throws an error if the stream emits an error
* @remarks
Expand All @@ -248,6 +253,7 @@ export class AssetCriticalityDataClient {
recordsStream,
flushBytes,
retries,
streamIndexStart = 0,
}: BulkUpsertFromStreamOptions): Promise<BulkUpsertAssetCriticalityRecordsResponse> => {
const errors: BulkUpsertAssetCriticalityRecordsResponse['errors'] = [];
const stats: BulkUpsertAssetCriticalityRecordsResponse['stats'] = {
Expand All @@ -256,10 +262,13 @@ export class AssetCriticalityDataClient {
total: 0,
};

let streamIndex = 0;
let streamIndex = streamIndexStart;
const recordGenerator = async function* () {
const processedEntities = new Set<string>();

for await (const untypedRecord of recordsStream) {
const record = untypedRecord as unknown as AssetCriticalityUpsert | Error;

stats.total++;
if (record instanceof Error) {
stats.failed++;
Expand All @@ -268,10 +277,20 @@ export class AssetCriticalityDataClient {
index: streamIndex,
});
} else {
yield {
record,
index: streamIndex,
};
const entityKey = `${record.idField}-${record.idValue}`;
if (processedEntities.has(entityKey)) {
errors.push({
message: 'Duplicated entity',
index: streamIndex,
});
stats.failed++;
} else {
processedEntities.add(entityKey);
yield {
record,
index: streamIndex,
};
}
}
streamIndex++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export const assetCriticalityPublicCSVUploadRoute = (
recordsStream,
retries: errorRetries,
flushBytes: maxBulkRequestBodySizeBytes,
streamIndexStart: 1, // It is the first line number
});
const end = new Date();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,36 +102,36 @@ export default ({ getService }: FtrProviderContext) => {

expect(body.errors).toEqual([
{
index: 0,
index: 1,
message:
'Invalid criticality level "invalid_criticality", expected one of extreme_impact, high_impact, medium_impact, low_impact',
},
{
index: 1,
index: 2,
message: 'Invalid entity type "invalid_entity", expected host or user',
},
{
index: 2,
index: 3,
message: 'Missing identifier',
},
{
index: 3,
index: 4,
message: 'Missing criticality level',
},
{
index: 4,
index: 5,
message: 'Missing entity type',
},
{
index: 5,
index: 6,
message: 'Expected 3 columns, got 2',
},
{
index: 6,
index: 7,
message: 'Expected 3 columns, got 4',
},
{
index: 7,
index: 8,
message: `Identifier is too long, expected less than 1000 characters, got 1001`,
},
]);
Expand All @@ -154,7 +154,7 @@ export default ({ getService }: FtrProviderContext) => {

expect(body.errors).toEqual([
{
index: 1,
index: 2,
message:
'Invalid criticality level "invalid_criticality", expected one of extreme_impact, high_impact, medium_impact, low_impact',
},
Expand Down

0 comments on commit 4414b8d

Please sign in to comment.