Skip to content

Commit

Permalink
fix(service-broker): Awaiting ctx.emit behave differently locally and…
Browse files Browse the repository at this point in the history
… remotely issue moleculerjs#1065
  • Loading branch information
Anton-Burdin committed May 31, 2022
1 parent 452cbf3 commit 0ff0f9d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ module.exports = {
FAILED_SEND_PONG_PACKET: "failedSendPongPacket",
/** @type {String} Emitted when transit fails to send a HEARTBEAT packet*/
FAILED_SEND_HEARTBEAT_PACKET: "failedSendHeartbeatPacket",
/** @type {String} Emitted when broker fails to handler balanced event*/
FAILED_HANDLER_BALANCED_EVENT: "failedHandlerBalancedEvent",
/** @type {String} Emitted when broker fails to handler broadcast event*/
FAILED_HANDLER_BROADCAST_EVENT: "failedHandlerBroadcastEvent",
/** @type {String} Emitted when broker fails to stop all services*/
FAILED_STOPPING_SERVICES: "failedServicesStop",
/** @type {String} Emitted when broker fails to stop all services*/
Expand Down
31 changes: 29 additions & 2 deletions src/service-broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,7 @@ class ServiceBroker {
if (opts.groups && !Array.isArray(opts.groups)) opts.groups = [opts.groups];

const promises = [];
const localHandlers = [];

const ctx = this.ContextFactory.create(this, null, payload, opts);
ctx.eventName = eventName;
Expand Down Expand Up @@ -1384,7 +1385,7 @@ class ServiceBroker {
if (ep.id === this.nodeID) {
// Local service, call handler
const newCtx = ctx.copy(ep);
promises.push(this.registry.events.callEventHandler(newCtx));
localHandlers.push(this.registry.events.callEventHandler(newCtx));
} else {
// Remote service
const e = groupedEP[ep.id];
Expand All @@ -1406,6 +1407,20 @@ class ServiceBroker {
});
}

// invoke local handlers
setImmediate(() =>
Promise.allSettled(localHandlers).then(results => {
results
.filter(r => r.status === "rejected")
.forEach(({ reason: error }) =>
this.broadcastLocal("$broker.error", {
error,
module: "broker",
type: C.FAILED_HANDLER_BALANCED_EVENT
})
);
})
);
return this.Promise.all(promises);
} else if (this.transit) {
// Disabled balancer case
Expand Down Expand Up @@ -1488,7 +1503,19 @@ class ServiceBroker {
}

// Send to local services
promises.push(this.broadcastLocal(eventName, payload, opts));
setImmediate(() =>
this.Promise.resolve()
.then(() => this.broadcastLocal(eventName, payload, opts))
.catch(error =>
this.broadcastLocal("$broker.error", {
error,
module: "broker",
type: C.FAILED_HANDLER_BROADCAST_EVENT
})
)
// catch unresolved error
.catch(err => this.logger.error(err))
);

return this.Promise.all(promises);
}
Expand Down

0 comments on commit 0ff0f9d

Please sign in to comment.