Skip to content

Commit

Permalink
Rust: properly handle messages with log_time 0 (#930)
Browse files Browse the repository at this point in the history
Update the Writer tracking of message bounds to include log_time 0 as a valid time for bounds.

Additionally removes duplicated stats tracking data in the Chunk and overall MCAP writer.

Fix #924
  • Loading branch information
mrkline authored Jul 27, 2023
1 parent 1f7a7ff commit cb36b5c
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 46 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ]
repository = "https://github.com/foxglove/mcap"
documentation = "https://docs.rs/mcap"
readme = "README.md"
version = "0.7.0"
version = "0.7.1"
edition = "2021"
license = "MIT"

Expand Down
105 changes: 60 additions & 45 deletions rust/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,12 @@ pub struct Writer<'a, W: Write + Seek> {
options: WriteOptions,
schemas: HashMap<Schema<'a>, u16>,
channels: HashMap<Channel<'a>, u16>,
stats: records::Statistics,
chunk_indexes: Vec<records::ChunkIndex>,
attachment_indexes: Vec<records::AttachmentIndex>,
metadata_indexes: Vec<records::MetadataIndex>,
/// Message start and end time, or None if there are no messages yet.
message_bounds: Option<(u64, u64)>,
channel_message_counts: BTreeMap<u16, u64>,
}

impl<'a, W: Write + Seek> Writer<'a, W> {
Expand All @@ -199,10 +201,11 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
options: opts,
schemas: HashMap::new(),
channels: HashMap::new(),
stats: records::Statistics::default(),
chunk_indexes: Vec::new(),
attachment_indexes: Vec::new(),
metadata_indexes: Vec::new(),
message_bounds: None,
channel_message_counts: BTreeMap::new(),
})
}

Expand All @@ -219,8 +222,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
return Ok(*id);
}

self.stats.channel_count += 1;

let next_channel_id = self.channels.len() as u16;
assert!(self
.channels
Expand All @@ -236,8 +237,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
return Ok(*id);
}

self.stats.schema_count += 1;

// Schema IDs cannot be zero, that's the sentinel value in a channel
// for "no schema"
let next_schema_id = self.schemas.len() as u16 + 1;
Expand Down Expand Up @@ -279,17 +278,11 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
));
}

self.stats.message_count += 1;
self.stats.message_start_time = match self.stats.message_start_time {
0 => header.log_time,
nz => nz.min(header.log_time),
};
self.stats.message_end_time = match self.stats.message_end_time {
0 => header.log_time,
nz => nz.max(header.log_time),
};
self.message_bounds = Some(match self.message_bounds {
None => (header.log_time, header.log_time),
Some((start, end)) => (start.min(header.log_time), end.max(header.log_time)),
});
*self
.stats
.channel_message_counts
.entry(header.channel_id)
.or_insert(0) += 1;
Expand All @@ -313,8 +306,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
}

pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> {
self.stats.attachment_count += 1;

let header = records::AttachmentHeader {
log_time: attachment.log_time,
create_time: attachment.create_time,
Expand Down Expand Up @@ -350,8 +341,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
}

pub fn write_metadata(&mut self, metadata: &Metadata) -> McapResult<()> {
self.stats.metadata_count += 1;

let w = self.finish_chunk()?;
let offset = w.stream_position()?;

Expand Down Expand Up @@ -405,7 +394,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
self.writer = Some(match prev_writer {
WriteMode::Raw(w) => {
// It's chunkin time.
self.stats.chunk_count += 1;
WriteMode::Chunk(ChunkWriter::new(w, self.options.compression)?)
}
chunk => chunk,
Expand Down Expand Up @@ -467,8 +455,25 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
// (&mut self).
// (We could get around all this noise by having finish() take self,
// but then it wouldn't be droppable _and_ finish...able.)
let mut stats = records::Statistics::default();
std::mem::swap(&mut stats, &mut self.stats);
let mut channel_message_counts = BTreeMap::new();
std::mem::swap(
&mut channel_message_counts,
&mut self.channel_message_counts,
);

// Grab stats before we munge all the self fields below.
let message_bounds = self.message_bounds.unwrap_or((0, 0));
let stats = records::Statistics {
message_count: channel_message_counts.values().sum(),
schema_count: self.schemas.len() as u16,
channel_count: self.channels.len() as u32,
attachment_count: self.attachment_indexes.len() as u32,
metadata_count: self.metadata_indexes.len() as u32,
chunk_count: self.chunk_indexes.len() as u32,
message_start_time: message_bounds.0,
message_end_time: message_bounds.1,
channel_message_counts,
};

let mut chunk_indexes = Vec::new();
std::mem::swap(&mut chunk_indexes, &mut self.chunk_indexes);
Expand Down Expand Up @@ -685,7 +690,9 @@ impl<W: Write> Write for Compressor<W> {
struct ChunkWriter<W: Write> {
header_start: u64,
stream_start: u64,
header: records::ChunkHeader,
/// Message start and end time, or None if there are no messages yet.
message_bounds: Option<(u64, u64)>,
compression_name: &'static str,
compressor: CountingCrcWriter<Compressor<W>>,
indexes: BTreeMap<u16, Vec<records::MessageIndexEntry>>,
}
Expand All @@ -703,6 +710,8 @@ impl<W: Write + Seek> ChunkWriter<W> {
None => "",
};

// Write a dummy header that we'll overwrite with the actual values later.
// We just need its size (which only varies based on compression name).
let header = records::ChunkHeader {
message_start_time: 0,
message_end_time: 0,
Expand All @@ -711,7 +720,6 @@ impl<W: Write + Seek> ChunkWriter<W> {
compression: String::from(compression_name),
compressed_size: !0,
};

writer.write_le(&header)?;
let stream_start = writer.stream_position()?;

Expand All @@ -733,7 +741,8 @@ impl<W: Write + Seek> ChunkWriter<W> {
compressor,
header_start,
stream_start,
header,
compression_name,
message_bounds: None,
indexes: BTreeMap::new(),
})
}
Expand Down Expand Up @@ -770,15 +779,11 @@ impl<W: Write + Seek> ChunkWriter<W> {
}

fn write_message(&mut self, header: &MessageHeader, data: &[u8]) -> McapResult<()> {
// Update min/max time
self.header.message_start_time = match self.header.message_start_time {
0 => header.log_time,
nz => nz.min(header.log_time),
};
self.header.message_end_time = match self.header.message_end_time {
0 => header.log_time,
nz => nz.max(header.log_time),
};
// Update min/max time for the chunk
self.message_bounds = Some(match self.message_bounds {
None => (header.log_time, header.log_time),
Some((start, end)) => (start.min(header.log_time), end.max(header.log_time)),
});

// Add an index for this message
self.indexes
Expand All @@ -799,22 +804,32 @@ impl<W: Write + Seek> ChunkWriter<W> {
Ok(())
}

fn finish(mut self) -> McapResult<(W, records::ChunkIndex)> {
fn finish(self) -> McapResult<(W, records::ChunkIndex)> {
// Get the number of uncompressed bytes written and the CRC.
self.header.uncompressed_size = self.compressor.position();

let uncompressed_size = self.compressor.position();
let (stream, crc) = self.compressor.finalize();
self.header.uncompressed_crc = crc;
let uncompressed_crc = crc;

// Finalize the compression stream - it maintains an internal buffer.
let mut writer = stream.finish()?;
let end_of_stream = writer.stream_position()?;
self.header.compressed_size = end_of_stream - self.stream_start;
let compressed_size = end_of_stream - self.stream_start;
let record_size = (end_of_stream - self.header_start) as usize - 9; // 1 byte op, 8 byte len

// Back up, write our finished header, then continue at the end of the stream.
writer.seek(SeekFrom::Start(self.header_start))?;
op_and_len(&mut writer, op::CHUNK, record_size)?;
writer.write_le(&self.header)?;
let message_bounds = self.message_bounds.unwrap_or((0, 0));
let header = records::ChunkHeader {
message_start_time: message_bounds.0,
message_end_time: message_bounds.1,
uncompressed_size,
uncompressed_crc,
compression: String::from(self.compression_name),
compressed_size,
};
writer.write_le(&header)?;
assert_eq!(self.stream_start, writer.stream_position()?);
assert_eq!(writer.seek(SeekFrom::End(0))?, end_of_stream);

Expand All @@ -839,15 +854,15 @@ impl<W: Write + Seek> ChunkWriter<W> {
let end_of_indexes = writer.stream_position()?;

let index = records::ChunkIndex {
message_start_time: self.header.message_start_time,
message_end_time: self.header.message_end_time,
message_start_time: header.message_start_time,
message_end_time: header.message_end_time,
chunk_start_offset: self.header_start,
chunk_length: end_of_stream - self.header_start,
message_index_offsets,
message_index_length: end_of_indexes - end_of_stream,
compression: self.header.compression,
compressed_size: self.header.compressed_size,
uncompressed_size: self.header.uncompressed_size,
compression: header.compression,
compressed_size: header.compressed_size,
uncompressed_size: header.uncompressed_size,
};

Ok((writer, index))
Expand Down
54 changes: 54 additions & 0 deletions rust/tests/handles_time0_messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::io::Cursor;

use anyhow::Result;

/// Check that chunk indexes and statistics properly handle messages with
/// log_time = 0, treating 0 as a valid start-time bound rather than a
/// "no messages yet" sentinel (which would wrongly report the next
/// message's time as the minimum). Regression test for #924.
#[test]
fn handles_time0_messages() -> Result<()> {
    // Write an in-memory MCAP containing two messages: one at t=0, one at t=42.
    let mut buf = Vec::new();
    let mut out = mcap::Writer::new(Cursor::new(&mut buf))?;

    let my_channel = mcap::Channel {
        topic: String::from("time"),
        message_encoding: String::from("text/plain"),
        metadata: Default::default(),
        schema: None, // a schema-less channel is sufficient for this test
    };

    let channel_id = out.add_channel(&my_channel)?;

    out.write_to_known_channel(
        &mcap::records::MessageHeader {
            channel_id,
            sequence: 1,
            log_time: 0,
            publish_time: 0,
        },
        b"Time, Dr. Freeman?",
    )?;
    out.write_to_known_channel(
        &mcap::records::MessageHeader {
            channel_id,
            sequence: 2,
            log_time: 42,
            publish_time: 42,
        },
        b"Is it really that time again?",
    )?;

    out.finish()?;
    drop(out);

    // Read the summary back and verify that both the chunk index and the
    // overall statistics record report a start time of 0, not 42.
    let summary = mcap::read::Summary::read(&buf)?.expect("no summary");

    let the_chunk = &summary.chunk_indexes[0];
    assert_eq!(the_chunk.message_start_time, 0);
    assert_eq!(the_chunk.message_end_time, 42);

    let stats = &summary.stats.expect("no stats");
    assert_eq!(stats.message_start_time, 0);
    assert_eq!(stats.message_end_time, 42);

    Ok(())
}

0 comments on commit cb36b5c

Please sign in to comment.