Rust: properly handle messages with log_time 0 #930
Merged · 4 commits · Jul 27, 2023
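Background: the writer previously used 0 as an "unset" sentinel for message start/end times, so a message whose log_time was genuinely 0 was treated as "no messages yet" and dropped from the recorded bounds. Below is a minimal standalone sketch of the failure and of the Option-based bookkeeping this PR switches to (simplified from the diff; not the crate's public API):

fn main() {
    // Old, sentinel-based tracking: 0 doubles as "no messages seen yet".
    let mut start_time: u64 = 0;
    for log_time in [0u64, 5] {
        start_time = match start_time {
            0 => log_time, // a legitimate log_time of 0 re-arms the sentinel...
            nz => nz.min(log_time),
        };
    }
    assert_eq!(start_time, 5); // ...so the true start time of 0 is lost

    // New, Option-based tracking, as introduced by this PR:
    let mut bounds: Option<(u64, u64)> = None;
    for log_time in [0u64, 5] {
        bounds = Some(match bounds {
            None => (log_time, log_time),
            Some((start, end)) => (start.min(log_time), end.max(log_time)),
        });
    }
    assert_eq!(bounds, Some((0, 5))); // start 0 and end 5 are both preserved
}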
rust/Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ]
 repository = "https://github.com/foxglove/mcap"
 documentation = "https://docs.rs/mcap"
 readme = "README.md"
-version = "0.7.0"
+version = "0.7.1"
Contributor: Will we need to do anything to release this version to crates?

Contributor: The only previous instance of a "release" I've found is this tag: https://github.com/foxglove/mcap/releases/tag/releases%2Frust%2Fv0.6.0

Contributor (author): @james-rms is the owner of the crate (see https://crates.io/crates/mcap) so he'll have to push an update with cargo publish unless someone has his credentials sitting around. 😛
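For reference, the standard crates.io release flow being described would be run from the rust/ directory by the crate owner (nothing project-specific is assumed here):

cargo publish --dry-run    # verify the package builds and passes publish checks, without uploading
cargo publish              # upload the new version to crates.io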

 edition = "2021"
 license = "MIT"

rust/src/write.rs (105 changes: 60 additions & 45 deletions)
@@ -172,10 +172,12 @@ pub struct Writer<'a, W: Write + Seek> {
     options: WriteOptions,
     schemas: HashMap<Schema<'a>, u16>,
     channels: HashMap<Channel<'a>, u16>,
-    stats: records::Statistics,
     chunk_indexes: Vec<records::ChunkIndex>,
     attachment_indexes: Vec<records::AttachmentIndex>,
     metadata_indexes: Vec<records::MetadataIndex>,
+    /// Message start and end time, or None if there are no messages yet.
+    message_bounds: Option<(u64, u64)>,
+    channel_message_counts: BTreeMap<u16, u64>,
 }

 impl<'a, W: Write + Seek> Writer<'a, W> {
@@ -199,10 +201,11 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
             options: opts,
             schemas: HashMap::new(),
             channels: HashMap::new(),
-            stats: records::Statistics::default(),
             chunk_indexes: Vec::new(),
             attachment_indexes: Vec::new(),
             metadata_indexes: Vec::new(),
+            message_bounds: None,
+            channel_message_counts: BTreeMap::new(),
         })
     }

@@ -219,8 +222,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
             return Ok(*id);
         }

-        self.stats.channel_count += 1;
-
         let next_channel_id = self.channels.len() as u16;
         assert!(self
             .channels
@@ -236,8 +237,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
             return Ok(*id);
         }

-        self.stats.schema_count += 1;
-
         // Schema IDs cannot be zero, that's the sentinel value in a channel
         // for "no schema"
         let next_schema_id = self.schemas.len() as u16 + 1;
@@ -279,17 +278,11 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
             ));
         }

-        self.stats.message_count += 1;
-        self.stats.message_start_time = match self.stats.message_start_time {
-            0 => header.log_time,
-            nz => nz.min(header.log_time),
-        };
-        self.stats.message_end_time = match self.stats.message_end_time {
-            0 => header.log_time,
-            nz => nz.max(header.log_time),
-        };
+        self.message_bounds = Some(match self.message_bounds {
+            None => (header.log_time, header.log_time),
+            Some((start, end)) => (start.min(header.log_time), end.max(header.log_time)),
+        });
         *self
-            .stats
             .channel_message_counts
             .entry(header.channel_id)
             .or_insert(0) += 1;
@@ -313,8 +306,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
     }

     pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> {
-        self.stats.attachment_count += 1;
-
         let header = records::AttachmentHeader {
             log_time: attachment.log_time,
             create_time: attachment.create_time,
@@ -350,8 +341,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
     }

     pub fn write_metadata(&mut self, metadata: &Metadata) -> McapResult<()> {
-        self.stats.metadata_count += 1;
-
         let w = self.finish_chunk()?;
         let offset = w.stream_position()?;

@@ -405,7 +394,6 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
         self.writer = Some(match prev_writer {
             WriteMode::Raw(w) => {
                 // It's chunkin time.
-                self.stats.chunk_count += 1;
                 WriteMode::Chunk(ChunkWriter::new(w, self.options.compression)?)
             }
             chunk => chunk,
@@ -467,8 +455,25 @@
         // (&mut self).
         // (We could get around all this noise by having finish() take self,
         // but then it wouldn't be droppable _and_ finish...able.)
-        let mut stats = records::Statistics::default();
-        std::mem::swap(&mut stats, &mut self.stats);
+        let mut channel_message_counts = BTreeMap::new();
+        std::mem::swap(
+            &mut channel_message_counts,
+            &mut self.channel_message_counts,
+        );
+
+        // Grab stats before we munge all the self fields below.
+        let message_bounds = self.message_bounds.unwrap_or((0, 0));
+        let stats = records::Statistics {
+            message_count: channel_message_counts.values().sum(),
+            schema_count: self.schemas.len() as u16,
+            channel_count: self.channels.len() as u32,
+            attachment_count: self.attachment_indexes.len() as u32,
+            metadata_count: self.metadata_indexes.len() as u32,
+            chunk_count: self.chunk_indexes.len() as u32,
+            message_start_time: message_bounds.0,
+            message_end_time: message_bounds.1,
+            channel_message_counts,
+        };

         let mut chunk_indexes = Vec::new();
         std::mem::swap(&mut chunk_indexes, &mut self.chunk_indexes);
@@ -685,7 +690,9 @@ impl<W: Write> Write for Compressor<W> {
 struct ChunkWriter<W: Write> {
     header_start: u64,
     stream_start: u64,
-    header: records::ChunkHeader,
+    /// Message start and end time, or None if there are no messages yet.
+    message_bounds: Option<(u64, u64)>,
+    compression_name: &'static str,
     compressor: CountingCrcWriter<Compressor<W>>,
     indexes: BTreeMap<u16, Vec<records::MessageIndexEntry>>,
 }
@@ -703,6 +710,8 @@ impl<W: Write + Seek> ChunkWriter<W> {
             None => "",
         };

+        // Write a dummy header that we'll overwrite with the actual values later.
+        // We just need its size (which only varies based on compression name).
         let header = records::ChunkHeader {
             message_start_time: 0,
             message_end_time: 0,
@@ -711,7 +720,6 @@ impl<W: Write + Seek> ChunkWriter<W> {
             compression: String::from(compression_name),
             compressed_size: !0,
         };
-
         writer.write_le(&header)?;
         let stream_start = writer.stream_position()?;

@@ -733,7 +741,8 @@ impl<W: Write + Seek> ChunkWriter<W> {
             compressor,
             header_start,
             stream_start,
-            header,
+            compression_name,
+            message_bounds: None,
             indexes: BTreeMap::new(),
         })
     }
@@ -770,15 +779,11 @@ impl<W: Write + Seek> ChunkWriter<W> {
     }

     fn write_message(&mut self, header: &MessageHeader, data: &[u8]) -> McapResult<()> {
-        // Update min/max time
-        self.header.message_start_time = match self.header.message_start_time {
-            0 => header.log_time,
-            nz => nz.min(header.log_time),
-        };
-        self.header.message_end_time = match self.header.message_end_time {
-            0 => header.log_time,
-            nz => nz.max(header.log_time),
-        };
+        // Update min/max time for the chunk
+        self.message_bounds = Some(match self.message_bounds {
+            None => (header.log_time, header.log_time),
+            Some((start, end)) => (start.min(header.log_time), end.max(header.log_time)),
+        });

         // Add an index for this message
         self.indexes
@@ -799,22 +804,32 @@ impl<W: Write + Seek> ChunkWriter<W> {
         Ok(())
     }

-    fn finish(mut self) -> McapResult<(W, records::ChunkIndex)> {
+    fn finish(self) -> McapResult<(W, records::ChunkIndex)> {
         // Get the number of uncompressed bytes written and the CRC.
-        self.header.uncompressed_size = self.compressor.position();
-
+        let uncompressed_size = self.compressor.position();
         let (stream, crc) = self.compressor.finalize();
-        self.header.uncompressed_crc = crc;
+        let uncompressed_crc = crc;

         // Finalize the compression stream - it maintains an internal buffer.
         let mut writer = stream.finish()?;
         let end_of_stream = writer.stream_position()?;
-        self.header.compressed_size = end_of_stream - self.stream_start;
+        let compressed_size = end_of_stream - self.stream_start;
         let record_size = (end_of_stream - self.header_start) as usize - 9; // 1 byte op, 8 byte len

         // Back up, write our finished header, then continue at the end of the stream.
         writer.seek(SeekFrom::Start(self.header_start))?;
         op_and_len(&mut writer, op::CHUNK, record_size)?;
-        writer.write_le(&self.header)?;
+        let message_bounds = self.message_bounds.unwrap_or((0, 0));
+        let header = records::ChunkHeader {
+            message_start_time: message_bounds.0,
+            message_end_time: message_bounds.1,
+            uncompressed_size,
+            uncompressed_crc,
+            compression: String::from(self.compression_name),
+            compressed_size,
+        };
+        writer.write_le(&header)?;
         assert_eq!(self.stream_start, writer.stream_position()?);
         assert_eq!(writer.seek(SeekFrom::End(0))?, end_of_stream);

@@ -839,15 +854,15 @@ impl<W: Write + Seek> ChunkWriter<W> {
         let end_of_indexes = writer.stream_position()?;

         let index = records::ChunkIndex {
-            message_start_time: self.header.message_start_time,
-            message_end_time: self.header.message_end_time,
+            message_start_time: header.message_start_time,
+            message_end_time: header.message_end_time,
             chunk_start_offset: self.header_start,
             chunk_length: end_of_stream - self.header_start,
             message_index_offsets,
             message_index_length: end_of_indexes - end_of_stream,
-            compression: self.header.compression,
-            compressed_size: self.header.compressed_size,
-            uncompressed_size: self.header.uncompressed_size,
+            compression: header.compression,
+            compressed_size: header.compressed_size,
+            uncompressed_size: header.uncompressed_size,
         };

         Ok((writer, index))
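The finished chunk writer above follows a write-dummy-then-patch pattern: write a placeholder header, stream the records while tracking time bounds as an Option (so a log_time of 0 survives), then seek back and overwrite the header once the real sizes and bounds are known. A simplified, self-contained sketch of that pattern using std::io, with a hypothetical two-field header rather than the actual MCAP chunk record layout:

use std::io::{Cursor, Seek, SeekFrom, Write};

// Hypothetical stand-in for records::ChunkHeader: just two u64 time bounds.
fn write_header(w: &mut impl Write, start: u64, end: u64) -> std::io::Result<()> {
    w.write_all(&start.to_le_bytes())?;
    w.write_all(&end.to_le_bytes())
}

fn main() -> std::io::Result<()> {
    let mut w = Cursor::new(Vec::new());

    // 1. Write a dummy header; it only needs to occupy the right number of bytes.
    let header_start = w.stream_position()?;
    write_header(&mut w, 0, 0)?;

    // 2. Stream the "messages", tracking bounds as an Option so log_time 0 survives.
    let mut bounds: Option<(u64, u64)> = None;
    for log_time in [0u64, 7, 3] {
        w.write_all(&log_time.to_le_bytes())?; // stand-in for a message record
        bounds = Some(match bounds {
            None => (log_time, log_time),
            Some((s, e)) => (s.min(log_time), e.max(log_time)),
        });
    }

    // 3. Seek back, overwrite the dummy header with the real bounds, then return to the end.
    let end_of_stream = w.stream_position()?;
    let (start, end) = bounds.unwrap_or((0, 0));
    w.seek(SeekFrom::Start(header_start))?;
    write_header(&mut w, start, end)?;
    w.seek(SeekFrom::End(0))?;
    assert_eq!(w.stream_position()?, end_of_stream);
    Ok(())
}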