Skip to content

Commit

Permalink
Merge pull request eclipse-thingweb#1272 from relu91/refactor-binding…
Browse files Browse the repository at this point in the history
…-mqtt

refactor(binding-mqtt/mqtt-broker-server): move listening to start method
  • Loading branch information
relu91 authored Apr 24, 2024
2 parents 786fde6 + 2ae75f7 commit f32ea33
Showing 1 changed file with 64 additions and 51 deletions.
115 changes: 64 additions & 51 deletions packages/binding-mqtt/src/mqtt-broker-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,6 @@ export default class MqttBrokerServer implements ProtocolServer {
}

this.brokerURI = config.uri;

const selfHost = config.selfHost ?? false;
if (selfHost) {
this.hostedServer = Server({});
let server;
if (config.key) {
server = tls.createServer({ key: config.key, cert: config.cert }, this.hostedServer.handle);
} else {
server = net.createServer(this.hostedServer.handle);
}
const parsed = new url.URL(this.brokerURI);
const port = parseInt(parsed.port);
this.port = port > 0 ? port : 1883;
this.hostedBroker = server.listen(port, parsed.hostname);
this.hostedServer.authenticate = this.selfHostAuthentication.bind(this);
}
}

public async expose(thing: ExposedThing): Promise<void> {
Expand Down Expand Up @@ -374,44 +358,42 @@ export default class MqttBrokerServer implements ProtocolServer {
return removedThing !== undefined;
}

public start(servient: Servient): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (this.brokerURI === undefined) {
warn(`No broker defined for MQTT server binding - skipping`);
resolve();
public async start(servient: Servient): Promise<void> {
if (this.brokerURI === undefined) {
warn(`No broker defined for MQTT server binding - skipping`);
} else {
const selfHost = this.config.selfHost ?? false;
if (selfHost) {
await this.startBroker();
}
// try to connect to the broker without or with credentials
if (this.config.psw === undefined) {
debug(`MqttBrokerServer trying to connect to broker at ${this.brokerURI}`);
} else if (this.config.clientId === undefined) {
debug(`MqttBrokerServer trying to connect to secured broker at ${this.brokerURI}`);
} else if (this.config.protocolVersion === undefined) {
debug(
`MqttBrokerServer trying to connect to secured broker at ${this.brokerURI} with client ID ${this.config.clientId}`
);
} else {
// try to connect to the broker without or with credentials
if (this.config.psw === undefined) {
debug(`MqttBrokerServer trying to connect to broker at ${this.brokerURI}`);
} else if (this.config.clientId === undefined) {
debug(`MqttBrokerServer trying to connect to secured broker at ${this.brokerURI}`);
} else if (this.config.protocolVersion === undefined) {
debug(
`MqttBrokerServer trying to connect to secured broker at ${this.brokerURI} with client ID ${this.config.clientId}`
);
} else {
debug(
`MqttBrokerServer trying to connect to secured broker at ${this.brokerURI} with client ID ${this.config.clientId}`
);
}
debug(
`MqttBrokerServer trying to connect to secured broker at ${this.brokerURI} with client ID ${this.config.clientId}`
);
}

try {
this.broker = await mqtt.connectAsync(this.brokerURI, this.config);
info(`MqttBrokerServer connected to broker at ${this.brokerURI}`);

this.broker = mqtt.connect(this.brokerURI, this.config);

this.broker.on("connect", () => {
info(`MqttBrokerServer connected to broker at ${this.brokerURI}`);

const parsed = new url.URL(this.brokerURI);
this.address = parsed.hostname;
const port = parseInt(parsed.port);
this.port = port > 0 ? port : 1883;
resolve();
});
this.broker.on("error", (err: Error) => {
error(`MqttBrokerServer could not connect to broker at ${this.brokerURI}`);
reject(err);
});
const parsed = new url.URL(this.brokerURI);
this.address = parsed.hostname;
const port = parseInt(parsed.port);
this.port = port > 0 ? port : 1883;
} catch (err) {
error(`MqttBrokerServer could not connect to broker at ${this.brokerURI}`);
throw err;
}
});
}
}

public async stop(): Promise<void> {
Expand Down Expand Up @@ -463,4 +445,35 @@ export default class MqttBrokerServer implements ProtocolServer {
}
done(null, true);
}

private async startBroker() {
return new Promise<void>((resolve, reject) => {
this.hostedServer = Server({});
let server: tls.Server | net.Server;
if (this.config.key) {
server = tls.createServer({ key: this.config.key, cert: this.config.cert }, this.hostedServer.handle);
} else {
server = net.createServer(this.hostedServer.handle);
}
const parsed = new url.URL(this.brokerURI);
const port = parseInt(parsed.port);

this.port = port > 0 ? port : 1883;
this.hostedServer.authenticate = this.selfHostAuthentication.bind(this);

const errorListener = (err: Error) => {
error(`error listening for ${this.brokerURI}, ${err}`);
reject(err);
};
server.once("error", errorListener);

debug(`MqttBrokerServer creating server for ${this.brokerURI}`);
this.hostedBroker = server.listen(port, parsed.hostname, () => {
debug(`MqttBrokerServer listening ${this.brokerURI}`);
// clean up listener if not called
server.removeListener("error", errorListener);
resolve();
});
});
}
}

0 comments on commit f32ea33

Please sign in to comment.