diff --git a/consumer/src/worker.js b/consumer/src/worker.js index fdfe5dd..7254c35 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -37,6 +37,7 @@ export function initHandlers(handlers) { */ export async function consume(channel, exchangeName, queue, publishChannel) { channel.assertExchange(exchangeName, 'topic', { durable: true }); + publishChannel.assertExchange(exchangeName, 'topic', { durable: true }); channel.assertQueue(queue, { durable: true }); const bindings = _.keys(EVENT_HANDLERS); const bindingPromises = _.map(bindings, rk => @@ -78,11 +79,17 @@ export async function consume(channel, exchangeName, queue, publishChannel) { // we can use cloudamqp console to check the messages and may be manually create SF lead // nacking here was causing flood of messages to the worker and it keep on consuming high resources channel.ack(msg); - publishChannel.publish( - exchangeName, - EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, - new Buffer(msg.content.toString()) - ); + try { + publishChannel.publish( + exchangeName, + EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, + new Buffer(msg.content.toString()) + ); + } catch(e) { + // TODO decide if we want nack the original msg here + // for now just ignoring the error in requeue + logger.logFullError(e, `Error in publising Exchange to ${exchangeName}`); + } } } }); diff --git a/consumer/test/worker.spec.js b/consumer/test/worker.spec.js index c41b0a0..02b3102 100644 --- a/consumer/test/worker.spec.js +++ b/consumer/test/worker.spec.js @@ -61,7 +61,8 @@ describe('worker', () => { }, }, exchangeName, queueName, { - publish: channelPublishSpy + publish: channelPublishSpy, + assertExchange }); }