Skip to content

Commit

Permalink
Release v1.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Tai NA committed Apr 16, 2021
2 parents 4412174 + b6c9672 commit c1d50c1
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 16 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
## v1.0.0
## v1.1.0
- [Add]
- Support moleculer graceful shutdown using serviceBroker.tracking option
- [Fix]
- Fix critical bug that cause queue not working on v1.0.0

## v1.0.0 (DO NOT USE)
- [Breaking Change]
- Queues in RabbitMQ created by v0.1.0 must be re-create because retry/deduplication feature cause queue configuration changed
- Rename configuration to make it more relevant with amqplib configuration, please follow README to validate your configuration
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,13 @@ Take a look at [examples](examples) folder for more examples
- [Simple example](examples/simple) : Example for basic usage
- [Retry example](examples/retry) : Example with retry logic
- [Deduplication example](examples/deduplication) : Example with deduplicate message feature
- [Graceful shutdown example](examples/graceful_shutdown) : Example with moleculer graceful shutdown feature

# Roadmap
# Checklist

- [x] Implement retry logic for rabbitmq queue
- [x] Allow deduplicate message
- [ ] Graceful shutdown queue
- [X] Graceful shutdown queue
- [ ] Test & Coverage

# License
Expand Down
67 changes: 67 additions & 0 deletions examples/graceful_shutdown/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
const { ServiceBroker } = require("moleculer");
const QueueMixin = require("../../index");

let broker = new ServiceBroker({
logger: console,
transporter: "TCP",
tracking: { // Enable moleculer graceful shutdown
enabled: true,
shutdownTimeout: 30 * 1000
}
});

const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
});

broker.createService({
name: "consumer",
version: 1,

mixins: [
queueMixin,
],

settings: {
amqp: {
connection: "amqp://localhost", // You can also override setting from service setting
},
},

actions: {
hello: {
queue: { // Enable queue for this action
// Options for AMQP queue
amqp: {
queueAssert: {
durable: true,
},
consume: {
noAck: false,
},
prefetch: 0,
},
dedupHash: (ctx) => {
return ctx.params.name;
},
},
params: {
name: "string|convert:true|empty:false",
},
async handler(ctx) {
this.logger.info(`[CONSUMER] PID: ${process.pid} Received job with name=${ctx.params.name}`);
return new Promise((resolve) => {
setTimeout(() => {
this.logger.info(`[CONSUMER] PID: ${process.pid} Processed job with name=${ctx.params.name}`);
return resolve(`hello ${ctx.params.name}`);
}, 3000); // Simulate slow task
});
},
},
},
});

broker.start().then(() => {
broker.repl();
});
50 changes: 50 additions & 0 deletions examples/graceful_shutdown/publisher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
const { ServiceBroker } = require("moleculer");
const QueueMixin = require("../../index");

let broker = new ServiceBroker({
logger: console,
transporter: "TCP",
});

const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
});

broker.createService({
name: "publisher",
version: 1,

mixins: [
queueMixin,
],

settings: {
amqp: {
connection: "amqp://localhost", // You can also override setting from service setting
},
},

async started() {
await broker.waitForServices({ name: "consumer", version: 1 });

let name = "repeat_name";
setInterval(async () => {
const response = await broker.call("v1.consumer.hello.async", {
// `params` is the real param will be passed to original action
params: {
name,
},
// `options` is the real options will be passed to original action
options: {
timeout: 12000,
},
});
this.logger.info(`[PUBLISHER] PID: ${process.pid} Called job with name=${name} response=${JSON.stringify(response)}`);
}, 500);
}
});

broker.start().then(() => {
broker.repl();
});
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "moleculer-rabbitmq",
"version": "1.0.0",
"version": "1.1.0",
"description": "Moleculer RabbitMQ queue plugin",
"main": "index.js",
"scripts": {
Expand Down
58 changes: 46 additions & 12 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,36 @@ const ACTION_OPTIONS_VALIDATOR = {
optional: true,
};

const gracefulShutdown = async function () {
isShuttingDown = true;

Object.keys(this.$amqpQueues).forEach((queueName) => {
const { channel, options } = this.$amqpQueues[queueName] || {};
channel && channel.cancel(Deep(options, "amqp.consume.consumerTag"));
});
};

const closeConnection = async function () {
Object.keys(this.$amqpQueues).forEach((queueName) => {
const { channel } = this.$amqpQueues[queueName];
channel && channel.close();
});

this.$amqpConnection && await this.$amqpConnection.close();
};

const initAMQPQueues = function (schema) {
Object.keys(schema.actions || {}).forEach((originActionName) => {
if (schema.actions[originActionName] && schema.actions[originActionName].queue) {
const queueName = `amqp.${schema.version ? `v${schema.version}.` : ""}${schema.name}.${originActionName}`;

const queueOption = MergeDeep({}, schema.actions[originActionName].queue, DEFAULT_QUEUE_OPTIONS);
Deep(queueOption, "amqp.consume.consumerTag", Crypto.randomBytes(16).toString("hex"));

this.$amqpOptions[queueName] = {
this.$amqpQueues[queueName] = {
options: queueOption,
async consumeHandler(channel, msg) {
messagesBeingProcessed++;
const {
retry: retryOptions,
} = queueOption;
Expand Down Expand Up @@ -117,6 +137,11 @@ const initAMQPQueues = function (schema) {
}

this.logger.error(error);
} finally {
messagesBeingProcessed--;
if (isShuttingDown && messagesBeingProcessed == 0) {
await closeConnection.call(this);
}
}
},
};
Expand Down Expand Up @@ -145,6 +170,7 @@ const initAMQPActions = function (schema) {
};
}

const queueName = `amqp.${schema.version ? `v${schema.version}.` : ""}${schema.name}.${actionName}`;
schema.actions[`${actionName}.async`] = {
timeout: 10000,
retryPolicy: {
Expand All @@ -153,8 +179,7 @@ const initAMQPActions = function (schema) {
},
params: asyncParams,
async handler(ctx) {
const queueName = `amqp.${ctx.params.action}`;
const dedupeHash = Deep(this.$amqpOptions, [queueName, "options", "dedupHash"]);
const dedupeHash = Deep(this.$amqpQueues, [queueName, "options", "dedupHash"]);
const headers = ctx.headers || {};
if (typeof dedupeHash === "number" || typeof dedupeHash === "string") {
headers["x-deduplication-header"] = String(dedupeHash);
Expand All @@ -181,22 +206,25 @@ const initAMQPActions = function (schema) {
return schema;
};

let isShuttingDown = false;
let messagesBeingProcessed = 0;

module.exports = (options) => ({
name: "moleculer-rabbitmq",

methods: {
async assertAMQPQueue(queueName) {
if (!this.$amqpQueues[queueName]) {
if (!Deep(this.$amqpQueues, [queueName, "channel"])) {
const {
options: {
amqp: amqpOptions = {},
} = {},
} = this.$amqpOptions[queueName] || {};
} = this.$amqpQueues[queueName] || {};

try {
const channel = await this.$amqpConnection.createChannel();
channel.on("close", () => {
delete this.$amqpQueues[queueName];
Deep(this.$amqpQueues, [queueName, "channel"], null);
});
channel.on("error", (err) => {
this.logger.error(err);
Expand All @@ -213,14 +241,14 @@ module.exports = (options) => ({
await channel.bindQueue(queueName, `${queueName}.retry`, queueName);
}

this.$amqpQueues[queueName] = channel;
Deep(this.$amqpQueues, [queueName, "channel"], channel);
} catch (err) {
this.logger.error(err);
throw new MoleculerError("Unable to start queue");
}
}

return this.$amqpQueues[queueName];
return Deep(this.$amqpQueues, [queueName, "channel"]);
},

async sendAMQPMessage(name, message, options) {
Expand All @@ -230,11 +258,13 @@ module.exports = (options) => ({
},

async initAMQPConsumers() {
Object.entries(this.$amqpOptions).forEach(async ([queueName, queueOption]) => {
Object.entries(this.$amqpQueues).forEach(async ([queueName, queue]) => {
const {
consumeHandler,
consume: consumeOptions,
} = queueOption;
options: {
consume: consumeOptions,
} = {},
} = queue;

const amqpChannel = await this.assertAMQPQueue(queueName);
amqpChannel.consume(queueName, consumeHandler.bind(this, amqpChannel), consumeOptions);
Expand All @@ -245,7 +275,6 @@ module.exports = (options) => ({
merged(schema) {
this.$amqpConnection = null;
this.$amqpQueues = {};
this.$amqpOptions = {};

if (!schema.settings) {
schema.settings = {};
Expand Down Expand Up @@ -277,6 +306,11 @@ module.exports = (options) => ({
throw new MoleculerError("Unable to connect to AMQP");
}

process.on("SIGTERM", gracefulShutdown.bind(this));
process.on("SIGINT", gracefulShutdown.bind(this));
process.on("SIGUSR1", gracefulShutdown.bind(this));
process.on("SIGUSR2", gracefulShutdown.bind(this));

try {
await this.initAMQPConsumers();
} catch (ex) {
Expand Down

0 comments on commit c1d50c1

Please sign in to comment.