Skip to content

Commit

Permalink
snapshot: Get rid of Reader, read from &[i32] instead
Browse files Browse the repository at this point in the history
Varint-decode the whole snapshot first, and only then try to parse it.
Simplifies some code and allows to also read from a slice `&[i32]`.
  • Loading branch information
heinrich5991 committed Jul 9, 2024
1 parent 37a6ceb commit d332d8a
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 93 deletions.
16 changes: 4 additions & 12 deletions demo/src/ddnet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use libtw2_packer::IntUnpacker;
use libtw2_packer::Unpacker;
use libtw2_snapshot::snap;
use libtw2_snapshot::Delta;
use libtw2_snapshot::Reader as SnapReader;
use libtw2_snapshot::Snap;
use std::collections::BTreeMap;
use std::io;
Expand Down Expand Up @@ -43,7 +42,7 @@ pub struct DemoReader<'a, P: for<'p> Protocol<'p>> {
delta: Delta,
snap: Snap,
old_snap: Snap,
snap_reader: SnapReader,
snap_read_buf: Vec<i32>,
snapshot: Snapshot<P::SnapObj>,
protocol: PhantomData<P>,
}
Expand All @@ -54,7 +53,6 @@ pub enum Warning {
Snapshot(libtw2_snapshot::format::Warning),
Packer(libtw2_packer::Warning),
ExcessItemData,
ExcessSnapshotData,
Gamenet(libtw2_gamenet_common::error::Error),
}

Expand Down Expand Up @@ -98,7 +96,7 @@ impl<'a, P: for<'p> Protocol<'p>> DemoReader<'a, P> {
delta: Delta::new(),
snap: Snap::empty(),
old_snap: Snap::empty(),
snap_reader: SnapReader::new(),
snap_read_buf: Vec::new(),
snapshot: Snapshot::default(),
protocol: PhantomData,
})
Expand Down Expand Up @@ -157,15 +155,9 @@ impl<'a, P: for<'p> Protocol<'p>> DemoReader<'a, P> {
}
}
Some(RawChunk::Snapshot(snap)) => {
let mut unpacker = Unpacker::new(snap);
let builder = mem::replace(&mut self.snap, Snap::default()).recycle();
self.snap = self
.snap_reader
.read(wrap(warn), builder, &mut unpacker)
self.snap
.read(wrap(warn), &mut self.snap_read_buf, snap)
.map_err(ReadError::Snap)?;
unpacker.finish(warn::closure(&mut |ExcessData| {
warn.warn(Warning::ExcessSnapshotData)
}));
self.snapshot.build::<P, _>(warn, &self.snap)?;
Ok(Some(Chunk::Snapshot(self.snapshot.objects.iter())))
}
Expand Down
12 changes: 6 additions & 6 deletions packer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use buffer::BufferRef;
use buffer::CapacityError;
use libtw2_common::num::Cast;
use libtw2_common::unwrap_or_return;
use std::iter;
use std::mem;
use std::slice;
#[cfg(feature = "uuid")]
Expand Down Expand Up @@ -271,24 +270,25 @@ impl<'a> Unpacker<'a> {
}

pub struct IntUnpacker<'a> {
iter: iter::Cloned<slice::Iter<'a, i32>>,
iter: slice::Iter<'a, i32>,
}

impl<'a> IntUnpacker<'a> {
pub fn new(slice: &[i32]) -> IntUnpacker {
IntUnpacker {
iter: slice.iter().cloned(),
}
IntUnpacker { iter: slice.iter() }
}
pub fn read_int(&mut self) -> Result<i32, UnexpectedEnd> {
self.iter.next().ok_or(UnexpectedEnd)
self.iter.next().copied().ok_or(UnexpectedEnd)
}
pub fn finish<W: Warn<ExcessData>>(&mut self, warn: &mut W) {
// TODO: replace with !self.is_empty()
if self.iter.len() != 0 {
warn.warn(ExcessData);
}
}
pub fn as_slice(&self) -> &'a [i32] {
self.iter.as_slice()
}
}

pub fn in_range(v: i32, min: i32, max: i32) -> Result<i32, IntOutOfRange> {
Expand Down
8 changes: 8 additions & 0 deletions snapshot/src/format.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::snap::Error;
use buffer::CapacityError;
use libtw2_packer::IntUnpacker;
use libtw2_packer::Packer;
use libtw2_packer::Unpacker;
use warn::wrap;
Expand All @@ -14,6 +15,7 @@ pub enum Warning {
UnknownDelete,
DeleteUpdate,
NumUpdatedItems,
ExcessSnapData,
}

impl From<libtw2_packer::Warning> for Warning {
Expand Down Expand Up @@ -66,6 +68,12 @@ impl SnapHeader {
num_items: libtw2_packer::positive(p.read_int(wrap(warn))?)?,
})
}
pub fn decode_obj(p: &mut IntUnpacker) -> Result<SnapHeader, Error> {
Ok(SnapHeader {
data_size: libtw2_packer::positive(p.read_int()?)?,
num_items: libtw2_packer::positive(p.read_int()?)?,
})
}
}

#[derive(Clone, Copy, Debug)]
Expand Down
1 change: 0 additions & 1 deletion snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pub use self::manager::Manager;
pub use self::receiver::DeltaReceiver;
pub use self::receiver::ReceivedDelta;
pub use self::snap::Delta;
pub use self::snap::Reader;
pub use self::snap::Snap;
pub use self::storage::Storage;

Expand Down
185 changes: 111 additions & 74 deletions snapshot/src/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use libtw2_gamenet_snap as msg;
use libtw2_gamenet_snap::SnapMsg;
use libtw2_gamenet_snap::MAX_SNAPSHOT_PACKSIZE;
use libtw2_packer::with_packer;
use libtw2_packer::IntUnpacker;
use libtw2_packer::Packer;
use libtw2_packer::UnexpectedEnd;
use libtw2_packer::Unpacker;
use std::cmp;
use std::collections::btree_map;
Expand Down Expand Up @@ -82,8 +84,8 @@ impl From<libtw2_packer::IntOutOfRange> for Error {
}
}

impl From<libtw2_packer::UnexpectedEnd> for Error {
fn from(_: libtw2_packer::UnexpectedEnd) -> Error {
impl From<UnexpectedEnd> for Error {
fn from(UnexpectedEnd: UnexpectedEnd) -> Error {
Error::UnexpectedEnd
}
}
Expand Down Expand Up @@ -159,6 +161,24 @@ impl Snap {
buf.extend(iter::repeat(0).take(size));
Ok(entry.insert(start..end))
}
fn add_item_raw(
&mut self,
type_id: u16,
id: u16,
size: usize,
) -> Result<&mut [i32], BuilderError> {
let offset = match self.offsets.entry(key(type_id, id)) {
btree_map::Entry::Occupied(..) => return Err(BuilderError::DuplicateKey),
btree_map::Entry::Vacant(v) => Snap::prepare_item_vacant(v, &mut self.buf, size)?,
}
.clone();
Ok(&mut self.buf[to_usize(offset)])
}
fn add_item(&mut self, type_id: u16, id: u16, data: &[i32]) -> Result<(), BuilderError> {
self.add_item_raw(type_id, id, data.len())?
.copy_from_slice(data);
Ok(())
}
fn prepare_item(&mut self, type_id: u16, id: u16, size: usize) -> Result<&mut [i32], Error> {
let offset = match self.offsets.entry(key(type_id, id)) {
btree_map::Entry::Occupied(o) => o.into_mut(),
Expand All @@ -167,6 +187,94 @@ impl Snap {
.clone();
Ok(&mut self.buf[to_usize(offset)])
}
pub fn read<W: Warn<Warning>>(
&mut self,
warn: &mut W,
buf: &mut Vec<i32>,
data: &[u8],
) -> Result<(), Error> {
self.clear();
buf.clear();

let mut unpacker = Unpacker::new(data);
while !unpacker.is_empty() {
match unpacker.read_int(wrap(warn)) {
Ok(int) => buf.push(int),
Err(UnexpectedEnd) => {
warn.warn(Warning::ExcessSnapData);
break;
}
}
}

self.read_from_ints(warn, &buf)
}
pub fn read_from_ints<W: Warn<Warning>>(
&mut self,
warn: &mut W,
data: &[i32],
) -> Result<(), Error> {
self.clear();

let mut unpacker = IntUnpacker::new(data);
let header = SnapHeader::decode_obj(&mut unpacker)?;
let data = unpacker.as_slice();

let offsets_len = header.num_items.assert_usize();
if data.len() < offsets_len {
return Err(Error::OffsetsUnpacking);
}
if header.data_size % 4 != 0 {
return Err(Error::InvalidOffset);
}
let items_len = (header.data_size / 4).assert_usize();
match (offsets_len + items_len).cmp(&data.len()) {
cmp::Ordering::Less => warn.warn(Warning::ExcessSnapData),
cmp::Ordering::Equal => {}
cmp::Ordering::Greater => return Err(Error::ItemsUnpacking),
}

let (offsets, item_data) = data.split_at(offsets_len);
let item_data = &item_data[..items_len];

let mut offsets = offsets.iter();
let mut prev_offset = None;
loop {
let offset = offsets.next().copied();
if let Some(offset) = offset {
if offset < 0 {
return Err(Error::InvalidOffset);
}
if offset % 4 != 0 {
return Err(Error::InvalidOffset);
}
}
let finished = offset.is_none();
let offset = offset.map(|o| o.assert_usize() / 4).unwrap_or(items_len);

if let Some(prev_offset) = prev_offset {
if offset <= prev_offset {
return Err(Error::InvalidOffset);
}
if offset > items_len {
return Err(Error::InvalidOffset);
}
let type_id = key_to_type_id(item_data[prev_offset]);
let id = key_to_id(item_data[prev_offset]);
self.add_item(type_id, id, &item_data[prev_offset + 1..offset])?;
} else if offset != 0 {
// First offset must be 0.
return Err(Error::InvalidOffset);
}

prev_offset = Some(offset);

if finished {
break;
}
}
Ok(())
}
pub fn read_with_delta<W>(
&mut self,
warn: &mut W,
Expand Down Expand Up @@ -254,66 +362,10 @@ impl Snap {
}
}

pub struct Reader {
sizes: Vec<i32>,
}

fn read_int_err<W: Warn<Warning>>(p: &mut Unpacker, w: &mut W, e: Error) -> Result<i32, Error> {
p.read_int(wrap(w)).map_err(|_| e)
}

impl Reader {
pub fn new() -> Reader {
Reader { sizes: Vec::new() }
}
pub fn read<W: Warn<Warning>>(
&mut self,
warn: &mut W,
mut builder: Builder,
p: &mut Unpacker,
) -> Result<Snap, Error> {
self.sizes.clear();
let header = SnapHeader::decode(warn, p)?;
let mut prev_offset = None;
for _ in 0..header.num_items {
let offset = read_int_err(p, warn, Error::OffsetsUnpacking)?;
if let Some(prev) = prev_offset {
if prev > offset {
return Err(Error::InvalidOffset);
}
self.sizes.push(offset - prev);
} else if offset != 0 {
// First offset must be 0
return Err(Error::InvalidOffset);
}
prev_offset = Some(offset);
}
if let Some(last) = prev_offset {
if last > header.data_size {
return Err(Error::InvalidOffset);
}
self.sizes.push(header.data_size - last)
}

for size in &self.sizes {
if *size % 4 != 0 {
return Err(Error::InvalidOffset);
}
if *size == 0 {
return Err(Error::InvalidOffset);
}
let size = size / 4;
let key = read_int_err(p, warn, Error::ItemsUnpacking)?;
let type_id = key_to_type_id(key);
let id = key_to_id(key);
for int in builder.add_item_raw(type_id, id, size.assert_usize() - 1)? {
*int = read_int_err(p, warn, Error::ItemsUnpacking)?;
}
}
Ok(builder.finish())
}
}

pub struct Items<'a> {
snap: &'a Snap,
iter: btree_map::Iter<'a, i32, ops::Range<u32>>,
Expand Down Expand Up @@ -502,23 +554,8 @@ impl Builder {
pub fn new() -> Builder {
Default::default()
}
fn add_item_raw(
&mut self,
type_id: u16,
id: u16,
size: usize,
) -> Result<&mut [i32], BuilderError> {
let offset = match self.snap.offsets.entry(key(type_id, id)) {
btree_map::Entry::Occupied(..) => return Err(BuilderError::DuplicateKey),
btree_map::Entry::Vacant(v) => Snap::prepare_item_vacant(v, &mut self.snap.buf, size)?,
}
.clone();
Ok(&mut self.snap.buf[to_usize(offset)])
}
pub fn add_item(&mut self, type_id: u16, id: u16, data: &[i32]) -> Result<(), BuilderError> {
self.add_item_raw(type_id, id, data.len())?
.copy_from_slice(data);
Ok(())
self.snap.add_item(type_id, id, data)
}
pub fn finish(self) -> Snap {
self.snap
Expand Down

0 comments on commit d332d8a

Please sign in to comment.