Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SecuritySolution] Improve asset criticality bulk error when entities are duplicated #199651

Merged
merged 5 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
*/

import { loggingSystemMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { Readable } from 'stream';
import { AssetCriticalityDataClient } from './asset_criticality_data_client';
import { createOrUpdateIndex } from '../utils/create_or_update_index';
import { auditLoggerMock } from '@kbn/security-plugin/server/audit/mocks';
import type { AssetCriticalityUpsert } from '../../../../common/entity_analytics/asset_criticality/types';
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';

type MockInternalEsClient = ReturnType<
typeof elasticsearchServiceMock.createScopedClusterClient
Expand Down Expand Up @@ -264,4 +266,84 @@ describe('AssetCriticalityDataClient', () => {
);
});
});

describe('#bulkUpsertFromStream()', () => {
let esClientMock: MockInternalEsClient;
let loggerMock: ReturnType<typeof loggingSystemMock.createLogger>;
let subject: AssetCriticalityDataClient;

beforeEach(() => {
esClientMock = elasticsearchServiceMock.createScopedClusterClient().asInternalUser;

esClientMock.helpers.bulk = mockEsBulk();
loggerMock = loggingSystemMock.createLogger();
subject = new AssetCriticalityDataClient({
esClient: esClientMock,
logger: loggerMock,
namespace: 'default',
auditLogger: mockAuditLogger,
});
});

it('returns valid stats', async () => {
const recordsStream = [
{ idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
];

const result = await subject.bulkUpsertFromStream({
recordsStream: Readable.from(recordsStream),
retries: 3,
flushBytes: 1_000,
});

expect(result).toEqual({
errors: [],
stats: {
failed: 0,
successful: 1,
total: 1,
},
});
});

it('returns error for duplicated entities', async () => {
const recordsStream = [
{ idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
{ idField: 'host.name', idValue: 'host1', criticalityLevel: 'high_impact' },
];

const result = await subject.bulkUpsertFromStream({
recordsStream: Readable.from(recordsStream),
retries: 3,
flushBytes: 1_000,
streamIndexStart: 9,
});

expect(result).toEqual({
errors: [
{
index: 10,
message: 'Duplicated entity',
},
],
stats: {
failed: 1,
successful: 1,
total: 2,
},
});
});
});
});

const mockEsBulk = () =>
jest.fn().mockImplementation(async ({ datasource }) => {
let count = 0;
for await (const _ of datasource) {
count++;
}
return {
failed: 0,
successful: count,
};
}) as unknown as ElasticsearchClientMock['helpers']['bulk'];
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type AssetCriticalityIdParts = Pick<AssetCriticalityUpsert, 'idField' | 'idValue

type BulkUpsertFromStreamOptions = {
recordsStream: NodeJS.ReadableStream;
/**
* The index number for the first stream element. By default the errors are zero-indexed.
*/
streamIndexStart?: number;
} & Pick<Parameters<ElasticsearchClient['helpers']['bulk']>[0], 'flushBytes' | 'retries'>;

type StoredAssetCriticalityRecord = {
Expand Down Expand Up @@ -236,6 +240,7 @@ export class AssetCriticalityDataClient {
* @param recordsStream a stream of records to upsert, records may also be an error e.g if there was an error parsing
* @param flushBytes how big elasticsearch bulk requests should be before they are sent
* @param retries the number of times to retry a failed bulk request
* @param streamIndexStart By default the errors are zero-indexed. You can change it by setting this param to a value like `1`. It could be useful for file upload.
* @returns an object containing the number of records updated, created, errored, and the total number of records processed
* @throws an error if the stream emits an error
* @remarks
Expand All @@ -248,6 +253,7 @@ export class AssetCriticalityDataClient {
recordsStream,
flushBytes,
retries,
streamIndexStart = 0,
}: BulkUpsertFromStreamOptions): Promise<BulkUpsertAssetCriticalityRecordsResponse> => {
const errors: BulkUpsertAssetCriticalityRecordsResponse['errors'] = [];
const stats: BulkUpsertAssetCriticalityRecordsResponse['stats'] = {
Expand All @@ -256,10 +262,13 @@ export class AssetCriticalityDataClient {
total: 0,
};

let streamIndex = 0;
let streamIndex = streamIndexStart;
const recordGenerator = async function* () {
const processedEntities = new Set<string>();

for await (const untypedRecord of recordsStream) {
const record = untypedRecord as unknown as AssetCriticalityUpsert | Error;

stats.total++;
if (record instanceof Error) {
stats.failed++;
Expand All @@ -268,10 +277,20 @@ export class AssetCriticalityDataClient {
index: streamIndex,
});
} else {
yield {
record,
index: streamIndex,
};
const entityKey = `${record.idField}-${record.idValue}`;
if (processedEntities.has(entityKey)) {
errors.push({
message: 'Duplicated entity',
index: streamIndex,
});
stats.failed++;
} else {
processedEntities.add(entityKey);
yield {
record,
index: streamIndex,
};
}
}
streamIndex++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export const assetCriticalityPublicCSVUploadRoute = (
recordsStream,
retries: errorRetries,
flushBytes: maxBulkRequestBodySizeBytes,
streamIndexStart: 1, // It is the first line number
});
const end = new Date();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,36 +102,36 @@ export default ({ getService }: FtrProviderContext) => {

expect(body.errors).toEqual([
{
index: 0,
index: 1,
message:
'Invalid criticality level "invalid_criticality", expected one of extreme_impact, high_impact, medium_impact, low_impact',
},
{
index: 1,
index: 2,
message: 'Invalid entity type "invalid_entity", expected host or user',
},
{
index: 2,
index: 3,
message: 'Missing identifier',
},
{
index: 3,
index: 4,
message: 'Missing criticality level',
},
{
index: 4,
index: 5,
message: 'Missing entity type',
},
{
index: 5,
index: 6,
message: 'Expected 3 columns, got 2',
},
{
index: 6,
index: 7,
message: 'Expected 3 columns, got 4',
},
{
index: 7,
index: 8,
message: `Identifier is too long, expected less than 1000 characters, got 1001`,
},
]);
Expand All @@ -154,7 +154,7 @@ export default ({ getService }: FtrProviderContext) => {

expect(body.errors).toEqual([
{
index: 1,
index: 2,
message:
'Invalid criticality level "invalid_criticality", expected one of extreme_impact, high_impact, medium_impact, low_impact',
},
Expand Down