From 41869627c6b29702c806f6324a69f2f273fde2ba Mon Sep 17 00:00:00 2001 From: Tai NA Date: Sat, 1 May 2021 17:51:24 +0700 Subject: [PATCH 1/2] Fix bug graceful shutdown --- src/index.js | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/index.js b/src/index.js index 9d0ad76..39af139 100644 --- a/src/index.js +++ b/src/index.js @@ -1,4 +1,3 @@ -const Crypto = require("crypto"); const Amqplib = require("amqplib"); const DefaultDeep = require("defaults-deep"); const Deep = require("deep-get-set"); @@ -59,9 +58,10 @@ const ACTION_OPTIONS_VALIDATOR = { const gracefulShutdown = async function () { isShuttingDown = true; - Object.keys(this.$amqpQueues).forEach((queueName) => { - const { channel, options } = this.$amqpQueues[queueName] || {}; - channel && channel.cancel(Deep(options, "amqp.consume.consumerTag")); + Object.keys(this.$amqpQueues).forEach(async (queueName) => { + const { channel, consumerTag } = this.$amqpQueues[queueName] || {}; + + channel && consumerTag && await channel.cancel(consumerTag); }); }; @@ -80,7 +80,6 @@ const initAMQPQueues = function (schema) { const queueName = `amqp.${schema.version ? `v${schema.version}.` : ""}${schema.name}.${originActionName}`; const queueOption = DefaultDeep({}, schema.actions[originActionName].queue, DEFAULT_QUEUE_OPTIONS); - Deep(queueOption, "amqp.consume.consumerTag", Crypto.randomBytes(16).toString("hex")); this.$amqpQueues[queueName] = { options: queueOption, @@ -267,7 +266,10 @@ module.exports = (options) => ({ } = queue; const amqpChannel = await this.assertAMQPQueue(queueName); - amqpChannel.consume(queueName, consumeHandler.bind(this, amqpChannel), consumeOptions); + const { + consumerTag, + } = await amqpChannel.consume(queueName, consumeHandler.bind(this, amqpChannel), consumeOptions); + this.$amqpQueues[queueName].consumerTag = consumerTag; }); }, }, From c61aec0aaa0fbeb7ee218e97160aaabe91fd17f8 Mon Sep 17 00:00:00 2001 From: Tai NA Date: Sat, 1 May 2021 17:52:40 +0700 Subject: [PATCH 2/2] Bump version to v1.1.3 --- CHANGELOG.md | 4 ++++ package.json | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dfe6bb..9a6873f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## v1.1.3 +- [Fix] + - Fix bug graceful shutdown not cancel consumer + ## v1.1.2 - [Fix] - Fix bug using merge-deep instead of defaults-deep diff --git a/package.json b/package.json index e8a3ff6..77deba1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "moleculer-rabbitmq", - "version": "1.1.2", + "version": "1.1.3", "description": "Moleculer RabbitMQ queue plugin", "main": "index.js", "scripts": {