Skip to content

Commit

Permalink
feat(Statement): Add alternative publish to queue logic without using…
Browse files Browse the repository at this point in the history
… Redis (LLC-2162) (#890)
  • Loading branch information
PrinceWaune authored Mar 20, 2023
1 parent 22ea51f commit 393bad4
Show file tree
Hide file tree
Showing 17 changed files with 583 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ EXPRESS_PORT=8081
###############################
# Uncomment next line if you want to enable statement handling priority
#ENABLE_QUEUE_PRIORITY=true
# Event provider(redis|sqs). Redis by default
#EVENTS_REPO=redis
# Queue namespace
#QUEUE_NAMESPACE=DEV

##########
# Misc #
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
},
"dependencies": {
"@aws-sdk/client-s3": "^3.100.0",
"@aws-sdk/client-sqs": "^3.282.0",
"@aws-sdk/lib-storage": "^3.100.0",
"@azure/storage-blob": "^10.3.0",
"@google-cloud/storage": "^5.8.1",
Expand Down Expand Up @@ -208,6 +209,8 @@
"functional/prefer-type-literal": "off",
"functional/no-throw-statement": "off",
"functional/no-try-statement": "off",
"functional/no-let": "off",
"functional/no-loop-statement": "off",
"functional/prefer-readonly-type": [
"error",
{
Expand Down
6 changes: 6 additions & 0 deletions src/apps/AppConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Redis } from 'ioredis';
import Tracker from 'jscommons/dist/tracker/Tracker';
import { Db } from 'mongodb';
import { LoggerInstance } from 'winston';
import { SQSClient } from '@aws-sdk/client-sqs';

export default interface AppConfig {
readonly repo: {
Expand Down Expand Up @@ -46,6 +47,11 @@ export default interface AppConfig {
readonly client: () => Promise<Redis>;
readonly isQueuePriorityEnabled: boolean;
};
readonly sqs: {
readonly prefix: string;
readonly client: () => Promise<SQSClient>;
readonly isQueuePriorityEnabled: boolean;
};
};
readonly service: {
readonly statements: {
Expand Down
1 change: 1 addition & 0 deletions src/apps/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export default (appConfig: AppConfig): Router => {
local: appConfig.repo.local,
mongo: appConfig.repo.mongo,
redis: appConfig.repo.redis,
sqs: appConfig.repo.sqs,
s3: appConfig.repo.s3,
storageSubFolder: appConfig.repo.storageSubFolders.statements,
},
Expand Down
6 changes: 6 additions & 0 deletions src/apps/statements/AppConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Redis } from 'ioredis';
import Tracker from 'jscommons/dist/tracker/Tracker';
import { Db } from 'mongodb';
import { LoggerInstance } from 'winston';
import { SQSClient } from '@aws-sdk/client-sqs';

export default interface AppConfig {
readonly logger: LoggerInstance;
Expand Down Expand Up @@ -62,5 +63,10 @@ export default interface AppConfig {
readonly client: () => Promise<Redis>;
readonly isQueuePriorityEnabled: boolean;
};
readonly sqs: {
readonly prefix: string;
readonly client: () => Promise<SQSClient>;
readonly isQueuePriorityEnabled: boolean;
};
};
}
5 changes: 5 additions & 0 deletions src/apps/statements/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ export default (appConfig: AppConfig): Result => {
prefix: appConfig.repo.redis.prefix,
isQueuePriorityEnabled: appConfig.repo.redis.isQueuePriorityEnabled,
},
sqs: {
client: appConfig.repo.sqs.client,
prefix: appConfig.repo.sqs.prefix,
isQueuePriorityEnabled: appConfig.repo.sqs.isQueuePriorityEnabled,
},
},
models: {
facade: appConfig.repo.factory.modelsRepoName,
Expand Down
2 changes: 2 additions & 0 deletions src/apps/statements/repo/eventsRepo/FactoryConfig.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import RedisFactoryConfig from './utils/redisEvents/FactoryConfig';
import SQSFactoryConfig from './utils/sqsEvents/FactoryConfig';

export default interface FactoryConfig {
readonly facade?: string;
readonly redis?: RedisFactoryConfig;
readonly sqs?: SQSFactoryConfig;
}
70 changes: 70 additions & 0 deletions src/apps/statements/repo/eventsRepo/emitNewStatements/sqs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import {
GetQueueUrlCommand,
SQSClient,
SendMessageBatchCommand,
SendMessageBatchRequestEntry,
} from '@aws-sdk/client-sqs';
import { v4 } from 'uuid';
import { getPrefixWithProcessingPriority } from '../utils/getPrefixWithProcessingPriority';
import { StatementProcessingPriority } from '../../../enums/statementProcessingPriority.enum';
import FacadeConfig from '../utils/sqsEvents/FacadeConfig';
import { STATEMENT_QUEUE } from '../utils/constants';
import Signature from './Signature';

const MAX_BATCH_SIZE = 10;

let queueUrl: string | undefined;

const publishMessages = async (sqsClient: SQSClient, statementProperties: string[]) => {
const statementPropertiesBatchRequest = statementProperties.map(
(statementProperty): SendMessageBatchRequestEntry => ({
Id: v4(),
MessageBody: statementProperty,
}),
);

for (let index = 0; index < statementPropertiesBatchRequest.length; index += MAX_BATCH_SIZE) {
await sqsClient.send(
new SendMessageBatchCommand({
QueueUrl: queueUrl,
Entries: statementPropertiesBatchRequest.slice(index, index + MAX_BATCH_SIZE),
}),
);
}
};

const getQueueUrl = async (
sqsClient: SQSClient,
prefix: string,
priority: StatementProcessingPriority,
isQueuePriorityEnabled: boolean,
) => {
if (queueUrl) {
return queueUrl;
}

const prefixWithPriority = getPrefixWithProcessingPriority(
prefix,
priority,
isQueuePriorityEnabled,
);

const getQueueUrlCommand = new GetQueueUrlCommand({
QueueName: `${prefixWithPriority}_${STATEMENT_QUEUE}`,
});

const commandResult = await sqsClient.send(getQueueUrlCommand);

queueUrl = commandResult.QueueUrl;

return queueUrl;
};

export default (config: FacadeConfig): Signature => {
return async ({ statementProperties, priority }) => {
const sqsClient = await config.client();

await getQueueUrl(sqsClient, config.prefix, priority, config.isQueuePriorityEnabled);
await publishMessages(sqsClient, statementProperties);
};
};
3 changes: 3 additions & 0 deletions src/apps/statements/repo/eventsRepo/factory.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import Facade from './Facade';
import FactoryConfig from './FactoryConfig';
import redisFactory from './utils/redisEvents/factory';
import sqsFactory from './utils/sqsEvents/factory';

export default (config: FactoryConfig): Facade => {
switch (config.facade) {
default:
case 'redis':
return redisFactory(config.redis);
case 'sqs':
return sqsFactory(config.sqs);
}
};
1 change: 1 addition & 0 deletions src/apps/statements/repo/eventsRepo/utils/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export const EVENT_NAME = 'statement.new';
export const CHANNEL_NAME = 'statement.notify';
export const STATEMENT_QUEUE = 'STATEMENT_QUEUE';
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { SQSClient } from '@aws-sdk/client-sqs';

export default interface FacadeConfig {
readonly client: () => Promise<SQSClient>;
readonly prefix: string;
readonly isQueuePriorityEnabled: boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { SQSClient } from '@aws-sdk/client-sqs';

export default interface FactoryConfig {
readonly client?: () => Promise<SQSClient>;
readonly prefix?: string;
readonly isQueuePriorityEnabled?: boolean;
}
26 changes: 26 additions & 0 deletions src/apps/statements/repo/eventsRepo/utils/sqsEvents/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { defaultTo } from 'lodash';
import emitNewStatements from '../../emitNewStatements/sqs';
import Facade from '../../Facade';
import connectToSQS from "../../../../../../utils/connectToSQS";
import FacadeConfig from './FacadeConfig';
import FactoryConfig from './FactoryConfig';

export default (factoryConfig: FactoryConfig = {}): Facade => {
const facadeConfig: FacadeConfig = {
client: defaultTo(factoryConfig.client, connectToSQS()),
prefix: defaultTo(factoryConfig.prefix, 'xapistatements'),
isQueuePriorityEnabled: defaultTo(factoryConfig.isQueuePriorityEnabled, false),
};
return {
emitNewStatements: emitNewStatements(facadeConfig),
clearRepo: async () => {
// Do nothing.
},
migrate: async () => {
// Do nothing.
},
rollback: async () => {
// Do nothing.
},
};
};
8 changes: 8 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ export default {
prefix: getStringOption(process.env.REDIS_PREFIX, 'LEARNINGLOCKER'),
url: getStringOption(process.env.REDIS_URL, 'redis://127.0.0.1:6379/0'),
},
aws: {
region: globalAwsRegion,
accessKeyId: globalAwsIamAccessKeyId,
secretAccessKey: globalAwsIamAccessKeySecret,
},
sqs: {
prefix: getStringOption(process.env.QUEUE_NAMESPACE, 'DEV'),
},
repoFactory: {
authRepoName: getStringOption(process.env.AUTH_REPO, 'mongo'),
eventsRepoName: getStringOption(process.env.EVENTS_REPO, 'redis'),
Expand Down
6 changes: 6 additions & 0 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import config from './config';
import logger from './logger';
import connectToMongoDb from './utils/connectToMongoDb';
import connectToRedis from './utils/connectToRedis';
import connectToSQS from './utils/connectToSQS';

const expressApp = express();

Expand All @@ -31,6 +32,11 @@ expressApp.use(
prefix: config.redis.prefix,
isQueuePriorityEnabled: config.isQueuePriorityEnabled,
},
sqs: {
client: connectToSQS(),
prefix: config.sqs.prefix,
isQueuePriorityEnabled: config.isQueuePriorityEnabled,
},
repoFactory: config.repoFactory,
s3: config.s3StorageRepo,
storageSubFolders: config.storageSubFolders,
Expand Down
22 changes: 22 additions & 0 deletions src/utils/connectToSQS.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { SQSClient } from '@aws-sdk/client-sqs';
import { once } from 'lodash';
import config from '../config';
import logger from '../logger';

export default once((): (() => Promise<SQSClient>) => {
return once(async () => {
logger.info('Creating SQS connection');

return new SQSClient({
...(config.aws.region ? { region: config.aws.region } : null),
...(config.aws.accessKeyId && config.aws.secretAccessKey
? {
credentials: {
accessKeyId: config.aws.accessKeyId as string,
secretAccessKey: config.aws.secretAccessKey as string,
},
}
: null),
});
});
});
Loading

0 comments on commit 393bad4

Please sign in to comment.