Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚨 Telegram order not preserved in MQTT message order #1714

Closed
Glodenox opened this issue Sep 28, 2022 · 11 comments
Closed

🚨 Telegram order not preserved in MQTT message order #1714

Glodenox opened this issue Sep 28, 2022 · 11 comments
Labels

Comments

@Glodenox
Copy link

Description

I've been running into warning messages in Home Assistant indicating that certain MQTT topics aren't strictly increasing all the time, as have several other people for at least a year. I'm not talking about the daily resets in some topics, but about an issue that pops up once every few minutes. It mostly seems to happen with dsmr/day-consumption/electricity1 and dsmr/day-consumption/electricity2 for me, but I've seen it in other topics as well. This is an issue, because you don't really expect these values to ever go down (excluding the daily resets). It was initially expecting the meter to do bad readings. But after some investigation, it seems like something else is going on within DSMR Reader.

Here's my setup: Raspberry Pi with the remote datalogger script connected to a Belgian Fluvius smart energy meter -> Docker instance of DSMR Reader -> local MQTT Broker -> Home Assistant

I slightly modified the remote datalogger script to also send a POST request to an endpoint I wrote to parse the telegram and check whether the new value is lower than the previous value. I also wrote a small application that subscribes to the DSMR MQTT topics, that does the same check. The application reading the MQTT messages is indicating a decrease every couple of minutes, while the endpoint that receives the telegrams so far hasn't registered any decreases. Just to be safe, I've verified that it really is working by temporarily logging every increase in value.

So it would seem that, for some reason, the MQTT messages aren't being sent in the same order as the telegrams are being received. I didn't fully dive into the DSMR Reader codebase myself to search for a cause yet.

Note: feel free to respond in Dutch, I'm just used to writing all my issues in English, and it's a hard habit to shake off.

DSMR-reader version

5.7.0

DSMR-reader platform

Docker (e.g. Xirixiz)

Debug info dump

No response

@Glodenox Glodenox added the bug label Sep 28, 2022
@dennissiemensma
Copy link
Member

dennissiemensma commented Sep 29, 2022 via email

@dennissiemensma
Copy link
Member

dennissiemensma commented Sep 29, 2022 via email

@Glodenox
Copy link
Author

I've had a quick look at the debug logs and I think I'm seeing behaviour that could explain this. It looks like the MQTT message queue is being emptied in a descending order. I've still got to debug it further to be certain, though. But seeing the logs, it wouldn't surprise me that new messages get added to the front of the queue and therefore get sent first.

I'm going to adjust my input and output monitoring to add more information so I can monitor the whole data flow, but I'm not sure whether I'll have time for that today.

2022-09-29 09:07:21,007 DEBUG    datalogger   telegram_to_reading              81 | Received telegram:
/FLU5\253769484_A
0-0:96.1.4(50216)
0-0:96.1.1(3153414733313030303338343634)
0-0:1.0.0(220929090756S)
1-0:1.8.1(014144.051*kWh)
1-0:1.8.2(016510.946*kWh)
1-0:2.8.1(003672.855*kWh)
1-0:2.8.2(001446.843*kWh)
0-0:96.14.0(0001)
1-0:1.7.0(00.050*kW)
1-0:2.7.0(00.000*kW)
1-0:21.7.0(00.007*kW)
1-0:41.7.0(00.264*kW)
1-0:61.7.0(00.000*kW)
1-0:22.7.0(00.000*kW)
1-0:42.7.0(00.000*kW)
1-0:62.7.0(00.221*kW)
1-0:32.7.0(238.7*V)
1-0:52.7.0(237.2*V)
1-0:72.7.0(237.0*V)
1-0:31.7.0(000.05*A)
1-0:51.7.0(001.56*A)
1-0:71.7.0(002.52*A)
0-0:96.3.10(1)
0-0:17.0.0(999.9*kW)
1-0:31.4.0(999*A)
0-0:96.13.0()
!0ED2
2022-09-29 09:07:21,124 DEBUG    messages     queue_message                    47 | MQTT: Queued message for dsmr/reading/timestamp: 2022-09-29T07:07:56Z
2022-09-29 09:07:21,152 DEBUG    messages     queue_message                    47 | MQTT: Queued message for dsmr/reading/electricity_delivered_1: 14144.051
<snip other queue_message logs>
2022-09-29 09:07:21,676 DEBUG    messages     queue_message                    47 | MQTT: Queued message for dsmr/meter-stats/rejected_telegrams: 11243
192.168.1.144 - - [29/Sep/2022:07:07:21 +0000] "POST /api/v1/datalogger/dsmrreading HTTP/1.1" 201 0 "-" "python-requests/2.27.1" "-"
2022-09-29 09:07:21,894 DEBUG    schedule     execute_scheduled_processes      31 | SP: 1 backend service(s) ready to run
2022-09-29 09:07:21,894 DEBUG    schedule     execute_scheduled_processes      34 | SP: Running \"Generate consumption data\" (dsmr_consumption.services.run)
2022-09-29 09:07:21,955 DEBUG    messages     queue_message                    47 | MQTT: Queued message for dsmr/day-consumption/electricity1: 6.883
<snip other queue_message logs>
2022-09-29 09:07:21,976 DEBUG    messages     queue_message                    47 | MQTT: Queued message for dsmr/day-consumption/energy_supplier_price_gas: 0.000000
2022-09-29 09:07:22,026 DEBUG    services     compact                         205 | Compact: Processed reading: 12203158 @ 2022-09-29 09:07:56+02:00
2022-09-29 09:07:22,040 DEBUG    schedule     reschedule                       78 | SP: Rescheduled \"Generate consumption data\" to 2022-09-29 09:07:23.026788+02:00 (ETA 0:00:00.999987)
2022-09-29 09:07:22,044 DEBUG    persistent_clients run                              35 | CLIENTS: Running 1 active client(s)
2022-09-29 09:07:22,046 DEBUG    broker       run                              81 | MQTT: Processing 48 message(s)
2022-09-29 09:07:22,046 DEBUG    broker       run                              84 | MQTT: Publishing queued message (#324574267) for dsmr/day-consumption/energy_supplier_price_gas: 0.000000
2022-09-29 09:07:22,046 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Sending PUBLISH (d0, q2, r1, m718), 'b'dsmr/day-consumption/energy_supplier_price_gas'', ... (8 bytes)
2022-09-29 09:07:22,046 DEBUG    broker       run                             108 | MQTT: Waiting for message (#324574267) to be marked published by broker
2022-09-29 09:07:22,046 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Received PUBREC (Mid: 718)
2022-09-29 09:07:22,046 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Sending PUBREL (Mid: 718)
2022-09-29 09:07:22,047 DEBUG    broker       run                             108 | MQTT: Waiting for message (#324574267) to be marked published by broker
2022-09-29 09:07:22,047 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Received PUBCOMP (Mid: 718)
2022-09-29 09:07:22,047 DEBUG    broker       run                             121 | MQTT: Deleting published message (#324574267) from queue
2022-09-29 09:07:22,062 DEBUG    broker       run                              84 | MQTT: Publishing queued message (#324574266) for dsmr/day-consumption/energy_supplier_price_electricity_returned_2: 0.027190
2022-09-29 09:07:22,063 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Sending PUBLISH (d0, q2, r1, m719), 'b'dsmr/day-consumption/energy_supplier_price_electricity_returned_2'', ... (8 bytes)
2022-09-29 09:07:22,063 DEBUG    broker       run                             108 | MQTT: Waiting for message (#324574266) to be marked published by broker
2022-09-29 09:07:22,063 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Received PUBREC (Mid: 719)
2022-09-29 09:07:22,063 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Sending PUBREL (Mid: 719)
2022-09-29 09:07:22,063 DEBUG    broker       run                             108 | MQTT: Waiting for message (#324574266) to be marked published by broker
2022-09-29 09:07:22,063 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Received PUBCOMP (Mid: 719)
2022-09-29 09:07:22,063 DEBUG    broker       run                             121 | MQTT: Deleting published message (#324574266) from queue
<snip other message sending in _descending_ ID order>
2022-09-29 09:07:22,919 DEBUG    broker       run                              84 | MQTT: Publishing queued message (#324574223) for dsmr/reading/electricity_delivered_2: 16510.946
2022-09-29 09:07:22,920 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Sending PUBLISH (d0, q2, r1, m762), 'b'dsmr/reading/electricity_delivered_2'', ... (9 bytes)
2022-09-29 09:07:22,920 DEBUG    broker       run                             108 | MQTT: Waiting for message (#324574223) to be marked published by broker
2022-09-29 09:07:22,920 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Received PUBREC (Mid: 762)
2022-09-29 09:07:22,920 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Sending PUBREL (Mid: 762)
2022-09-29 09:07:22,920 DEBUG    broker       run                             108 | MQTT: Waiting for message (#324574223) to be marked published by broker
2022-09-29 09:07:22,920 DEBUG    broker       run                             108 | MQTT: Waiting for message (#324574223) to be marked published by broker
2022-09-29 09:07:22,921 DEBUG    broker       on_log                          178 | MQTT: (Paho on_log) Received PUBCOMP (Mid: 762)
2022-09-29 09:07:22,921 DEBUG    broker       run                             121 | MQTT: Deleting published message (#324574223) from queue

@Glodenox
Copy link
Author

Glodenox commented Sep 29, 2022

I found some time to dive into the code and am suspicious of this line: https://github.com/dsmrreader/dsmr-reader/blob/v5/dsmr_mqtt/services/broker.py#L74
It sorts the messages in a descending order, so the newer messages are sent first. I can understand the reasoning: dealing with timeouts and preventing the queue from becoming too big. Inverting the order again after that statement would most likely solve this issue. EDIT: or even better: do order_by("pk") and slice the list in a different way: [-settings.DSMRREADER_MQTT_MAX_MESSAGES_IN_QUEUE : ]

@dennissiemensma
Copy link
Member

dennissiemensma commented Sep 29, 2022 via email

@Glodenox
Copy link
Author

It's also not really something most people will notice, as the difference is minimal and, indeed, only happens when a lot of messages are being sent.

As I see you're reacting via e-mail, I think you didn't see the edit yet in my previous message: I suspect it might be more efficient to sort in an ascending way with order_by("pk") and slice the list in a different way: [-settings.DSMRREADER_MQTT_MAX_MESSAGES_IN_QUEUE : ], which then gets the last x messages from the queue. Either way: I expect that it will fix this issue.

@dennissiemensma dennissiemensma added this to the 5.8 milestone Sep 29, 2022
@dennissiemensma
Copy link
Member

It seems I can't use negative indexing nor reordering it later in the query after slicing, which must be the reason I wrote it initially this way.
I'll just swap the logic, by deleting the excess first and then fetch the remainder sorted properly. It doesn't need to be an exact trim of any messages overflowing the queue anyway. Just any sane limit.

@dennissiemensma
Copy link
Member

I made a fix that'll probably be released Saturday in v5.8

@dennissiemensma
Copy link
Member

Released in v5.8

@Glodenox
Copy link
Author

Glodenox commented Oct 1, 2022

I've had my monitoring script run for the whole day and didn't encounter a single issue any more. That fully confirms that this issue is now fixed 👍 Thanks for the quick turnaround on this one!

@dennissiemensma
Copy link
Member

Great, thanks for checking!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants