diff --git a/.eslintrc b/.eslintrc new file mode 100644 index 0000000..d29dc97 --- /dev/null +++ b/.eslintrc @@ -0,0 +1,21 @@ +{ + "env": { + "es6": true, + "mocha": true, + "node": true + }, + "extends": ["@naturacosmeticos/eslint-config-natura"], + "parserOptions": { + "ecmaVersion": 2017, + "ecmaFeatures": { + "experimentalObjectRestSpread": true + } + }, + "settings": { + "import/resolver": { + "node": { + "paths": ["src", "test"] + } + } + } +} \ No newline at end of file diff --git a/src/common/errors/messages.js b/src/common/errors/messages.js index 86db103..0d7de9b 100644 --- a/src/common/errors/messages.js +++ b/src/common/errors/messages.js @@ -1,10 +1,10 @@ module.exports = { messageBus: { close: 'Message Bus error on close connection', - unavailable: 'Message Bus Unavailable', compress: 'Failed to compress message - Sending uncompressed message', decompress: 'Failed to decompress message - Aborting message retrieval', delete: 'Failed to delete message - It will be reprocessed', + unavailable: 'Message Bus Unavailable', }, messageHandler: { error: 'Message handler failed - Aborting message consumption', diff --git a/src/pub-sub/amqp/message-bus.js b/src/pub-sub/amqp/message-bus.js index 212ee64..0b5b0a1 100644 --- a/src/pub-sub/amqp/message-bus.js +++ b/src/pub-sub/amqp/message-bus.js @@ -26,6 +26,7 @@ class MessageBus { * @param {Object} message - The raw message content * @returns {Promise} */ + // eslint-disable-next-line max-lines-per-function, max-statements async publish(bus, message) { const connection = new AmqpConnection(this.serverUrl); const logger = Logger.current().createChildLogger('message-bus:send'); diff --git a/src/pub-sub/aws/sns/message-bus.js b/src/pub-sub/aws/sns/message-bus.js index b84983d..c850dcc 100644 --- a/src/pub-sub/aws/sns/message-bus.js +++ b/src/pub-sub/aws/sns/message-bus.js @@ -25,6 +25,7 @@ class MessageBus { * @param {string} friendlyName - The topic friendly name * @param {string} message - The message you want to publish */ + // eslint-disable-next-line max-lines-per-function, max-statements async publish(friendlyName, message) { const sns = ClientFactory.create('sns'); const logger = Logger.current().createChildLogger('message-bus:send'); @@ -33,7 +34,8 @@ class MessageBus { let compressedMessage; try { - compressedMessage = await CompressEngine.compressMessage(wrappedCorrelationIdMessage, this.compressEngine); + compressedMessage = await CompressEngine + .compressMessage(wrappedCorrelationIdMessage, this.compressEngine); } catch (error) { logger.error(`${errorMessages.messageBus.compress}, ${error}`); compressedMessage = wrappedCorrelationIdMessage; diff --git a/src/queue/amqp/message-bus.js b/src/queue/amqp/message-bus.js index 04bec07..8efb5b4 100644 --- a/src/queue/amqp/message-bus.js +++ b/src/queue/amqp/message-bus.js @@ -1,8 +1,8 @@ +const Logger = require('@naturacosmeticos/clio-nodejs-logger'); const AmqpConnection = require('../../common/amqp/connection'); const LoggerContext = require('../../common/logger/context'); const CorrelationEngine = require('../../util/correlation-engine'); const CompressEngine = require('../../util/compress-engine'); -const Logger = require('@naturacosmeticos/clio-nodejs-logger'); const errorMessages = require('../../common/errors/messages'); const MessageBusError = require('../../common/errors/message-bus-error'); @@ -80,16 +80,19 @@ class AmqpMessageBus { /** * @private */ + // eslint-disable-next-line max-lines-per-function handler(queueName, channel, fn) { const logger = Logger.current().createChildLogger('message-bus:receive'); + // eslint-disable-next-line max-lines-per-function return message => LoggerContext.run(() => new Promise(async (resolve) => { const compressedMessage = JSON.parse(message.content.toString('utf-8')); try { const decompressedMessage = await CompressEngine.decompressMessage(compressedMessage); const wrappedCorrelationIdMessage = CorrelationEngine.wrapMessage(decompressedMessage); - const { body, correlationId } = CorrelationEngine.unwrapMessage(wrappedCorrelationIdMessage); + const { body, correlationId } = CorrelationEngine + .unwrapMessage(wrappedCorrelationIdMessage); LoggerContext .logItemProcessing(() => fn(body, correlationId), queueName, body) diff --git a/src/queue/aws/lambda/handler.js b/src/queue/aws/lambda/handler.js index e1b5e97..9841c0b 100644 --- a/src/queue/aws/lambda/handler.js +++ b/src/queue/aws/lambda/handler.js @@ -1,5 +1,5 @@ -const ClientFactory = require('../../../common/aws/client-factory'); const Logger = require('@naturacosmeticos/clio-nodejs-logger'); +const ClientFactory = require('../../../common/aws/client-factory'); const LoggerContext = require('../../../common/logger/context'); const CorrelationEngine = require('../../../util/correlation-engine'); const CompressEngine = require('../../../util/compress-engine'); @@ -33,7 +33,9 @@ class LambdaHandler { }); } - async handleRecord({ body, eventSourceARN: arn, receiptHandle, messageId }) { + async handleRecord({ + body, eventSourceARN: arn, receiptHandle, messageId, + }) { const logger = Logger.current().createChildLogger('lambdaHandler:handleRecord'); const queueInfo = this.arnToQueueInfo[arn]; const awsMessage = JSON.parse(body); diff --git a/src/queue/aws/message-bus.js b/src/queue/aws/message-bus.js index 7625c7b..95b34ca 100644 --- a/src/queue/aws/message-bus.js +++ b/src/queue/aws/message-bus.js @@ -70,7 +70,6 @@ class MessageBus { /** @private */ handler(queueName, fn) { return (message, done) => LoggerContext.run(async () => { - const logger = Logger.current().createChildLogger('message-bus:handler'); try { @@ -96,7 +95,7 @@ class MessageBus { .catch(done); } catch (error) { logger.error(`${errorMessages.messageBus.decompress} - ${error}`); - throw new MessageBusError(`${errorMessages.messageBus.decompress}: ${error}`) + throw new MessageBusError(`${errorMessages.messageBus.decompress}: ${error}`); } }); } diff --git a/src/util/compress-engine/engines/gzip.js b/src/util/compress-engine/engines/gzip.js index 0d76638..8ff0d7c 100644 --- a/src/util/compress-engine/engines/gzip.js +++ b/src/util/compress-engine/engines/gzip.js @@ -1,19 +1,22 @@ const { promisify } = require('util'); const zlib = require('zlib'); + const gzip = promisify(zlib.gzip); const gunzip = promisify(zlib.gunzip); class GzipEngine { async compress(input) { - const buff = await gzip(input,{level:zlib.constants.Z_MAX_LEVEL}); + const buff = await gzip(input, { level: zlib.constants.Z_MAX_LEVEL }); + return buff.toString('base64'); } async decompress(input) { const debased = Buffer.from(input, 'base64'); const unziped = await gunzip(debased); + return unziped.toString(); } } -module.exports = GzipEngine; \ No newline at end of file +module.exports = GzipEngine; diff --git a/src/util/compress-engine/index.js b/src/util/compress-engine/index.js index 78e944a..28fc8ef 100644 --- a/src/util/compress-engine/index.js +++ b/src/util/compress-engine/index.js @@ -5,6 +5,7 @@ async function extractInputMessage(inputMessage) { try { const input = await CompressEngine.decompress(inputMessage['x-iris-data'], inputMessage['x-iris-engine']); const resultData = JSON.parse(input); + return resultData; } catch (err) { throw err; diff --git a/src/util/correlation-engine/index.js b/src/util/correlation-engine/index.js index 889e6d9..22e7c01 100644 --- a/src/util/correlation-engine/index.js +++ b/src/util/correlation-engine/index.js @@ -1,18 +1,20 @@ const AsyncHooksStorage = require('@naturacosmeticos/async-hooks-storage'); class CorrelationEngine { - static wrapMessage(unwrappedMessageJSON) { - const correlationId = AsyncHooksStorage.getEntry('correlation-id'); - const wrappedMessage = correlationId ? { correlationId, message: unwrappedMessageJSON } : unwrappedMessageJSON; - return wrappedMessage; - } + static wrapMessage(unwrappedMessageJSON) { + const correlationId = AsyncHooksStorage.getEntry('correlation-id'); + const wrappedMessage = correlationId ? { correlationId, message: unwrappedMessageJSON } : unwrappedMessageJSON; - static unwrapMessage(wrappedMessageJSON) { - const isCorrelationIdWrappedMessage = (wrappedMessageJSON && wrappedMessageJSON.correlationId && wrappedMessageJSON.message && Object.keys(wrappedMessageJSON).length===2); - const body = isCorrelationIdWrappedMessage ? wrappedMessageJSON.message : wrappedMessageJSON; - const correlationId = isCorrelationIdWrappedMessage ? wrappedMessageJSON.correlationId : undefined; - return { correlationId, body }; - } + return wrappedMessage; + } + + static unwrapMessage(wrappedMessageJSON) { + const isCorrelationIdWrappedMessage = (wrappedMessageJSON && wrappedMessageJSON.correlationId && wrappedMessageJSON.message && Object.keys(wrappedMessageJSON).length === 2); + const body = isCorrelationIdWrappedMessage ? wrappedMessageJSON.message : wrappedMessageJSON; + const correlationId = isCorrelationIdWrappedMessage ? wrappedMessageJSON.correlationId : undefined; + + return { correlationId, body }; + } } -module.exports = CorrelationEngine; \ No newline at end of file +module.exports = CorrelationEngine; diff --git a/test/integration/queue/amqp/message-bus.js b/test/integration/queue/amqp/message-bus.js index ddb20c0..759c5c5 100644 --- a/test/integration/queue/amqp/message-bus.js +++ b/test/integration/queue/amqp/message-bus.js @@ -32,6 +32,7 @@ describe('QueueAmqpMessageBus', () => { await messageBus.receive({ [bus]: async (receivedMessage) => { // eslint-disable-line require-await const decompressedMessage = await compressEngine.decompressMessage(receivedMessage); + assert.deepEqual(decompressedMessage, message); done(); }, diff --git a/test/integration/queue/aws/message-bus.js b/test/integration/queue/aws/message-bus.js index f2fb0f6..e6bcb9d 100644 --- a/test/integration/queue/aws/message-bus.js +++ b/test/integration/queue/aws/message-bus.js @@ -22,6 +22,7 @@ describe('QueueAwsSqsMessageBus', () => { await messageBus.receive({ [sqsQueue.name]: async (receivedMessage) => { // eslint-disable-line require-await const decompressedMessage = await CompressEngine.decompressMessage(receivedMessage); + assert.deepEqual(decompressedMessage, message); }, });