Skip to content

Commit

Permalink
refactor(zeromq): switch from zmq to zeromq (#20624)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongbo-miao authored Nov 22, 2024
1 parent 00c1fbe commit ba76461
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 328 deletions.
76 changes: 38 additions & 38 deletions data-visualization/iads/iads-data-producer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,49 @@ use tokio::net::TcpListener;
use tokio::time::{sleep, Duration};

// Constants for packet structure
const HEADER_SIZE: i32 = 32;
const TAG_SIZE: i32 = 2;
const VALUE_SIZE: i32 = 4;
const TIME_PAIR_SIZE: i32 = (TAG_SIZE * 2) + (VALUE_SIZE * 2); // 2 tags + 2 values
const SIGNAL_PAIR_SIZE: i32 = (TAG_SIZE * 2) + (VALUE_SIZE * 2); // 2 tags + 2 values
const HEADER_SIZE_BYTE: i32 = 32;
const TAG_SIZE_BYTE: i32 = 2;
const VALUE_SIZE_BYTE: i32 = 4;
const TIME_PAIR_SIZE_BYTE: i32 = (TAG_SIZE_BYTE * 2) + (VALUE_SIZE_BYTE * 2); // 2 tags + 2 values
const SIGNAL_PAIR_SIZE_BYTE: i32 = (TAG_SIZE_BYTE * 2) + (VALUE_SIZE_BYTE * 2); // 2 tags + 2 values

// Configuration signals
struct StreamConfig {
num_signals: i32,
signal_number: i32,
rate_hz: f64,
port: u16,
}

impl Default for StreamConfig {
fn default() -> Self {
StreamConfig {
num_signals: 4,
signal_number: 4,
rate_hz: 50.0,
port: 49000,
}
}
}

fn calculate_packet_size(num_signals: i32) -> i32 {
let signal_pairs = (num_signals + 1) / 2;
HEADER_SIZE + TIME_PAIR_SIZE + (signal_pairs * SIGNAL_PAIR_SIZE)
fn calculate_packet_size_byte(signal_number: i32) -> i32 {
let signal_pair_number = (signal_number + 1) / 2;
HEADER_SIZE_BYTE + TIME_PAIR_SIZE_BYTE + (signal_pair_number * SIGNAL_PAIR_SIZE_BYTE)
}

async fn send_data_stream(
mut stream: tokio::net::TcpStream,
config: &StreamConfig,
) -> std::io::Result<()> {
let packet_size = calculate_packet_size(config.num_signals);
let packet_size_byte = calculate_packet_size_byte(config.signal_number);
let mut rng = rand::thread_rng();

// Calculate the interval in milliseconds from Hz
let interval_ms = (1000.0 / config.rate_hz) as u64;
// Calculate time increment in nanoseconds from Hz
let time_increment = (1.0 / config.rate_hz * 1_000_000_000.0) as i64;
let time_increment_ns = (1.0 / config.rate_hz * 1_000_000_000.0) as i64;

let mut time = 0i64;
let mut time_ns = 0i64;
let mut packet_counter = 0i32;
let mut buffer = vec![0u8; packet_size as usize];
let mut buffer = vec![0u8; packet_size_byte as usize];

// Handshake
stream.write_all(&[1u8]).await?;
Expand All @@ -56,49 +56,49 @@ async fn send_data_stream(
let mut offset = 0usize;

// Header
buffer[offset..offset + VALUE_SIZE as usize]
.copy_from_slice(&(packet_size - VALUE_SIZE).to_le_bytes());
buffer[offset + VALUE_SIZE as usize..offset + VALUE_SIZE as usize * 2]
buffer[offset..offset + VALUE_SIZE_BYTE as usize]
.copy_from_slice(&(packet_size_byte - VALUE_SIZE_BYTE).to_le_bytes());
buffer[offset + VALUE_SIZE_BYTE as usize..offset + VALUE_SIZE_BYTE as usize * 2]
.copy_from_slice(&packet_counter.to_le_bytes());
buffer[offset + VALUE_SIZE as usize * 2..offset + VALUE_SIZE as usize * 3]
buffer[offset + VALUE_SIZE_BYTE as usize * 2..offset + VALUE_SIZE_BYTE as usize * 3]
.copy_from_slice(&packet_counter.to_le_bytes());
for i in 3..8 {
buffer[offset + VALUE_SIZE as usize * i..offset + VALUE_SIZE as usize * (i + 1)]
buffer[offset + VALUE_SIZE_BYTE as usize * i..offset + VALUE_SIZE_BYTE as usize * (i + 1)]
.copy_from_slice(&0i32.to_le_bytes());
}
offset += HEADER_SIZE as usize;
offset += HEADER_SIZE_BYTE as usize;

// Time tags and values
for tag in [1u16, 2u16] {
buffer[offset..offset + TAG_SIZE as usize].copy_from_slice(&tag.to_le_bytes());
offset += TAG_SIZE as usize;
buffer[offset..offset + TAG_SIZE_BYTE as usize].copy_from_slice(&tag.to_le_bytes());
offset += TAG_SIZE_BYTE as usize;
}

let time_high = (time >> 32) as u32;
let time_low = (time & 0xFFFFFFFF) as u32;
buffer[offset..offset + VALUE_SIZE as usize].copy_from_slice(&time_high.to_le_bytes());
offset += VALUE_SIZE as usize;
buffer[offset..offset + VALUE_SIZE as usize].copy_from_slice(&time_low.to_le_bytes());
offset += VALUE_SIZE as usize;
let time_high = (time_ns >> 32) as u32;
let time_low = (time_ns & 0xFFFFFFFF) as u32;
buffer[offset..offset + VALUE_SIZE_BYTE as usize].copy_from_slice(&time_high.to_le_bytes());
offset += VALUE_SIZE_BYTE as usize;
buffer[offset..offset + VALUE_SIZE_BYTE as usize].copy_from_slice(&time_low.to_le_bytes());
offset += VALUE_SIZE_BYTE as usize;

// Signals with random values
for n in (0..config.num_signals).step_by(2) {
for n in (0..config.signal_number).step_by(2) {
// Tags
buffer[offset..offset + TAG_SIZE as usize]
buffer[offset..offset + TAG_SIZE_BYTE as usize]
.copy_from_slice(&((3 + n) as u16).to_le_bytes());
offset += TAG_SIZE as usize;
buffer[offset..offset + TAG_SIZE as usize]
offset += TAG_SIZE_BYTE as usize;
buffer[offset..offset + TAG_SIZE_BYTE as usize]
.copy_from_slice(&((4 + n) as u16).to_le_bytes());
offset += TAG_SIZE as usize;
offset += TAG_SIZE_BYTE as usize;

// Random values
let value1: f32 = rng.gen_range(0.0..100.0);
let value2: f32 = rng.gen_range(0.0..100.0);

buffer[offset..offset + VALUE_SIZE as usize].copy_from_slice(&value1.to_le_bytes());
offset += VALUE_SIZE as usize;
buffer[offset..offset + VALUE_SIZE as usize].copy_from_slice(&value2.to_le_bytes());
offset += VALUE_SIZE as usize;
buffer[offset..offset + VALUE_SIZE_BYTE as usize].copy_from_slice(&value1.to_le_bytes());
offset += VALUE_SIZE_BYTE as usize;
buffer[offset..offset + VALUE_SIZE_BYTE as usize].copy_from_slice(&value2.to_le_bytes());
offset += VALUE_SIZE_BYTE as usize;
}

if let Err(e) = stream.write_all(&buffer).await {
Expand All @@ -107,7 +107,7 @@ async fn send_data_stream(
}

packet_counter += 1;
time += time_increment;
time_ns += time_increment_ns;
sleep(Duration::from_millis(interval_ms)).await;
}
}
Expand Down
Loading

0 comments on commit ba76461

Please sign in to comment.