Fix crash caused by concurrent access to MQTT sink #40

Merged
merged 2 commits into from
Feb 20, 2024

Contributor

@gmartin82 gmartin82 commented Feb 16, 2024

Before this fix, messages received through a Zenoh subscription were sent to the related MQTT sink directly from the Zenoh callback. Invoking the MQTT sink this way causes the ntex-mqtt implementation's write cache to be accessed from the calling thread. At the same time, the ntex-mqtt server worker thread may also access this write cache as part of its background tasks. Because the write cache is designed to be accessed only in a single-threaded context, this can lead to an abort.

To fix this, an MPSC channel is introduced, allowing the Zenoh subscriber callbacks to send messages to an asynchronous task running within the ntex-mqtt server, which then forwards each message to MQTT. This fix was inspired by this discussion: ntex-rs/ntex-mqtt#66
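
For readers unfamiliar with the pattern, here is a minimal sketch of the idea using only the Rust standard library. It is not the code from this PR: the actual fix forwards messages to an asynchronous task inside the ntex-mqtt server, whereas this sketch uses plain threads and `std::sync::mpsc` as a stand-in. `SingleThreadedSink` and `forward_via_channel` are hypothetical names invented for illustration.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the MQTT sink. `Rc<RefCell<..>>` is not `Send`,
/// so the write cache can only be touched by the thread that created it.
struct SingleThreadedSink {
    write_cache: Rc<RefCell<Vec<String>>>,
}

impl SingleThreadedSink {
    fn new() -> Self {
        Self { write_cache: Rc::new(RefCell::new(Vec::new())) }
    }

    fn publish(&self, payload: String) {
        self.write_cache.borrow_mut().push(payload);
    }
}

/// Spawns `publishers` threads (standing in for Zenoh subscriber callbacks),
/// each sending `per_publisher` messages into an MPSC channel. A single
/// worker thread owns the sink and is the only code that ever calls
/// `publish`. Returns the number of messages the worker forwarded.
fn forward_via_channel(publishers: usize, per_publisher: usize) -> usize {
    let (tx, rx) = mpsc::channel::<String>();

    // The worker creates and owns the sink, so the write cache never
    // leaves this thread.
    let worker = thread::spawn(move || {
        let sink = SingleThreadedSink::new();
        // The loop ends once every sender has been dropped.
        for payload in rx {
            sink.publish(payload);
        }
        let forwarded = sink.write_cache.borrow().len();
        forwarded
    });

    // The "callbacks" only send into the channel; they never see the sink.
    let handles: Vec<_> = (0..publishers)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                for seq in 0..per_publisher {
                    tx.send(format!("publisher {id} message {seq}"))
                        .expect("worker should still be receiving");
                }
            })
        })
        .collect();

    // Drop the original sender so the worker can observe channel closure.
    drop(tx);
    for handle in handles {
        handle.join().expect("publisher thread panicked");
    }
    worker.join().expect("worker thread panicked")
}
```

Calling `forward_via_channel(4, 100)` forwards all 400 messages through the single worker. Because the sink is constructed inside the worker and is not `Send`, the compiler rejects any attempt to call it from a publisher thread, which is the property the channel is meant to enforce.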

Fixes #39

@eclipse-zenoh-bot
Contributor

@gmartin82 If this pull request contains a bugfix or a new feature, then please consider using Closes #ISSUE-NUMBER syntax to link it to an issue.

@gmartin82 gmartin82 changed the title from "Fix crash caused by concurrent access to MQTT sink (fixes #39)" to "Fix crash caused by concurrent access to MQTT sink" on Feb 16, 2024
@gmartin82
Contributor Author

Note that while this PR fixes the crash due to concurrent access, it exposes another issue that occurs when data is forwarded from Zenoh at a high frequency. Data is still successfully forwarded to connected MQTT clients, but ntex-mqtt isn't able to handle new MQTT connection requests or keep-alive messages. This seems to be a limitation of the current ntex-mqtt implementation.

I was only able to trigger this secondary issue when sending data as fast as possible from Zenoh to MQTT. When small sleeps (e.g. 1 ms) were introduced between published messages, everything worked as expected.
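
A paced publishing loop of the kind described above might look like the following sketch. It is not the test program used for this PR: `publish_paced` is a hypothetical helper, and the `publish` closure stands in for whatever call actually puts data on Zenoh.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Calls `publish` `count` times, sleeping for `pause` after each call.
/// Passing `Duration::ZERO` reproduces the "as fast as possible" case.
/// Returns the total time spent in the loop.
fn publish_paced<F: FnMut(usize)>(count: usize, pause: Duration, mut publish: F) -> Duration {
    let start = Instant::now();
    for seq in 0..count {
        publish(seq);
        if !pause.is_zero() {
            // Yield between messages, giving the receiving side time
            // for other work.
            thread::sleep(pause);
        }
    }
    start.elapsed()
}
```

With `pause` set to `Duration::from_millis(1)`, the loop is capped at roughly 1000 messages per second, which matches the rate at which the secondary issue no longer appeared.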

@gmartin82 gmartin82 marked this pull request as ready for review February 19, 2024 15:25
Member

@JEnoch JEnoch left a comment

LGTM!

@JEnoch JEnoch merged commit 551de06 into eclipse-zenoh:main Feb 20, 2024
4 checks passed
Development

Successfully merging this pull request may close these issues.

[Bug] MQTT plugin causes router to crash if client repeatedly creates and closes a session
3 participants