
Performance Issue #165

Open · YuanYuYuan opened this issue Aug 28, 2022 · 11 comments

@YuanYuYuan

Hi there,

While benchmarking MQTT throughput, I found that the Rust version is both slower and less stable than the Python one. The test measures the throughput of the publisher. Here are the details.

  • Broker: Mosquitto 2.0.15
  • Publisher: pub.rs (paho.mqtt.rust 0.11.1) or pub.py (paho.mqtt.python 1.6.1)
  • QoS: 0
  • MQTT version: 5

pub.rs

use paho_mqtt as mqtt;
use anyhow::{Result, bail};
use clap::Parser;
use std::time::Instant;


const TOPIC: &str = "/test/thr";
const QOS: i32 = mqtt::QOS_0;
const MAX_PAYLOAD: usize = 268435400;

#[derive(Parser)]
struct Args {
    #[clap(short, long, default_value = "tcp://localhost:1883")]
    broker: String,

    #[clap(short, long, default_value = "8")]
    payload: usize,
}


#[async_std::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { broker, payload } = Args::parse();

    if payload > MAX_PAYLOAD {
        bail!("Payload size {} exceeded the maximum value {} (bytes)", payload, MAX_PAYLOAD);
    }

    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(broker)
        .mqtt_version(mqtt::MQTT_VERSION_5)
        .finalize();
    let connect_opts = mqtt::ConnectOptionsBuilder::new()
        .finalize();

    let client = mqtt::AsyncClient::new(create_opts)?;
    let msg_payload: Vec<_> = (0..payload)
        .map(|_| 0u8)
        .collect();

    client.connect(connect_opts).await?;
    let msg = mqtt::Message::new(TOPIC, msg_payload.clone(), QOS);

    let mut counter = 0;
    let mut now = Instant::now();
    loop {
        client.publish(msg.clone()).await?;
        let elapsed = now.elapsed().as_micros();
        if elapsed > 1000000 {
            println!("{:.3} msg/s", counter as f64 * 1_000_000. / elapsed as f64);
            counter = 0;
            now = Instant::now();
        }
        counter += 1;
    }
    client.disconnect(None).await?;
    Ok(())
}

pub.py

import paho.mqtt.client as mqtt
import time
from random import randbytes

client = mqtt.Client()
client.connect("127.0.0.1", 1883, 60)

payload = randbytes(8)
cnt = 0
start = time.time_ns()
while True:
    elapsed_time = time.time_ns() - start
    if elapsed_time > 1000000000:
        start = time.time_ns()
        print("%.3f msg/s" % (float(cnt) * 1000000000. / float(elapsed_time)))
        cnt = 0
    client.publish("/test/thr", payload, qos=0, retain=False)
    cnt += 1

Throughput comparison

pub.rs

54248.241 msg/s
18179.741 msg/s
47613.571 msg/s
48019.992 msg/s
49332.507 msg/s
49145.853 msg/s
20423.574 msg/s
12281.802 msg/s
48072.856 msg/s
16983.018 msg/s
25440.103 msg/s
4394.399 msg/s
22710.194 msg/s
18406.605 msg/s
14120.632 msg/s
13052.244 msg/s
51242.385 msg/s
47943.329 msg/s
49872.900 msg/s
48808.854 msg/s
3040.162 msg/s

pub.py

95991.604 msg/s
96111.666 msg/s
95716.188 msg/s
92725.087 msg/s
92896.827 msg/s
96439.780 msg/s
96038.697 msg/s
95765.548 msg/s
95877.959 msg/s
95921.130 msg/s
94902.602 msg/s
92469.734 msg/s
95637.739 msg/s
96385.175 msg/s
95366.183 msg/s
96524.485 msg/s
94519.783 msg/s
95799.747 msg/s
95637.274 msg/s
95755.109 msg/s
67524.127 msg/s
47830.886 msg/s
52210.208 msg/s
55274.634 msg/s
54380.949 msg/s
55581.592 msg/s
56070.878 msg/s
56979.555 msg/s
57747.757 msg/s
57908.095 msg/s
49828.407 msg/s
53177.114 msg/s
53481.575 msg/s
54195.917 msg/s
50037.848 msg/s
55142.561 msg/s
55998.976 msg/s
51974.582 msg/s
54222.880 msg/s
48587.717 msg/s
49410.644 msg/s
54962.219 msg/s
53889.664 msg/s
54323.014 msg/s

The problems are

  1. The one written in Rust is slower than the Python counterpart. (I also tested it with the C counterpart, which is much faster.)
  2. The message rate is unstable while testing in Rust.

I'm wondering whether this is a performance issue in the library, or whether something is missing in my Rust code. Any suggestion is appreciated!

@fpagliughi
Contributor

I was actually going to revisit performance issues. There was an issue with the new C lib that slowed things down and got me thinking about it again:
eclipse-paho/paho.mqtt.c#1254

When that fix is released, I want to compare against the C library, and also start testing the different async executors to see if they make a difference. My thought is that this library should be a little slower than the C one, due to the FFI overhead, but not by too much.
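
As a quick note on comparing executors: the crate's tokens can be awaited on any executor, so a comparison run mostly means swapping the runtime. Below is a minimal sketch, assuming a tokio dependency is added to Cargo.toml; the client setup is abbreviated and does not reproduce the exact options from pub.rs.

// Hypothetical executor-swap variant of the benchmark: same paho-mqtt calls,
// but driven by tokio instead of async-std. Requires something like
// tokio = { version = "1", features = ["full"] } as a dependency.
use paho_mqtt as mqtt;
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Create a client straight from the broker URI (default create options).
    let client = mqtt::AsyncClient::new("tcp://localhost:1883")?;
    client.connect(mqtt::ConnectOptions::new()).await?;

    let msg = mqtt::Message::new("/test/thr", vec![0u8; 8], mqtt::QOS_0);
    loop {
        client.publish(msg.clone()).await?;
        // ...same per-second rate counting as in pub.rs...
    }
}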

@YuanYuYuan
Author

Thanks for your reply! I'll look forward to the new release.

@jerry73204

Just profiled the async consumer with perf. AsyncClient::on_message_arrived() accounts for 51% of the total running time when the message size is 1 MB. The offending to_vec() is located here in that function; it performs a costly memcpy to create an owned payload. It could be optimized to use the underlying ffi::MQTTAsync_message buffer instead.

@fpagliughi
Contributor

fpagliughi commented Sep 24, 2022

Interesting. My initial notes from 5 years ago say, "reduce/eliminate mem copies." I should have listened to my own advice! I really do need to create some performance tests with large payloads to stay on top of this kind of thing.

It might be worth keeping the underlying C struct and creating a Rust slice over the payload. That creates all kinds of issues, like creating the messages from Rust, moving, etc. But for that performance improvement, it would be worth trying.
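
As a rough illustration of that idea (not the crate's actual types: the C struct fields are simplified from paho.mqtt.c's MQTTAsync_message, and the ownership/freeing questions mentioned above are exactly what is left out):

// Sketch only: a borrowed view over the C message payload, avoiding the
// to_vec() copy. The hard part, omitted here, is ensuring the C message is
// freed via MQTTAsync_freeMessage() only after the view is dropped.
use std::os::raw::c_void;

#[repr(C)]
struct CMessage {           // stand-in for ffi::MQTTAsync_message, fields elided
    payload: *mut c_void,
    payloadlen: i32,
}

struct BorrowedMessage<'a> {
    payload: &'a [u8],      // borrows the C buffer instead of owning a Vec<u8>
}

impl<'a> BorrowedMessage<'a> {
    /// Safety: `cmsg` must point to a valid message that outlives the view.
    unsafe fn from_c(cmsg: &'a CMessage) -> Self {
        let payload = std::slice::from_raw_parts(
            cmsg.payload as *const u8,
            cmsg.payloadlen as usize,
        );
        Self { payload }
    }
}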

@jerry73204

I explored your code and saw that the Message type is used for both publishing and subscribing. That keeps the API simple, but the overhead is not trivial, because the payload is allocated differently in the two cases. I would prefer different underlying implementations.

For subscription, topicName and payload are allocated on the paho.mqtt.c side and are eventually freed by MQTTAsync_free() and MQTTAsync_freeMessage(). For publication, topicName and payload are allocated on the Rust side. The Message type would end up as either two different message types or an enum of the two kinds.

@fpagliughi
Contributor

fpagliughi commented Sep 26, 2022

Yes, MQTTAsync_malloc() and MQTTAsync_free() are just thin wrappers around the C malloc() and free(), respectively. They were put in the C lib so that it could track/log allocations, and also so that apps and wrapper libraries like this (Rust) one can be sure they are using the same allocator as the C lib.

Originally, when this library was started, Rust used a different allocator (jemalloc) that was not compatible with the system allocator used by C. So you could not allocate memory in Rust and free it in C, or vice versa. The app would likely crash.

This changed in 2018(?), and now Rust uses the system allocator by default:
https://internals.rust-lang.org/t/jemalloc-was-just-removed-from-the-standard-library/8759

For the most part, though, the C library doesn't pass memory ownership in and out of the library. The big exception is incoming messages, which the application must free manually. But, going the other way, on publish, the library doesn't take possession of the message. I can only assume it does an internal copy of the payload if it needs to cache the message, particularly for QoS >= 1.

That said, the Rust Message type just needs a container for payloads and topics. Using String and Vec makes it easy, but there's really no reason we couldn't use raw pointers and manually-allocated memory buffers, other than it would be more difficult and increase the chance of introducing memory leaks and buffer overruns.

But it also might just flip the problem around. Receiving messages would be more performant, but creating and sending messages could suffer a performance drop. With the current code, the app can build a payload into a Vec and just move the Vec into the Message. If we insist on a manual malloc(), though, then we would need to copy the Vec into the allocated buffer.

What we would likely need is something like a Cow which knows where the memory came from, and how (or whether) to deallocate it.
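
Something along these lines, purely as a sketch of the idea (the names are made up here, not the crate's API):

/// Sketch of a Cow-like payload that records where its memory came from.
enum Payload {
    /// Built in Rust on the publish side, e.g. a Vec moved into the message.
    Rust(Vec<u8>),
    /// Allocated by the C library for an incoming message.
    C { ptr: *mut u8, len: usize },
}

impl Payload {
    fn as_slice(&self) -> &[u8] {
        match self {
            Payload::Rust(v) => v,
            // Safety: ptr/len must describe a live C allocation for the
            // lifetime of this Payload.
            Payload::C { ptr, len } => unsafe { std::slice::from_raw_parts(*ptr, *len) },
        }
    }
}

impl Drop for Payload {
    fn drop(&mut self) {
        if let Payload::C { ptr, .. } = self {
            // A real implementation would hand the buffer back to the C lib's
            // allocator here (e.g. via MQTTAsync_free); the Vec variant frees
            // itself as usual.
            let _ = ptr;
        }
    }
}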

@fpagliughi
Contributor

fpagliughi commented Jan 7, 2023

The issue that @jerry73204 raised is a real one, and I'm still thinking of a way to make a Cow-like container for messages to eliminate the performance bottleneck. But that mainly applies to incoming messages, and I wanted to get back to the original question about publish speed.

I copy/pasted @YuanYuYuan 's pub app to give it a try. I called it pubspeed, but it's the same code. BTW, my Cargo.toml looks like this:

[package]
name = "pubspeed"
version = "0.1.0"
edition = "2021"

[dependencies]
env_logger = "0.7"
log = "0.4"
paho-mqtt = "0.11" 
anyhow = "1.0"
async-std = { version = "1.12", features = ["attributes"] }
clap = { version = "4", features = ["derive"] }

The -sys crate has been updated on crates.io to eliminate one known performance issue. I'm using Linux Mint 20 on a fairly beefy desktop: Intel® Core™ i9-10900KF CPU @ 3.70GHz × 20, with 64GB RAM.

So I'm seeing these speeds:

$ ./target/debug/pubspeed 
83767.995 msg/s
91022.818 msg/s
82583.092 msg/s
80261.278 msg/s
81488.511 msg/s
30495.684 msg/s
75632.244 msg/s
84893.406 msg/s
87294.738 msg/s
87414.301 msg/s

A little closer to the Python speeds. But...
consider that this is the Debug build.

Trying again with the release build:

$ ./target/release/pubspeed 
159322.841 msg/s
 51710.447 msg/s
155608.911 msg/s
 62973.255 msg/s
155964.844 msg/s
161782.191 msg/s
149789.700 msg/s
 46388.740 msg/s
 73090.876 msg/s
157278.685 msg/s
 59091.222 msg/s
 26817.932 msg/s
 66829.902 msg/s
153944.384 msg/s
151238.673 msg/s
 46498.048 msg/s

This is certainly faster, but a lot more erratic. It would be interesting to see where the bottleneck is here and what's causing the differences. (BTW: the messages with an 8-byte payload come in at 87 bytes each, so at a max of ~160k msg/s, that's around 14MB/s, not counting the TCP headers, etc.)

I also tested with the unreleased code in the release branch, which now uses Paho C v1.3.12, which has some more changes to the underlying network code.

$ ./target/release/pubspeed 
152840.694 msg/s
145390.564 msg/s
148636.257 msg/s
 35341.695 msg/s
 31810.992 msg/s
146906.237 msg/s
156787.373 msg/s
 24630.211 msg/s
 46718.710 msg/s
 34790.556 msg/s
150734.397 msg/s

This seems fairly comparable with the previous version.

@jjj-vtm

jjj-vtm commented Aug 28, 2024

Sorry for resurrecting this old thread, but we are trying to integrate the Rust library, so I got curious. I think the performance difference comes mainly from how the C library is implemented. It basically uses a dedicated thread for sending out messages, which reads from a queue and, if there are no more elements, waits on a condition variable:

https://github.com/eclipse/paho.mqtt.c/blob/6b1e202a701ffcdaa277b5644ed291287a70a7aa/src/MQTTAsyncUtils.c#L1807C1-L1809C70

The problem is that in this test there is only a single Rust thread, which adds an element to the queue and then sleeps until it is woken up once the MQTT send thread has successfully sent the message via TCP. Since there is always at most one message in the queue, the MQTT send thread always ends up waiting on the condition variable, so for small messages like 8 bytes the runtime is dominated by thread switching. To see this, at least on my machine, removing the wait in the C library

https://github.com/eclipse/paho.mqtt.c/blob/6b1e202a701ffcdaa277b5644ed291287a70a7aa/src/MQTTAsyncUtils.c#L1807C1-L1809C70

made it a lot faster. Since this is not viable, a better way would be to allow publishing with a callback like

         let _ = client.publish_cb(msg.clone(), cb);

for example, and, instead of awaiting the token, simply count the number of successful publishes in the callback. This keeps the MQTT send queue full and prevents the send thread from sleeping/waiting. With this I get around 450k messages per second published to a local Mosquitto broker on my M3. I will upload the branch to GitHub and link it here; it would be interesting to see whether this is reproducible on other systems.
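
Roughly, the publish loop then looks like the sketch below (publish_cb and its callback signature are taken as assumptions from the experimental branch, not from the released paho-mqtt API):

// Sketch only: count completed publishes in the callback instead of awaiting
// each delivery token, so the C send queue never drains and its thread never
// blocks on the condition variable.
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use paho_mqtt as mqtt;

fn pump(client: &mqtt::AsyncClient, msg: &mqtt::Message, sent: &Arc<AtomicU64>) {
    loop {
        let sent = Arc::clone(sent);
        // Fire-and-forget publish; the (assumed) callback fires on completion.
        let _ = client.publish_cb(msg.clone(), move || {
            sent.fetch_add(1, Ordering::Relaxed);
        });
        // ...periodically read `sent` elsewhere and print msg/s, as in pub.rs...
    }
}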

@jjj-vtm

jjj-vtm commented Aug 29, 2024

Here is a link to the testcase and the fixes:

https://github.com/jjj-vtm/paho.mqtt.rust/blob/performance_improvements/examples/pub_loop.rs

I am not sure if it is worth a feature request, but it shows that the Rust + C combination is quite fast.

@jerry73204

@jjj-vtm Thanks for your insight. It would be nice to have a callback-based publish in the public crate. It could serve those focused on performance until async/.await can be made performant.

@fpagliughi
Contributor

This is great stuff. I'm deep into refactoring the options structs to remove the C FFI dependence in them, as a first step towards a fully-Rust library. That work is nearing completion, and then I can start looking at this more closely.

Also, @jerry73204, I haven't forgotten about the copy-on-receiving-messages performance hit. That would be a relatively quick boost as well.
