From 2ae75f7a3b22bbad6e0869e5d6019b9a0fde8054 Mon Sep 17 00:00:00 2001 From: reluc Date: Wed, 24 Apr 2024 11:11:03 +0200 Subject: [PATCH] refactor(binding-mqtt/mqtt-broker-server): move listening to start method Listening is an asynchronous interaction that might throw an error. It's better to defer it until we start the server. This should help also debugging possible errors when listening. --- .../binding-mqtt/src/mqtt-broker-server.ts | 115 ++++++++++-------- 1 file changed, 64 insertions(+), 51 deletions(-) diff --git a/packages/binding-mqtt/src/mqtt-broker-server.ts b/packages/binding-mqtt/src/mqtt-broker-server.ts index 58c06eb69..53361ff59 100644 --- a/packages/binding-mqtt/src/mqtt-broker-server.ts +++ b/packages/binding-mqtt/src/mqtt-broker-server.ts @@ -86,22 +86,6 @@ export default class MqttBrokerServer implements ProtocolServer { } this.brokerURI = config.uri; - - const selfHost = config.selfHost ?? false; - if (selfHost) { - this.hostedServer = Server({}); - let server; - if (config.key) { - server = tls.createServer({ key: config.key, cert: config.cert }, this.hostedServer.handle); - } else { - server = net.createServer(this.hostedServer.handle); - } - const parsed = new url.URL(this.brokerURI); - const port = parseInt(parsed.port); - this.port = port > 0 ? port : 1883; - this.hostedBroker = server.listen(port, parsed.hostname); - this.hostedServer.authenticate = this.selfHostAuthentication.bind(this); - } } public async expose(thing: ExposedThing): Promise { @@ -374,44 +358,42 @@ export default class MqttBrokerServer implements ProtocolServer { return removedThing !== undefined; } - public start(servient: Servient): Promise { - return new Promise((resolve, reject) => { - if (this.brokerURI === undefined) { - warn(`No broker defined for MQTT server binding - skipping`); - resolve(); + public async start(servient: Servient): Promise { + if (this.brokerURI === undefined) { + warn(`No broker defined for MQTT server binding - skipping`); + } else { + const selfHost = this.config.selfHost ?? false; + if (selfHost) { + await this.startBroker(); + } + // try to connect to the broker without or with credentials + if (this.config.psw === undefined) { + debug(`MqttBrokerServer trying to connect to broker at ${this.brokerURI}`); + } else if (this.config.clientId === undefined) { + debug(`MqttBrokerServer trying to connect to secured broker at ${this.brokerURI}`); + } else if (this.config.protocolVersion === undefined) { + debug( + `MqttBrokerServer trying to connect to secured broker at ${this.brokerURI} with client ID ${this.config.clientId}` + ); } else { - // try to connect to the broker without or with credentials - if (this.config.psw === undefined) { - debug(`MqttBrokerServer trying to connect to broker at ${this.brokerURI}`); - } else if (this.config.clientId === undefined) { - debug(`MqttBrokerServer trying to connect to secured broker at ${this.brokerURI}`); - } else if (this.config.protocolVersion === undefined) { - debug( - `MqttBrokerServer trying to connect to secured broker at ${this.brokerURI} with client ID ${this.config.clientId}` - ); - } else { - debug( - `MqttBrokerServer trying to connect to secured broker at ${this.brokerURI} with client ID ${this.config.clientId}` - ); - } + debug( + `MqttBrokerServer trying to connect to secured broker at ${this.brokerURI} with client ID ${this.config.clientId}` + ); + } + + try { + this.broker = await mqtt.connectAsync(this.brokerURI, this.config); + info(`MqttBrokerServer connected to broker at ${this.brokerURI}`); - this.broker = mqtt.connect(this.brokerURI, this.config); - - this.broker.on("connect", () => { - info(`MqttBrokerServer connected to broker at ${this.brokerURI}`); - - const parsed = new url.URL(this.brokerURI); - this.address = parsed.hostname; - const port = parseInt(parsed.port); - this.port = port > 0 ? port : 1883; - resolve(); - }); - this.broker.on("error", (err: Error) => { - error(`MqttBrokerServer could not connect to broker at ${this.brokerURI}`); - reject(err); - }); + const parsed = new url.URL(this.brokerURI); + this.address = parsed.hostname; + const port = parseInt(parsed.port); + this.port = port > 0 ? port : 1883; + } catch (err) { + error(`MqttBrokerServer could not connect to broker at ${this.brokerURI}`); + throw err; } - }); + } } public async stop(): Promise { @@ -463,4 +445,35 @@ export default class MqttBrokerServer implements ProtocolServer { } done(null, true); } + + private async startBroker() { + return new Promise((resolve, reject) => { + this.hostedServer = Server({}); + let server: tls.Server | net.Server; + if (this.config.key) { + server = tls.createServer({ key: this.config.key, cert: this.config.cert }, this.hostedServer.handle); + } else { + server = net.createServer(this.hostedServer.handle); + } + const parsed = new url.URL(this.brokerURI); + const port = parseInt(parsed.port); + + this.port = port > 0 ? port : 1883; + this.hostedServer.authenticate = this.selfHostAuthentication.bind(this); + + const errorListener = (err: Error) => { + error(`error listening for ${this.brokerURI}, ${err}`); + reject(err); + }; + server.once("error", errorListener); + + debug(`MqttBrokerServer creating server for ${this.brokerURI}`); + this.hostedBroker = server.listen(port, parsed.hostname, () => { + debug(`MqttBrokerServer listening ${this.brokerURI}`); + // clean up listener if not called + server.removeListener("error", errorListener); + resolve(); + }); + }); + } }