diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs index a0925b5..5fbc33e 100644 --- a/examples/examples/emit_measurements.rs +++ b/examples/examples/emit_measurements.rs @@ -5,7 +5,6 @@ use std::time::{SystemTime, UNIX_EPOCH}; use rdkafka::config::ClientConfig; use rdkafka::producer::FutureRecord; -use rdkafka::util::Timeout; use denormalized_examples::Measurment; @@ -16,52 +15,64 @@ use denormalized_examples::Measurment; /// This data is read processed by other exmpales #[tokio::main] async fn main() -> Result<()> { - let mut rng = rand::thread_rng(); + let mut tasks = tokio::task::JoinSet::new(); let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", String::from("localhost:9092")) - .set("message.timeout.ms", "60000") + .set("message.timeout.ms", "100") .create() .expect("Producer creation error"); - let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"]; + for _ in 0..2048 { + let producer = producer.clone(); - loop { - let sensor_name = sensors.choose(&mut rng).unwrap().to_string(); + tasks.spawn(async move { + let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"]; - // Alternate between sending random temperature and humidity readings - let (topic, msg) = if rand::random::() < 0.4 { - ( - "temperature".to_string(), - serde_json::to_vec(&Measurment { - occurred_at_ms: get_timestamp_ms(), - sensor_name, - reading: rand::random::() * 115.0, - }) - .unwrap(), - ) - } else { - ( - "humidity".to_string(), - serde_json::to_vec(&Measurment { - occurred_at_ms: get_timestamp_ms(), - sensor_name, - reading: rand::random::(), - }) - .unwrap(), - ) - }; + loop { + let sensor_name = sensors.choose(&mut rand::thread_rng()).unwrap().to_string(); - producer - .send( - FutureRecord::<(), Vec>::to(topic.as_str()).payload(&msg), - Timeout::Never, - ) - .await - .unwrap(); + // Alternate between sending random temperature and humidity readings + let (topic, msg) = if rand::random::() < 0.4 { + ( + "temperature".to_string(), + serde_json::to_vec(&Measurment { + occurred_at_ms: get_timestamp_ms(), + sensor_name, + reading: rand::random::() * 115.0, + }) + .unwrap(), + ) + } else { + ( + "humidity".to_string(), + serde_json::to_vec(&Measurment { + occurred_at_ms: get_timestamp_ms(), + sensor_name, + reading: rand::random::(), + }) + .unwrap(), + ) + }; - tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + let fut = producer + .send_result(FutureRecord::<(), Vec>::to(topic.as_str()).payload(&msg)) + .unwrap(); + + tokio::spawn(async move { + let _ = fut.await; + }); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + }); } + + while let Some(res) = tasks.join_next().await { + let _ = res; + } + + Ok(()) } fn get_timestamp_ms() -> u64 {