diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.test.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.test.ts index 75ec685a4756e..0907af8fa92d5 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.test.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.test.ts @@ -6,10 +6,12 @@ */ import { loggingSystemMock, elasticsearchServiceMock } from '@kbn/core/server/mocks'; +import { Readable } from 'stream'; import { AssetCriticalityDataClient } from './asset_criticality_data_client'; import { createOrUpdateIndex } from '../utils/create_or_update_index'; import { auditLoggerMock } from '@kbn/security-plugin/server/audit/mocks'; import type { AssetCriticalityUpsert } from '../../../../common/entity_analytics/asset_criticality/types'; +import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks'; type MockInternalEsClient = ReturnType< typeof elasticsearchServiceMock.createScopedClusterClient @@ -264,4 +266,84 @@ describe('AssetCriticalityDataClient', () => { ); }); }); + + describe('#bulkUpsertFromStream()', () => { + let esClientMock: MockInternalEsClient; + let loggerMock: ReturnType; + let subject: AssetCriticalityDataClient; + + beforeEach(() => { + esClientMock = elasticsearchServiceMock.createScopedClusterClient().asInternalUser; + + esClientMock.helpers.bulk = mockEsBulk(); + loggerMock = loggingSystemMock.createLogger(); + subject = new AssetCriticalityDataClient({ + esClient: esClientMock, + logger: loggerMock, + namespace: 'default', + auditLogger: mockAuditLogger, + }); + }); + + it('returns valid stats', async () => { + const recordsStream = [ + { idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' }, + ]; + + const result = await subject.bulkUpsertFromStream({ + recordsStream: Readable.from(recordsStream), + retries: 3, + flushBytes: 1_000, + }); + + expect(result).toEqual({ + errors: [], + stats: { + failed: 0, + successful: 1, + total: 1, + }, + }); + }); + + it('returns error for duplicated entities', async () => { + const recordsStream = [ + { idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' }, + { idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' }, + ]; + + const result = await subject.bulkUpsertFromStream({ + recordsStream: Readable.from(recordsStream), + retries: 3, + flushBytes: 1_000, + streamIndexStart: 9, + }); + + expect(result).toEqual({ + errors: [ + { + index: 10, + message: 'Duplicated entity', + }, + ], + stats: { + failed: 1, + successful: 1, + total: 2, + }, + }); + }); + }); }); + +const mockEsBulk = () => + jest.fn().mockImplementation(async ({ datasource }) => { + let count = 0; + for await (const _ of datasource) { + count++; + } + return { + failed: 0, + successful: count, + }; + }) as unknown as ElasticsearchClientMock['helpers']['bulk']; diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.ts index b957030f2c8e5..b4d7e2f492eb8 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.ts @@ -35,6 +35,10 @@ type AssetCriticalityIdParts = Pick[0], 'flushBytes' | 'retries'>; type StoredAssetCriticalityRecord = { @@ -236,6 +240,7 @@ export class AssetCriticalityDataClient { * @param recordsStream a stream of records to upsert, records may also be an error e.g if there was an error parsing * @param flushBytes how big elasticsearch bulk requests should be before they are sent * @param retries the number of times to retry a failed bulk request + * @param streamIndexStart By default the errors are zero-indexed. You can change it by setting this param to a value like `1`. It could be useful for file upload. * @returns an object containing the number of records updated, created, errored, and the total number of records processed * @throws an error if the stream emits an error * @remarks @@ -248,6 +253,7 @@ export class AssetCriticalityDataClient { recordsStream, flushBytes, retries, + streamIndexStart = 0, }: BulkUpsertFromStreamOptions): Promise => { const errors: BulkUpsertAssetCriticalityRecordsResponse['errors'] = []; const stats: BulkUpsertAssetCriticalityRecordsResponse['stats'] = { @@ -256,10 +262,13 @@ export class AssetCriticalityDataClient { total: 0, }; - let streamIndex = 0; + let streamIndex = streamIndexStart; const recordGenerator = async function* () { + const processedEntities = new Set(); + for await (const untypedRecord of recordsStream) { const record = untypedRecord as unknown as AssetCriticalityUpsert | Error; + stats.total++; if (record instanceof Error) { stats.failed++; @@ -268,10 +277,20 @@ export class AssetCriticalityDataClient { index: streamIndex, }); } else { - yield { - record, - index: streamIndex, - }; + const entityKey = `${record.idField}-${record.idValue}`; + if (processedEntities.has(entityKey)) { + errors.push({ + message: 'Duplicated entity', + index: streamIndex, + }); + stats.failed++; + } else { + processedEntities.add(entityKey); + yield { + record, + index: streamIndex, + }; + } } streamIndex++; } diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/routes/upload_csv.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/routes/upload_csv.ts index dccf24d161054..22dba4bf321c7 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/routes/upload_csv.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/routes/upload_csv.ts @@ -102,6 +102,7 @@ export const assetCriticalityPublicCSVUploadRoute = ( recordsStream, retries: errorRetries, flushBytes: maxBulkRequestBodySizeBytes, + streamIndexStart: 1, // It is the first line number }); const end = new Date(); diff --git a/x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/asset_criticality_csv_upload.ts b/x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/asset_criticality_csv_upload.ts index 496cde9a79e13..737458f75a516 100644 --- a/x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/asset_criticality_csv_upload.ts +++ b/x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/asset_criticality_csv_upload.ts @@ -102,36 +102,36 @@ export default ({ getService }: FtrProviderContext) => { expect(body.errors).toEqual([ { - index: 0, + index: 1, message: 'Invalid criticality level "invalid_criticality", expected one of extreme_impact, high_impact, medium_impact, low_impact', }, { - index: 1, + index: 2, message: 'Invalid entity type "invalid_entity", expected host or user', }, { - index: 2, + index: 3, message: 'Missing identifier', }, { - index: 3, + index: 4, message: 'Missing criticality level', }, { - index: 4, + index: 5, message: 'Missing entity type', }, { - index: 5, + index: 6, message: 'Expected 3 columns, got 2', }, { - index: 6, + index: 7, message: 'Expected 3 columns, got 4', }, { - index: 7, + index: 8, message: `Identifier is too long, expected less than 1000 characters, got 1001`, }, ]); @@ -154,7 +154,7 @@ export default ({ getService }: FtrProviderContext) => { expect(body.errors).toEqual([ { - index: 1, + index: 2, message: 'Invalid criticality level "invalid_criticality", expected one of extreme_impact, high_impact, medium_impact, low_impact', },