Skip to content

Commit

Permalink
Lint fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Murilo Portescheller committed Aug 12, 2019
1 parent 09b5b19 commit 53d064b
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 22 deletions.
21 changes: 21 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"env": {
"es6": true,
"mocha": true,
"node": true
},
"extends": ["@naturacosmeticos/eslint-config-natura"],
"parserOptions": {
"ecmaVersion": 2017,
"ecmaFeatures": {
"experimentalObjectRestSpread": true
}
},
"settings": {
"import/resolver": {
"node": {
"paths": ["src", "test"]
}
}
}
}
2 changes: 1 addition & 1 deletion src/common/errors/messages.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module.exports = {
messageBus: {
close: 'Message Bus error on close connection',
unavailable: 'Message Bus Unavailable',
compress: 'Failed to compress message - Sending uncompressed message',
decompress: 'Failed to decompress message - Aborting message retrieval',
delete: 'Failed to delete message - It will be reprocessed',
unavailable: 'Message Bus Unavailable',
},
messageHandler: {
error: 'Message handler failed - Aborting message consumption',
Expand Down
1 change: 1 addition & 0 deletions src/pub-sub/amqp/message-bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class MessageBus {
* @param {Object} message - The raw message content
* @returns {Promise<void>}
*/
// eslint-disable-next-line max-lines-per-function, max-statements
async publish(bus, message) {
const connection = new AmqpConnection(this.serverUrl);
const logger = Logger.current().createChildLogger('message-bus:send');
Expand Down
4 changes: 3 additions & 1 deletion src/pub-sub/aws/sns/message-bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class MessageBus {
* @param {string} friendlyName - The topic friendly name
* @param {string} message - The message you want to publish
*/
// eslint-disable-next-line max-lines-per-function, max-statements
async publish(friendlyName, message) {
const sns = ClientFactory.create('sns');
const logger = Logger.current().createChildLogger('message-bus:send');
Expand All @@ -33,7 +34,8 @@ class MessageBus {
let compressedMessage;

try {
compressedMessage = await CompressEngine.compressMessage(wrappedCorrelationIdMessage, this.compressEngine);
compressedMessage = await CompressEngine
.compressMessage(wrappedCorrelationIdMessage, this.compressEngine);
} catch (error) {
logger.error(`${errorMessages.messageBus.compress}, ${error}`);
compressedMessage = wrappedCorrelationIdMessage;
Expand Down
7 changes: 5 additions & 2 deletions src/queue/amqp/message-bus.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const Logger = require('@naturacosmeticos/clio-nodejs-logger');
const AmqpConnection = require('../../common/amqp/connection');
const LoggerContext = require('../../common/logger/context');
const CorrelationEngine = require('../../util/correlation-engine');
const CompressEngine = require('../../util/compress-engine');
const Logger = require('@naturacosmeticos/clio-nodejs-logger');
const errorMessages = require('../../common/errors/messages');
const MessageBusError = require('../../common/errors/message-bus-error');

Expand Down Expand Up @@ -80,16 +80,19 @@ class AmqpMessageBus {
/**
* @private
*/
// eslint-disable-next-line max-lines-per-function
handler(queueName, channel, fn) {
const logger = Logger.current().createChildLogger('message-bus:receive');

// eslint-disable-next-line max-lines-per-function
return message => LoggerContext.run(() => new Promise(async (resolve) => {
const compressedMessage = JSON.parse(message.content.toString('utf-8'));

try {
const decompressedMessage = await CompressEngine.decompressMessage(compressedMessage);
const wrappedCorrelationIdMessage = CorrelationEngine.wrapMessage(decompressedMessage);
const { body, correlationId } = CorrelationEngine.unwrapMessage(wrappedCorrelationIdMessage);
const { body, correlationId } = CorrelationEngine
.unwrapMessage(wrappedCorrelationIdMessage);

LoggerContext
.logItemProcessing(() => fn(body, correlationId), queueName, body)
Expand Down
6 changes: 4 additions & 2 deletions src/queue/aws/lambda/handler.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const ClientFactory = require('../../../common/aws/client-factory');
const Logger = require('@naturacosmeticos/clio-nodejs-logger');
const ClientFactory = require('../../../common/aws/client-factory');
const LoggerContext = require('../../../common/logger/context');
const CorrelationEngine = require('../../../util/correlation-engine');
const CompressEngine = require('../../../util/compress-engine');
Expand Down Expand Up @@ -33,7 +33,9 @@ class LambdaHandler {
});
}

async handleRecord({ body, eventSourceARN: arn, receiptHandle, messageId }) {
async handleRecord({
body, eventSourceARN: arn, receiptHandle, messageId,
}) {
const logger = Logger.current().createChildLogger('lambdaHandler:handleRecord');
const queueInfo = this.arnToQueueInfo[arn];
const awsMessage = JSON.parse(body);
Expand Down
3 changes: 1 addition & 2 deletions src/queue/aws/message-bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class MessageBus {
/** @private */
handler(queueName, fn) {
return (message, done) => LoggerContext.run(async () => {

const logger = Logger.current().createChildLogger('message-bus:handler');

try {
Expand All @@ -96,7 +95,7 @@ class MessageBus {
.catch(done);
} catch (error) {
logger.error(`${errorMessages.messageBus.decompress} - ${error}`);
throw new MessageBusError(`${errorMessages.messageBus.decompress}: ${error}`)
throw new MessageBusError(`${errorMessages.messageBus.decompress}: ${error}`);
}
});
}
Expand Down
7 changes: 5 additions & 2 deletions src/util/compress-engine/engines/gzip.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
const { promisify } = require('util');
const zlib = require('zlib');

const gzip = promisify(zlib.gzip);
const gunzip = promisify(zlib.gunzip);

class GzipEngine {
async compress(input) {
const buff = await gzip(input,{level:zlib.constants.Z_MAX_LEVEL});
const buff = await gzip(input, { level: zlib.constants.Z_MAX_LEVEL });

return buff.toString('base64');
}

async decompress(input) {
const debased = Buffer.from(input, 'base64');
const unziped = await gunzip(debased);

return unziped.toString();
}
}

module.exports = GzipEngine;
module.exports = GzipEngine;
1 change: 1 addition & 0 deletions src/util/compress-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ async function extractInputMessage(inputMessage) {
try {
const input = await CompressEngine.decompress(inputMessage['x-iris-data'], inputMessage['x-iris-engine']);
const resultData = JSON.parse(input);

return resultData;
} catch (err) {
throw err;
Expand Down
26 changes: 14 additions & 12 deletions src/util/correlation-engine/index.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
const AsyncHooksStorage = require('@naturacosmeticos/async-hooks-storage');

class CorrelationEngine {
static wrapMessage(unwrappedMessageJSON) {
const correlationId = AsyncHooksStorage.getEntry('correlation-id');
const wrappedMessage = correlationId ? { correlationId, message: unwrappedMessageJSON } : unwrappedMessageJSON;
return wrappedMessage;
}
static wrapMessage(unwrappedMessageJSON) {
const correlationId = AsyncHooksStorage.getEntry('correlation-id');
const wrappedMessage = correlationId ? { correlationId, message: unwrappedMessageJSON } : unwrappedMessageJSON;

static unwrapMessage(wrappedMessageJSON) {
const isCorrelationIdWrappedMessage = (wrappedMessageJSON && wrappedMessageJSON.correlationId && wrappedMessageJSON.message && Object.keys(wrappedMessageJSON).length===2);
const body = isCorrelationIdWrappedMessage ? wrappedMessageJSON.message : wrappedMessageJSON;
const correlationId = isCorrelationIdWrappedMessage ? wrappedMessageJSON.correlationId : undefined;
return { correlationId, body };
}
return wrappedMessage;
}

static unwrapMessage(wrappedMessageJSON) {
const isCorrelationIdWrappedMessage = (wrappedMessageJSON && wrappedMessageJSON.correlationId && wrappedMessageJSON.message && Object.keys(wrappedMessageJSON).length === 2);
const body = isCorrelationIdWrappedMessage ? wrappedMessageJSON.message : wrappedMessageJSON;
const correlationId = isCorrelationIdWrappedMessage ? wrappedMessageJSON.correlationId : undefined;

return { correlationId, body };
}
}

module.exports = CorrelationEngine;
module.exports = CorrelationEngine;
1 change: 1 addition & 0 deletions test/integration/queue/amqp/message-bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ describe('QueueAmqpMessageBus', () => {
await messageBus.receive({
[bus]: async (receivedMessage) => { // eslint-disable-line require-await
const decompressedMessage = await compressEngine.decompressMessage(receivedMessage);

assert.deepEqual(decompressedMessage, message);
done();
},
Expand Down
1 change: 1 addition & 0 deletions test/integration/queue/aws/message-bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ describe('QueueAwsSqsMessageBus', () => {
await messageBus.receive({
[sqsQueue.name]: async (receivedMessage) => { // eslint-disable-line require-await
const decompressedMessage = await CompressEngine.decompressMessage(receivedMessage);

assert.deepEqual(decompressedMessage, message);
},
});
Expand Down

0 comments on commit 53d064b

Please sign in to comment.