Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cb2-14233): Update pull-test-results/modify to remove snowball SQS pattern #88

Merged
merged 9 commits into from
Nov 1, 2024
12,082 changes: 7,794 additions & 4,288 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"rimraf": "5.0.1",
"semantic-release": "21.1.1",
"serverless": "3.34.0",
"serverless-offline": "12.0.4",
"serverless-offline": "^13.6.0",
"serverless-offline-aws-eventbridge": "2.1.0",
"serverless-plugin-typescript": "2.1.5",
"sonar-scanner": "3.1.0",
Expand Down
3 changes: 2 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
// eslint-disable-next-line
export const SEMVER_REGEX = /^([0-9]|[1-9][0-9]*)\.([0-9]|[1-9][0-9]*)\.([0-9]|[1-9][0-9]*)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+[0-9A-Za-z-]+)?$/;
export const SEMVER_REGEX =
/^([0-9]|[1-9][0-9]*)\.([0-9]|[1-9][0-9]*)\.([0-9]|[1-9][0-9]*)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+[0-9A-Za-z-]+)?$/;

Check warning on line 3 in src/constants.ts

View workflow job for this annotation

GitHub Actions / Build / build

Unsafe Regular Expression

Check warning on line 3 in src/constants.ts

View workflow job for this annotation

GitHub Actions / scanner

Unsafe Regular Expression

Check warning on line 3 in src/constants.ts

View workflow job for this annotation

GitHub Actions / build-test (18.x)

Unsafe Regular Expression
68 changes: 43 additions & 25 deletions src/eventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable no-restricted-syntax */
import { unmarshall } from '@aws-sdk/util-dynamodb';
import { DynamoDBRecord, SQSEvent } from 'aws-lambda';
import {
DynamoDBRecord, SQSBatchItemFailure, SQSBatchResponse, SQSEvent,
} from 'aws-lambda';
import { AttributeValue } from '@aws-sdk/client-dynamodb';
import type { TestResultSchema } from '@dvsa/cvs-type-definitions/types/v1/test-result';
import { TypeOfTest } from '@dvsa/cvs-type-definitions/types/v1/enums/typeOfTest.enum';
Expand All @@ -14,38 +16,54 @@ import logger from './observability/logger';
import { extractAmendedBillableTestResults } from './utils/extractAmendedBillableTestResults';
import { extractBillableTestResults } from './utils/extractTestResults';

const eventHandler = async (event: SQSEvent) => {
const eventHandler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
// We want to process these in sequence to maintain order of database changes
const batchItemFailures: SQSBatchItemFailure[] = [];
for (const record of event.Records) {
const dbRecord = JSON.parse(record.body) as DynamoDBRecord;
try {
const dbRecord = JSON.parse(record.body) as DynamoDBRecord;

switch (dbRecord.eventName) {
case 'INSERT': {
const currentRecord = unmarshall(dbRecord.dynamodb.NewImage as Record<string, AttributeValue>) as TestResultSchema;
if (process.env.PROCESS_DESK_BASED_TESTS !== 'true' && currentRecord.typeOfTest === TypeOfTest.DESK_BASED as TypeOfTest) {
logger.info('Ignoring desk based test');
switch (dbRecord.eventName) {
case 'INSERT': {
const currentRecord = unmarshall(
dbRecord.dynamodb.NewImage as Record<string, AttributeValue>,
) as TestResultSchema;
if (
process.env.PROCESS_DESK_BASED_TESTS !== 'true'
&& currentRecord.typeOfTest === (TypeOfTest.DESK_BASED as TypeOfTest)
) {
logger.info('Ignoring desk based test');
break;
}
const testActivity: TestActivity[] = extractBillableTestResults(currentRecord);

const eventType = eventTypeMap.get(currentRecord.typeOfTest) ?? EventType.COMPLETION;
/* eslint-disable no-await-in-loop */
await sendEvents(testActivity, eventType);
break;
}
const testActivity: TestActivity[] = extractBillableTestResults(currentRecord);

const eventType = eventTypeMap.get(currentRecord.typeOfTest) ?? EventType.COMPLETION;
/* eslint-disable no-await-in-loop */
await sendEvents(testActivity, eventType);
break;
}
case 'MODIFY': {
const currentRecord = unmarshall(dbRecord.dynamodb.NewImage as Record<string, AttributeValue>) as TestResultSchema;
const previousRecord = unmarshall(dbRecord.dynamodb.OldImage as Record<string, AttributeValue>) as TestResultSchema;
const amendmentChanges: TestAmendment[] = extractAmendedBillableTestResults(currentRecord, previousRecord);
/* eslint-disable no-await-in-loop */
await sendEvents(amendmentChanges, EventType.AMENDMENT);
break;
case 'MODIFY': {
const currentRecord = unmarshall(
dbRecord.dynamodb.NewImage as Record<string, AttributeValue>,
) as TestResultSchema;
const previousRecord = unmarshall(
dbRecord.dynamodb.OldImage as Record<string, AttributeValue>,
) as TestResultSchema;
const amendmentChanges: TestAmendment[] = extractAmendedBillableTestResults(currentRecord, previousRecord);
/* eslint-disable no-await-in-loop */
await sendEvents(amendmentChanges, EventType.AMENDMENT);
break;
}
default:
logger.error(`Unhandled event {event: ${dbRecord.eventName}}`);
break;
}
default:
logger.error(`Unhandled event {event: ${dbRecord.eventName}}`);
break;
} catch (error) {
console.error(error);
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return { batchItemFailures };
};

const eventTypeMap = new Map<TypeOfTest, EventType>([
Expand Down
18 changes: 6 additions & 12 deletions src/insert.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import 'source-map-support/register';
import { Context, Callback, SQSEvent } from 'aws-lambda';
import { SQSEvent, SQSBatchResponse } from 'aws-lambda';
import logger from './observability/logger';
import { eventHandler } from './eventHandler';

Expand All @@ -11,18 +11,12 @@ logger.debug(
`\nRunning Service:\n '${SERVICE}'\n mode: ${NODE_ENV}\n stage: '${AWS_STAGE}'\n region: '${AWS_REGION}'\n\n`,
);

const handler = async (event: SQSEvent, _context: Context, callback: Callback) => {
try {
logger.debug(`Function triggered with '${JSON.stringify(event)}'.`);
await eventHandler(event);
const handler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
logger.debug(`Function triggered with '${JSON.stringify(event)}'.`);
const batchItemFailures = await eventHandler(event);

logger.info('Data processed successfully.');
callback(null, 'Data processed successfully.');
} catch (error) {
logger.info('Data processed unsuccessfully.');
logger.error('', error);
callback(new Error('Data processed unsuccessfully.'));
}
logger.info(`Data processed successfully with ${batchItemFailures.batchItemFailures.length} failures`);
Daniel-Searle marked this conversation as resolved.
Show resolved Hide resolved
return batchItemFailures;
};

export { handler };
24 changes: 9 additions & 15 deletions src/modify.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import 'source-map-support/register';
import { Context, Callback, SQSEvent } from 'aws-lambda';
import { SQSEvent, SQSBatchResponse } from 'aws-lambda';
import logger from './observability/logger';
import { eventHandler } from './eventHandler';

Expand All @@ -11,22 +11,16 @@ logger.debug(
`\nRunning Service:\n '${SERVICE}'\n mode: ${NODE_ENV}\n stage: '${AWS_STAGE}'\n region: '${AWS_REGION}'\n\n`,
);

const handler = async (event: SQSEvent, _context: Context, callback: Callback) => {
try {
logger.debug(`Function triggered with '${JSON.stringify(event)}'.`);
if (process.env.PROCESS_MODIFY_EVENTS === 'true') {
await eventHandler(event);
} else {
logger.info('Not handling modify events.');
}
const handler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
logger.debug(`Function triggered with '${JSON.stringify(event)}'.`);

logger.info('Data processed successfully.');
callback(null, 'Data processed successfully.');
} catch (error) {
logger.info('Data processed unsuccessfully.');
logger.error('', error);
callback(new Error('Data processed unsuccessfully.'));
if (process.env.PROCESS_MODIFY_EVENTS !== 'true') {
logger.info('not handling modify events.');
return { batchItemFailures: [] } as SQSBatchResponse;
}
const batchItemFailures = await eventHandler(event);
logger.info('Data processed successfully.');
return batchItemFailures;
};

export { handler };
26 changes: 26 additions & 0 deletions tests/unit/eventHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,30 @@ describe('eventHandler', () => {
expect(sendEvents).toHaveBeenCalledTimes(eventsProcessed);
},
);
it('GIVEN an event that throws an error THEN it should be caught and added to batch failures', async () => {
const errorMessageId = 'errorMessageId';
event = {
Records: [
{
messageId: errorMessageId,
receiptHandle: 'test',
attributes: {} as SQSRecordAttributes,
messageAttributes: {} as SQSMessageAttributes,
body: 'Invalid JSON Test',
awsRegion: '',
eventSource: '',
eventSourceARN: '',
md5OfBody: '',
},
],
};

const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
const result = await eventHandler(event);

expect(result.batchItemFailures).toHaveLength(1);
expect(result.batchItemFailures[0].itemIdentifier).toBe(errorMessageId);
expect(consoleSpy).toHaveBeenCalled();
consoleSpy.mockRestore();
});
});
24 changes: 7 additions & 17 deletions tests/unit/insert.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { SQSEvent, SQSMessageAttributes, SQSRecordAttributes } from 'aws-lambda';
import {
SQSBatchResponse, SQSEvent, SQSMessageAttributes, SQSRecordAttributes,
} from 'aws-lambda';

process.env.LOG_LEVEL = 'debug';
import { mocked } from 'jest-mock';
Expand Down Expand Up @@ -37,24 +39,12 @@ describe('Application entry', () => {
});

describe('Handler', () => {
it('GIVEN an event WHEN the eventHandler resolves THEN a callback result is returned', async () => {
mocked(eventHandler).mockReturnValue(Promise.resolve());
await handler(mockEvent, null, (error: string | Error, result: string) => {
expect(error).toBeNull();
expect(result).toBe('Data processed successfully.');
});
expect(eventHandler).toHaveBeenCalled();
expect(eventHandler).toHaveBeenCalledWith(mockEvent);
});

it('GIVEN an event WHEN the eventHandler throws an error THEN a call back error is returned', async () => {
mocked(eventHandler).mockReturnValue(Promise.reject());
await handler(mockEvent, null, (error: string | Error, result: string) => {
expect(error).toEqual(new Error('Data processed unsuccessfully.'));
expect(result).toBeUndefined();
});
it('GIVEN an event WHEN the eventHandler throws an error THEN return the sqsBatchResponse with failed records', async () => {
mocked(eventHandler).mockReturnValue(Promise.resolve({ batchItemFailures: [{ itemIdentifier: 'test' }] } as SQSBatchResponse));
const sqsBatchResponse = await handler(mockEvent);
expect(eventHandler).toHaveBeenCalled();
expect(eventHandler).toHaveBeenCalledWith(mockEvent);
expect(sqsBatchResponse.batchItemFailures.length).toBeGreaterThan(0);
});
});
});
32 changes: 10 additions & 22 deletions tests/unit/modify.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/ban-ts-comment, @typescript-eslint/no-unsafe-call */
import { SQSEvent, SQSMessageAttributes, SQSRecordAttributes } from 'aws-lambda';
import {
SQSBatchResponse, SQSEvent, SQSMessageAttributes, SQSRecordAttributes,
} from 'aws-lambda';

process.env.LOG_LEVEL = 'debug';
import { mocked } from 'jest-mock';
Expand Down Expand Up @@ -40,36 +42,22 @@ describe('Application entry', () => {
});

describe('Handler', () => {
it('GIVEN an event WHEN the eventHandler resolves THEN a callback result is returned', async () => {
it('GIVEN an event WHEN the eventHandler throws an error THEN return the sqsBatchResponse with failed records', async () => {
process.env.PROCESS_MODIFY_EVENTS = 'true';
mocked(eventHandler).mockReturnValue(Promise.resolve());
await handler(mockEvent, null, (error: string | Error, result: string) => {
expect(error).toBeNull();
expect(result).toBe('Data processed successfully.');
});
expect(eventHandler).toHaveBeenCalled();
expect(eventHandler).toHaveBeenCalledWith(mockEvent);
});
mocked(eventHandler).mockReturnValue(Promise.resolve({ batchItemFailures: [{ itemIdentifier: 'test' }] } as SQSBatchResponse));
const sqsBatchResponse = await handler(mockEvent);

it('GIVEN an event WHEN the eventHandler throws an THEN a callback error is returned', async () => {
process.env.PROCESS_MODIFY_EVENTS = 'true';
mocked(eventHandler).mockReturnValue(Promise.reject());
await handler(mockEvent, null, (error: string | Error, result: string) => {
expect(error).toEqual(new Error('Data processed unsuccessfully.'));
expect(result).toBeUndefined();
});
expect(eventHandler).toHaveBeenCalled();
expect(eventHandler).toHaveBeenCalledWith(mockEvent);
expect(sqsBatchResponse.batchItemFailures.length).toBeGreaterThan(0);
});

it("GIVEN an event WHEN the environment variable PROCESS_MODIFY_EVENTS is not set to 'true' THEN a callback result is returned AND the eventHandler is not called", async () => {
process.env.PROCESS_MODIFY_EVENTS = 'false';
mocked(eventHandler).mockReturnValue(Promise.resolve());
await handler(mockEvent, null, (error: string | Error, result: string) => {
expect(error).toBeNull();
expect(result).toBe('Data processed successfully.');
});
mocked(eventHandler).mockReturnValue(Promise.resolve({ batchItemFailures: [] } as SQSBatchResponse));
const sqsBatchResponse = await handler(mockEvent);
expect(eventHandler).not.toHaveBeenCalled();
expect(sqsBatchResponse.batchItemFailures).toHaveLength(0);
});
});
});
Loading