
[Bug]: Getting Keepalive timeout when using client.handleMessage #1935

vivek-doshi-genea opened this issue Sep 5, 2024 · 11 comments

@vivek-doshi-genea

vivek-doshi-genea commented Sep 5, 2024

MQTTjs Version

5

Broker

AWS IoT Core

Environment

NodeJS

Description

I've created 2 files Publisher.js and Subscriber.js

Publisher.js : Publish 1 message per second on topic iot-device/events

Subscriber.js : Subscribes to topic iot-device/events and receives messages via handleMessage; a 2-second await is added to mock a DB or third-party API call

Now when I run both files, the publisher keeps publishing messages without any issue.
Issues

  • the subscriber throws a Keepalive timeout error after ~1 minute of execution
  • afterwards it receives messages out of sequence
  • it seems to start a new subscription while keeping the old one, which results in messages being logged at twice the speed

The issue is not random; it appears every time.

Minimal Reproduction

Node.js version: v20.17.0
npm version: 10.8.2
mqtt npm: 5.10.1
Certificates to connect to the broker

Steps:

  • create the files below
  • open 2 terminals
  • in the 1st terminal, run node Publisher.js
  • in the 2nd terminal, run node Subscriber.js
  • wait for 2 minutes; it should throw Keepalive timeout and exhibit all the points mentioned above

Code for both files:

Publisher.js

const mqtt = require('mqtt');
const PUBLISH_TOPIC = 'iot-device/events';

// mqtt_obj is assumed to hold the TLS credentials, loaded elsewhere
// (e.g. via fs.readFileSync on the certificate files from AWS IoT Core)
const client = mqtt.connect('mqtts://xxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com:8883', {
    name: 'mqtt_connection_intrusion_event_processor-2',
    clientId: 'iot-publisher',
    key: mqtt_obj.private_key,
    cert: mqtt_obj.certificate_pem,
    clean: false,
    protocolVersion: 5,
    debug: true,
});

let messageCount = 0;
const maxMessages = 1;
const interval = 1000;

client.on('connect', function () {
    console.log('Connected to MQTT broker');
    setInterval(function () {
        for (var i = 0; i < maxMessages; i++) {
            client.publish(PUBLISH_TOPIC, JSON.stringify({ messageCount }), { qos: 1 }, (err) => {
                if (err) {
                    console.error('Publish error:', err);
                }
            });
            messageCount++;
            console.log('Published message:', {messageCount});
        }
    }, interval);
});

client.on('error', (error) => {
    console.error('Error:', error);
});

client.on('close', (err) => {
    console.log('Connection to MQTT broker closed');
});

Subscriber.js

const mqtt = require('mqtt');
const wait = waitTime => new Promise(resolve => setTimeout(resolve, waitTime));
const SUB_TOPIC = 'iot-device/events';

// mqtt_obj is assumed to hold the TLS credentials, loaded elsewhere
const client = mqtt.connect('mqtts://xxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com:8883', {
    name: 'mqtt_connection_intrusion_event_processor-1',
    clientId: 'iot-subscriber',
    key: mqtt_obj.private_key,
    cert: mqtt_obj.certificate_pem,
    clean: false,
    protocolVersion: 5,
    debug: true,
});

client.handleMessage = async (packet, callback) => {
    console.log('Received message:', packet.payload.toString());
    await wait(2000); // mock a 2-second DB / third-party API call
    callback();
};

client.on('connect', function () {
    client.subscribe(SUB_TOPIC, function (err) {
        if (err) {
            console.error('Error:', err);
        } else {
            console.log('Connected to MQTT broker');
        }
    });
});

client.on('error', (error) => {
    console.error('Error:', error);
});

client.on('close', (err) => {
    console.log('Connection to MQTT broker closed');
});

Debug logs

Publisher Logs :

Connected to MQTT broker
Published message: { messageCount: 1 }
Published message: { messageCount: 2 }
Published message: { messageCount: 3 }
Published message: { messageCount: 4 }
Published message: { messageCount: 5 }
Published message: { messageCount: 6 }
Published message: { messageCount: 7 }
Published message: { messageCount: 8 }
Published message: { messageCount: 9 }
Published message: { messageCount: 10 }
Published message: { messageCount: 11 }
Published message: { messageCount: 12 }
Published message: { messageCount: 13 }
Published message: { messageCount: 14 }
Published message: { messageCount: 15 }
Published message: { messageCount: 16 }
Published message: { messageCount: 17 }
Published message: { messageCount: 18 }
Published message: { messageCount: 19 }
Published message: { messageCount: 20 }
Published message: { messageCount: 21 }
Published message: { messageCount: 22 }
Published message: { messageCount: 23 }
Published message: { messageCount: 24 }
Published message: { messageCount: 25 }
Published message: { messageCount: 26 }
Published message: { messageCount: 27 }
Published message: { messageCount: 28 }
Published message: { messageCount: 29 }
Published message: { messageCount: 30 }
Published message: { messageCount: 31 }
Published message: { messageCount: 32 }
Published message: { messageCount: 33 }
Published message: { messageCount: 34 }
Published message: { messageCount: 35 }
Published message: { messageCount: 36 }
Published message: { messageCount: 37 }
Published message: { messageCount: 38 }
Published message: { messageCount: 39 }
Published message: { messageCount: 40 }
Published message: { messageCount: 41 }
Published message: { messageCount: 42 }
Published message: { messageCount: 43 }
Published message: { messageCount: 44 }
Published message: { messageCount: 45 }
Published message: { messageCount: 46 }
Published message: { messageCount: 47 }
Published message: { messageCount: 48 }
Published message: { messageCount: 49 }
Published message: { messageCount: 50 }
Published message: { messageCount: 51 }
Published message: { messageCount: 52 }
Published message: { messageCount: 53 }
Published message: { messageCount: 54 }
Published message: { messageCount: 55 }
Published message: { messageCount: 56 }
Published message: { messageCount: 57 }
Published message: { messageCount: 58 }
Published message: { messageCount: 59 }
Published message: { messageCount: 60 }
Published message: { messageCount: 61 }
Published message: { messageCount: 62 }
Published message: { messageCount: 63 }
Published message: { messageCount: 64 }
Published message: { messageCount: 65 }
Published message: { messageCount: 66 }
Published message: { messageCount: 67 }
Published message: { messageCount: 68 }
Published message: { messageCount: 69 }
Published message: { messageCount: 70 }
Published message: { messageCount: 71 }
Published message: { messageCount: 72 }
Published message: { messageCount: 73 }
Published message: { messageCount: 74 }
Published message: { messageCount: 75 }
Published message: { messageCount: 76 }
Published message: { messageCount: 77 }
Published message: { messageCount: 78 }
Published message: { messageCount: 79 }
Published message: { messageCount: 80 }
Published message: { messageCount: 81 }
Published message: { messageCount: 82 }
Published message: { messageCount: 83 }
Published message: { messageCount: 84 }
Published message: { messageCount: 85 }
Published message: { messageCount: 86 }
Published message: { messageCount: 87 }
Published message: { messageCount: 88 }
Published message: { messageCount: 89 }
Published message: { messageCount: 90 }
Published message: { messageCount: 91 }
Published message: { messageCount: 92 }
Published message: { messageCount: 93 }
Published message: { messageCount: 94 }
Published message: { messageCount: 95 }
Published message: { messageCount: 96 }
Published message: { messageCount: 97 }
Published message: { messageCount: 98 }
Published message: { messageCount: 99 }
Published message: { messageCount: 100 }
Published message: { messageCount: 101 }

Subscriber Logs :

Connected to MQTT broker
Received message: {"messageCount":0}
Received message: {"messageCount":1}
Received message: {"messageCount":2}
Received message: {"messageCount":3}
Received message: {"messageCount":4}
Received message: {"messageCount":5}
Received message: {"messageCount":6}
Received message: {"messageCount":7}
Received message: {"messageCount":8}
Received message: {"messageCount":9}
Received message: {"messageCount":10}
Received message: {"messageCount":11}
Received message: {"messageCount":12}
Received message: {"messageCount":13}
Received message: {"messageCount":14}
Received message: {"messageCount":15}
Received message: {"messageCount":16}
Received message: {"messageCount":17}
Received message: {"messageCount":18}
Received message: {"messageCount":19}
Received message: {"messageCount":20}
Received message: {"messageCount":21}
Received message: {"messageCount":22}
Received message: {"messageCount":23}
Received message: {"messageCount":24}
Received message: {"messageCount":25}
Received message: {"messageCount":26}
Received message: {"messageCount":27}
Received message: {"messageCount":28}
Received message: {"messageCount":29}
Received message: {"messageCount":30}
Received message: {"messageCount":31}
Received message: {"messageCount":32}
Received message: {"messageCount":33}
Received message: {"messageCount":34}
Received message: {"messageCount":35}
Received message: {"messageCount":36}
Received message: {"messageCount":37}
Received message: {"messageCount":38}
Received message: {"messageCount":39}
Received message: {"messageCount":40}
Received message: {"messageCount":41}
Received message: {"messageCount":42}
Received message: {"messageCount":43}
Received message: {"messageCount":44}
Error: Error: Keepalive timeout
    at MqttClient.onKeepaliveTimeout (D:\event\node_modules\mqtt\build\lib\client.js:1049:28)     
    at Timeout._onTimeout (D:\event\node_modules\mqtt\build\lib\KeepaliveManager.js:67:29)        
    at listOnTimeout (node:internal/timers:581:17)
    at process.processTimers (node:internal/timers:519:7)
Connection to MQTT broker closed
Connected to MQTT broker
Received message: {"messageCount":45}
Received message: {"messageCount":90}
Received message: {"messageCount":46}
Received message: {"messageCount":91}
Received message: {"messageCount":47}
Received message: {"messageCount":92}
Received message: {"messageCount":48}
Received message: {"messageCount":93}
Received message: {"messageCount":49}
Received message: {"messageCount":94}
Received message: {"messageCount":50}
Received message: {"messageCount":95}
Received message: {"messageCount":51}
@robertsLando
Member

Hi @vivek-doshi-genea, and thanks for your issue. Would you like to submit a PR to fix this bug?

Also, could you try using a QoS > 0 in publish and subscribe and see if the issue happens anyway?

I have a feeling the problem is that we are not rescheduling the ping when only receiving messages, so the keepalive timer triggers:

handlePublish(client, packet, done)

and also that the pingreq packet is not sent because handleMessage blocks execution.

@vivek-doshi-genea
Author

vivek-doshi-genea commented Sep 5, 2024

Yes @robertsLando, I would love to contribute, but I am not an expert in this; I am just trying to get a better understanding of all of this at the moment.
And yes, I am already using { qos: 1 } in Publisher.js.
Unfortunately, the AWS IoT Core MQTT broker doesn't support { qos: 2 }, so that is the maximum I can go up to.

I was going through the other issues and didn't get much understanding from them, but I also felt it might be something related to pingreq.

@robertsLando
Member

@vivek-doshi-genea You could try sending the pingreq on your own, maybe? Just use client.sendPing

@robertsLando
Member

robertsLando commented Sep 5, 2024

A lot of work has been done to improve keepalive management. The last rework introduced a keepalive manager that works following the specs:

const keepAliveTimeout = Math.ceil(this._keepalive * 1.5)
this._keepaliveTimeoutTimestamp = Date.now() + keepAliveTimeout
this._intervalEvery = Math.ceil(this._keepalive / 2)
this.timerId = this.timer.set(() => {
    // this should never happen, but just in case
    if (this.destroyed) {
        return
    }
    this.counter += 1
    // after keepalive seconds, send a pingreq
    if (this.counter === 2) {
        this.client.sendPing()
    } else if (this.counter > 2) {
        this.client.onKeepaliveTimeout()
    }
}, this._intervalEvery)

In short, given the keepalive you pass to the client, it creates an interval that fires every keepalive / 2 seconds. Every two iterations of the interval it sends a pingreq; if three iterations pass without receiving a pingresp (or other messages, see reschedulePing), it triggers the keepalive timeout. I'm pretty sure this only happens due to the usage of handleMessage; using the message event should not cause this
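The arithmetic can be checked with a quick worked example, assuming a keepalive of 60 seconds (the mqtt.js default):

```javascript
// Worked example of the keepalive manager arithmetic, assuming keepalive = 60 s
// (the mqtt.js default). Variable names mirror the snippet quoted above.
const keepalive = 60;

const keepAliveTimeout = Math.ceil(keepalive * 1.5); // 90 s until the timeout fires
const intervalEvery = Math.ceil(keepalive / 2);      // the internal timer ticks every 30 s

console.log(intervalEvery);      // 30 -> first tick
console.log(intervalEvery * 2);  // 60 -> second tick: pingreq is sent
console.log(intervalEvery * 3);  // 90 -> third tick with no traffic: keepalive timeout
console.log(keepAliveTimeout);   // 90 -> matches 1.5 * keepalive from the spec
```

So a subscriber whose handleMessage blocks the packet pipeline for long enough to suppress the pingreq/pingresp exchange will hit the timeout at the third tick.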

@vivek-doshi-genea
Author

I've added client.sendPing(); in handleMessage, and it's working smoothly with no errors.

client.handleMessage = async (packet, callback) => {
    client.sendPing(); // <------------------ HERE -------------------
    console.log('Received message:', packet.payload.toString());
    await wait(2000);
    callback();
};

Any idea about this one? It was happening after the error, and I am afraid any other error might lead to the same behaviour:

  • it seems to start a new subscription while keeping the old one, which logs messages at twice the speed

@vivek-doshi-genea
Author

@robertsLando
I'm pretty sure that this only happens due to the usage of handleMessage, using the message event should not cause this

True, I've tested it both ways:
message: was working fine
handleMessage: facing the issue with this method only

@robertsLando
Member

@vivek-doshi-genea the solution is not ideal, as it increases traffic a lot. Could I ask why you are not using the message event instead of handleMessage? Must you ensure that messages are processed one after the other? handleMessage should be used only when you must be sure you finished processing msg1 before processing msg2; otherwise message is what you should use, as it processes messages the same way but lets you receive a message while you are still processing another one. Also, the fake delay of 2 seconds is just too high, as it will slow everything down

@robertsLando
Member

it seems to start a new subscription while keeping the old one, which logs messages at twice the speed

If you switch to the message event you should not face this

@vivek-doshi-genea
Author

@robertsLando ,

Okay, let's take an example: 1000 devices generating events, say 1 event per second per device, with heavy sync operations done for each message, like:

  • each event needs to be stored in the DB
  • based on the event, a few DB updates might be required
  • publishing some events / webhooks

If we do all these processes with on message, it will just dump 1000 messages per second,
and it might choke the DB or the third-party webhook or push-event services if we process all of this in one shot.

A possible solution might be to divert the messages somewhere else, like:

  • Queue
  • Stream
  • DB

And then consume the messages directly from there.
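A rough in-process sketch of that idea (the queue, the enqueue/drain names, and processEvent are illustrative; in production the queue would be an external system such as SQS, Kafka, or Redis):

```javascript
// The MQTT message handler only enqueues; a single async worker drains the
// queue, so DB/webhook load is decoupled from the rate of incoming messages.
const queue = [];
let draining = false;

const processed = []; // stand-in for the real side effects, to keep the flow observable
async function processEvent(event) {
  processed.push(event.messageCount); // real code: DB writes, webhook calls, publishes
}

function enqueue(event) {
  queue.push(event);
  if (!draining) drain();
}

async function drain() {
  draining = true;
  while (queue.length > 0) {
    await processEvent(queue.shift()); // one at a time, in arrival order
  }
  draining = false;
}

// Inside client.on('message', ...) you would just call:
enqueue({ messageCount: 0 });
enqueue({ messageCount: 1 });
```

The message handler returns immediately, so the MQTT connection stays responsive while the worker catches up at its own pace.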

@vivek-doshi-genea
Author

@robertsLando ,

How does handleMessage work internally?
Does it queue up all incoming messages in memory and then process them one by one?
If that's the case, when does it send the ACK for a received message: after handleMessage calls the callback function, or when the message is queued in memory?

Is there any documentation available for this?

Or

Is it using some MQTT broker feature?

@robertsLando
Member

@vivek-doshi-genea The backpressure is done internally using Node.js streams.

I suggest you first read the Node.js stream docs to understand how streams work, and then check this code:

this.stream = this.streamBuilder(this)

First we build the stream based on the protocol, then we pipe that stream to a writable stream so we can handle writes to it and parse the packet there (using the mqtt-packet parser). Once the packet is parsed we call work, which calls handlePacket, where all the packet handling logic happens. handleMessage is called inside the publish and pubrel handlers, and the stream is unpaused only when you call its callback; otherwise the backpressure kicks in and no packet is passed to the writable stream while it is still processing a packet (the queuing is done automatically by Node.js)
