From 84bf8c803529c30e0c1e88bc4b459ca377a607fa Mon Sep 17 00:00:00 2001 From: Tai NA Date: Fri, 16 Apr 2021 19:57:12 +0700 Subject: [PATCH 1/3] Fix bug wrong queueName on initAMQPActions --- src/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.js b/src/index.js index 8bb25cf..254c83a 100644 --- a/src/index.js +++ b/src/index.js @@ -145,6 +145,7 @@ const initAMQPActions = function (schema) { }; } + const queueName = `amqp.${schema.version ? `v${schema.version}.` : ""}${schema.name}.${actionName}`; schema.actions[`${actionName}.async`] = { timeout: 10000, retryPolicy: { @@ -153,7 +154,6 @@ const initAMQPActions = function (schema) { }, params: asyncParams, async handler(ctx) { - const queueName = `amqp.${ctx.params.action}`; const dedupeHash = Deep(this.$amqpOptions, [queueName, "options", "dedupHash"]); const headers = ctx.headers || {}; if (typeof dedupeHash === "number" || typeof dedupeHash === "string") { From b33c3dd34e73d8c0ec0b4495b0beb41dccbf055a Mon Sep 17 00:00:00 2001 From: Tai NA Date: Fri, 16 Apr 2021 20:25:04 +0700 Subject: [PATCH 2/3] Add gracefulShutdown logic --- examples/gracefull_shutdown/consumer.js | 67 ++++++++++++++++++++++++ examples/gracefull_shutdown/publisher.js | 50 ++++++++++++++++++ src/index.js | 56 ++++++++++++++++---- 3 files changed, 162 insertions(+), 11 deletions(-) create mode 100644 examples/gracefull_shutdown/consumer.js create mode 100644 examples/gracefull_shutdown/publisher.js diff --git a/examples/gracefull_shutdown/consumer.js b/examples/gracefull_shutdown/consumer.js new file mode 100644 index 0000000..21799e9 --- /dev/null +++ b/examples/gracefull_shutdown/consumer.js @@ -0,0 +1,67 @@ +const { ServiceBroker } = require("moleculer"); +const QueueMixin = require("../../index"); + +let broker = new ServiceBroker({ + logger: console, + transporter: "TCP", + tracking: { // Enable moleculer graceful shutdown + enabled: true, + shutdownTimeout: 30 * 1000 + } +}); + +const queueMixin = QueueMixin({ + connection: "amqp://localhost", + asyncActions: true, // Enable auto generate .async version for actions +}); + +broker.createService({ + name: "consumer", + version: 1, + + mixins: [ + queueMixin, + ], + + settings: { + amqp: { + connection: "amqp://localhost", // You can also override setting from service setting + }, + }, + + actions: { + hello: { + queue: { // Enable queue for this action + // Options for AMQP queue + amqp: { + queueAssert: { + durable: true, + }, + consume: { + noAck: false, + }, + prefetch: 0, + }, + dedupHash: (ctx) => { + return ctx.params.name; + }, + }, + params: { + name: "string|convert:true|empty:false", + }, + async handler(ctx) { + this.logger.info(`[CONSUMER] PID: ${process.pid} Received job with name=${ctx.params.name}`); + return new Promise((resolve) => { + setTimeout(() => { + this.logger.info(`[CONSUMER] PID: ${process.pid} Processed job with name=${ctx.params.name}`); + return resolve(`hello ${ctx.params.name}`); + }, 3000); // Simulate slow task + }); + }, + }, + }, +}); + +broker.start().then(() => { + broker.repl(); +}); diff --git a/examples/gracefull_shutdown/publisher.js b/examples/gracefull_shutdown/publisher.js new file mode 100644 index 0000000..0749fd7 --- /dev/null +++ b/examples/gracefull_shutdown/publisher.js @@ -0,0 +1,50 @@ +const { ServiceBroker } = require("moleculer"); +const QueueMixin = require("../../index"); + +let broker = new ServiceBroker({ + logger: console, + transporter: "TCP", +}); + +const queueMixin = QueueMixin({ + connection: "amqp://localhost", + asyncActions: true, // Enable auto generate .async version for actions +}); + +broker.createService({ + name: "publisher", + version: 1, + + mixins: [ + queueMixin, + ], + + settings: { + amqp: { + connection: "amqp://localhost", // You can also override setting from service setting + }, + }, + + async started() { + await broker.waitForServices({ name: "consumer", version: 1 }); + + let name = "repeat_name"; + setInterval(async () => { + const response = await broker.call("v1.consumer.hello.async", { + // `params` is the real param will be passed to original action + params: { + name, + }, + // `options` is the real options will be passed to original action + options: { + timeout: 12000, + }, + }); + this.logger.info(`[PUBLISHER] PID: ${process.pid} Called job with name=${name} response=${JSON.stringify(response)}`); + }, 500); + } +}); + +broker.start().then(() => { + broker.repl(); +}); diff --git a/src/index.js b/src/index.js index 254c83a..3f79ceb 100644 --- a/src/index.js +++ b/src/index.js @@ -56,16 +56,36 @@ const ACTION_OPTIONS_VALIDATOR = { optional: true, }; +const gracefulShutdown = async function () { + isShuttingDown = true; + + Object.keys(this.$amqpQueues).forEach((queueName) => { + const { channel, options } = this.$amqpQueues[queueName] || {}; + channel && channel.cancel(Deep(options, "amqp.consume.consumerTag")); + }); +}; + +const closeConnection = async function () { + Object.keys(this.$amqpQueues).forEach((queueName) => { + const { channel } = this.$amqpQueues[queueName]; + channel && channel.close(); + }); + + this.$amqpConnection && await this.$amqpConnection.close(); +}; + const initAMQPQueues = function (schema) { Object.keys(schema.actions || {}).forEach((originActionName) => { if (schema.actions[originActionName] && schema.actions[originActionName].queue) { const queueName = `amqp.${schema.version ? `v${schema.version}.` : ""}${schema.name}.${originActionName}`; const queueOption = MergeDeep({}, schema.actions[originActionName].queue, DEFAULT_QUEUE_OPTIONS); + Deep(queueOption, "amqp.consume.consumerTag", Crypto.randomBytes(16).toString("hex")); - this.$amqpOptions[queueName] = { + this.$amqpQueues[queueName] = { options: queueOption, async consumeHandler(channel, msg) { + messagesBeingProcessed++; const { retry: retryOptions, } = queueOption; @@ -117,6 +137,11 @@ const initAMQPQueues = function (schema) { } this.logger.error(error); + } finally { + messagesBeingProcessed--; + if (isShuttingDown && messagesBeingProcessed == 0) { + await closeConnection.call(this); + } } }, }; @@ -154,7 +179,7 @@ const initAMQPActions = function (schema) { }, params: asyncParams, async handler(ctx) { - const dedupeHash = Deep(this.$amqpOptions, [queueName, "options", "dedupHash"]); + const dedupeHash = Deep(this.$amqpQueues, [queueName, "options", "dedupHash"]); const headers = ctx.headers || {}; if (typeof dedupeHash === "number" || typeof dedupeHash === "string") { headers["x-deduplication-header"] = String(dedupeHash); @@ -181,22 +206,25 @@ const initAMQPActions = function (schema) { return schema; }; +let isShuttingDown = false; +let messagesBeingProcessed = 0; + module.exports = (options) => ({ name: "moleculer-rabbitmq", methods: { async assertAMQPQueue(queueName) { - if (!this.$amqpQueues[queueName]) { + if (!Deep(this.$amqpQueues, [queueName, "channel"])) { const { options: { amqp: amqpOptions = {}, } = {}, - } = this.$amqpOptions[queueName] || {}; + } = this.$amqpQueues[queueName] || {}; try { const channel = await this.$amqpConnection.createChannel(); channel.on("close", () => { - delete this.$amqpQueues[queueName]; + Deep(this.$amqpQueues, [queueName, "channel"], null); }); channel.on("error", (err) => { this.logger.error(err); @@ -213,14 +241,14 @@ module.exports = (options) => ({ await channel.bindQueue(queueName, `${queueName}.retry`, queueName); } - this.$amqpQueues[queueName] = channel; + Deep(this.$amqpQueues, [queueName, "channel"], channel); } catch (err) { this.logger.error(err); throw new MoleculerError("Unable to start queue"); } } - return this.$amqpQueues[queueName]; + return Deep(this.$amqpQueues, [queueName, "channel"]); }, async sendAMQPMessage(name, message, options) { @@ -230,11 +258,13 @@ module.exports = (options) => ({ }, async initAMQPConsumers() { - Object.entries(this.$amqpOptions).forEach(async ([queueName, queueOption]) => { + Object.entries(this.$amqpQueues).forEach(async ([queueName, queue]) => { const { consumeHandler, - consume: consumeOptions, - } = queueOption; + options: { + consume: consumeOptions, + } = {}, + } = queue; const amqpChannel = await this.assertAMQPQueue(queueName); amqpChannel.consume(queueName, consumeHandler.bind(this, amqpChannel), consumeOptions); @@ -245,7 +275,6 @@ module.exports = (options) => ({ merged(schema) { this.$amqpConnection = null; this.$amqpQueues = {}; - this.$amqpOptions = {}; if (!schema.settings) { schema.settings = {}; @@ -277,6 +306,11 @@ module.exports = (options) => ({ throw new MoleculerError("Unable to connect to AMQP"); } + process.on("SIGTERM", gracefulShutdown.bind(this)); + process.on("SIGINT", gracefulShutdown.bind(this)); + process.on("SIGUSR1", gracefulShutdown.bind(this)); + process.on("SIGUSR2", gracefulShutdown.bind(this)); + try { await this.initAMQPConsumers(); } catch (ex) { From b6c9672b9a77514c1405ac13a71394c55413a620 Mon Sep 17 00:00:00 2001 From: Tai NA Date: Fri, 16 Apr 2021 20:30:49 +0700 Subject: [PATCH 3/3] Release v1.1.0 --- CHANGELOG.md | 8 +++++++- README.md | 5 +++-- .../{gracefull_shutdown => graceful_shutdown}/consumer.js | 0 .../publisher.js | 0 package.json | 2 +- 5 files changed, 11 insertions(+), 4 deletions(-) rename examples/{gracefull_shutdown => graceful_shutdown}/consumer.js (100%) rename examples/{gracefull_shutdown => graceful_shutdown}/publisher.js (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index e65b1dc..673e14d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,10 @@ -## v1.0.0 +## v1.1.0 +- [Add] + - Support moleculer graceful shutdown using serviceBroker.tracking option +- [Fix] + - Fix critical bug that cause queue not working on v1.0.0 + +## v1.0.0 (DO NOT USE) - [Breaking Change] - Queues in RabbitMQ created by v0.1.0 must be re-create because retry/deduplication feature cause queue configuration changed - Rename configuration to make it more relevant with amqplib configuration, please follow README to validate your configuration diff --git a/README.md b/README.md index 0d8fa6f..4ca62cf 100644 --- a/README.md +++ b/README.md @@ -214,12 +214,13 @@ Take a look at [examples](examples) folder for more examples - [Simple example](examples/simple) : Example for basic usage - [Retry example](examples/retry) : Example with retry logic - [Deduplication example](examples/deduplication) : Example with deduplicate message feature +- [Graceful shutdown example](examples/graceful_shutdown) : Example with moleculer graceful shutdown feature -# Roadmap +# Checklist - [x] Implement retry logic for rabbitmq queue - [x] Allow deduplicate message -- [ ] Graceful shutdown queue +- [X] Graceful shutdown queue - [ ] Test & Coverage # License diff --git a/examples/gracefull_shutdown/consumer.js b/examples/graceful_shutdown/consumer.js similarity index 100% rename from examples/gracefull_shutdown/consumer.js rename to examples/graceful_shutdown/consumer.js diff --git a/examples/gracefull_shutdown/publisher.js b/examples/graceful_shutdown/publisher.js similarity index 100% rename from examples/gracefull_shutdown/publisher.js rename to examples/graceful_shutdown/publisher.js diff --git a/package.json b/package.json index 1acbc49..fd1ba51 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "moleculer-rabbitmq", - "version": "1.0.0", + "version": "1.1.0", "description": "Moleculer RabbitMQ queue plugin", "main": "index.js", "scripts": {