Skip to content

Commit

Permalink
Merge pull request #8 from appirio-tech/dev
Browse files Browse the repository at this point in the history
Requeue, failed messages, in different exchange
  • Loading branch information
vikasrohit authored Apr 14, 2017
2 parents 02723a4 + 5a1222d commit 6be4f05
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 8 deletions.
1 change: 1 addition & 0 deletions consumer/config/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ export const EVENT = {
PROJECT_DRAFT_CREATED: 'project.draft-created',
PROJECT_UPDATED: 'project.updated',
PROJECT_DELETED: 'project.deleted',
CONNECT_TO_SF_FAILED: 'connect2sf.failed'
},
};
1 change: 0 additions & 1 deletion consumer/src/services/ConfigurationService.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class ConfigurationService {
},
},
}).promise();
console.log('node env: ' + process.env.NODE_ENV);
if (!result.Items.length) {
throw new Error('Configuration for AppXpressConfig not found');
}
Expand Down
27 changes: 24 additions & 3 deletions consumer/src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ export function initHandlers(handlers) {
* @param {String} exchangeName the exchange name
* @param {String} queue the queue name
*/
export async function consume(channel, exchangeName, queue) {
export async function consume(channel, exchangeName, queue, publishChannel) {
channel.assertExchange(exchangeName, 'topic', { durable: true });
publishChannel.assertExchange(exchangeName, 'topic', { durable: true });
channel.assertQueue(queue, { durable: true });
const bindings = _.keys(EVENT_HANDLERS);
const bindingPromises = _.map(bindings, rk =>
Expand Down Expand Up @@ -73,8 +74,22 @@ export async function consume(channel, exchangeName, queue) {
if (e.shouldAck) {
channel.ack(msg);
} else {
// acking for debugging issue on production. this would prevent log pile up
// ack the message but copy it to other queue where no consumer is listening
// we can listen to that queue on adhoc basis when we see error case like lead not created in SF
// we can use cloudamqp console to check the messages and may be manually create SF lead
// nacking here was causing flood of messages to the worker and it keep on consuming high resources
channel.ack(msg);
try {
publishChannel.publish(
exchangeName,
EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED,
new Buffer(msg.content.toString())
);
} catch(e) {
// TODO decide if we want nack the original msg here
// for now just ignoring the error in requeue
logger.logFullError(e, `Error in publising Exchange to ${exchangeName}`);
}
}
}
});
Expand All @@ -91,7 +106,13 @@ async function start() {
debug('created connection successfully with URL: ' + config.rabbitmqURL);
const channel = await connection.createConfirmChannel();
debug('Channel confirmed...');
consume(channel, config.rabbitmq.projectsExchange, config.rabbitmq.queues.project);
const publishChannel = await connection.createConfirmChannel();
consume(
channel,
config.rabbitmq.projectsExchange,
config.rabbitmq.queues.project,
publishChannel
);
} catch (e) {
debug('Unable to connect to RabbitMQ');
}
Expand Down
14 changes: 10 additions & 4 deletions consumer/test/worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import './setup';
describe('worker', () => {
describe('consume', () => {
const queueName = 'sample-queue';
const exchangeName = EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED//'sample-exchange';
const exchangeName = 'sample-exchange';
const validMessage = {
content: JSON.stringify({ sampleData: 'foo' }),
properties: { correlationId : 'unit-tests'},
Expand All @@ -24,6 +24,7 @@ describe('worker', () => {
let rabbitConsume;
let exchangeHandlerSpy = sinon.spy();
let fakeExchangeHandlerSpy = sinon.spy();
let channelPublishSpy = sinon.spy();

beforeEach(() => {
handler = sinon.spy();
Expand Down Expand Up @@ -58,7 +59,11 @@ describe('worker', () => {
done(e);
}
},
}, exchangeName, queueName);
}, exchangeName, queueName,
{
publish: channelPublishSpy,
assertExchange
});
}

it('should consume and ack a message successfully', (done) => {
Expand Down Expand Up @@ -91,15 +96,16 @@ describe('worker', () => {
invokeConsume(done);
});

xit('should nack if error is thrown', (done) => {
it('should ack, with message being copied to temp queue, if error is thrown', (done) => {
initHandlers({
[exchangeName] : () => {
throw new Error('foo');
}
})
rabbitConsume = async (queue, fn) => {
await fn(validMessage);
nack.should.have.been.calledWith(validMessage);
ack.should.have.been.calledWith(validMessage);
channelPublishSpy.should.have.been.calledWith(exchangeName, EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, new Buffer(validMessage.content));
};
invokeConsume(done);
});
Expand Down

0 comments on commit 6be4f05

Please sign in to comment.