Skip to content

Commit

Permalink
optimize emit measurements script
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Aug 15, 2024
1 parent 8a73819 commit 524de94
Showing 1 changed file with 47 additions and 36 deletions.
83 changes: 47 additions & 36 deletions examples/examples/emit_measurements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::{SystemTime, UNIX_EPOCH};

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureRecord;
use rdkafka::util::Timeout;

use denormalized_examples::Measurment;

Expand All @@ -16,52 +15,64 @@ use denormalized_examples::Measurment;
/// This data is read processed by other exmpales
#[tokio::main]
async fn main() -> Result<()> {
let mut rng = rand::thread_rng();
let mut tasks = tokio::task::JoinSet::new();

let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", String::from("localhost:9092"))
.set("message.timeout.ms", "60000")
.set("message.timeout.ms", "100")
.create()
.expect("Producer creation error");

let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"];
for _ in 0..2048 {
let producer = producer.clone();

loop {
let sensor_name = sensors.choose(&mut rng).unwrap().to_string();
tasks.spawn(async move {
let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"];

// Alternate between sending random temperature and humidity readings
let (topic, msg) = if rand::random::<f64>() < 0.4 {
(
"temperature".to_string(),
serde_json::to_vec(&Measurment {
occurred_at_ms: get_timestamp_ms(),
sensor_name,
reading: rand::random::<f64>() * 115.0,
})
.unwrap(),
)
} else {
(
"humidity".to_string(),
serde_json::to_vec(&Measurment {
occurred_at_ms: get_timestamp_ms(),
sensor_name,
reading: rand::random::<f64>(),
})
.unwrap(),
)
};
loop {
let sensor_name = sensors.choose(&mut rand::thread_rng()).unwrap().to_string();

producer
.send(
FutureRecord::<(), Vec<u8>>::to(topic.as_str()).payload(&msg),
Timeout::Never,
)
.await
.unwrap();
// Alternate between sending random temperature and humidity readings
let (topic, msg) = if rand::random::<f64>() < 0.4 {
(
"temperature".to_string(),
serde_json::to_vec(&Measurment {
occurred_at_ms: get_timestamp_ms(),
sensor_name,
reading: rand::random::<f64>() * 115.0,
})
.unwrap(),
)
} else {
(
"humidity".to_string(),
serde_json::to_vec(&Measurment {
occurred_at_ms: get_timestamp_ms(),
sensor_name,
reading: rand::random::<f64>(),
})
.unwrap(),
)
};

tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
let fut = producer
.send_result(FutureRecord::<(), Vec<u8>>::to(topic.as_str()).payload(&msg))
.unwrap();

tokio::spawn(async move {
let _ = fut.await;
});

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
});
}

while let Some(res) = tasks.join_next().await {
let _ = res;
}

Ok(())
}

fn get_timestamp_ms() -> u64 {
Expand Down

0 comments on commit 524de94

Please sign in to comment.