Skip to content

Commit

Permalink
feat(rooch-da): add data length tracking to segments (#2664)
Browse files Browse the repository at this point in the history
* feat(rooch-da): add data length tracking to segments

Added a data_len field to SegmentV0 for tracking data length. Refactored related methods to handle the data length field, ensuring accurate serialization and deserialization. This improves segment management within the chunk processing.

* fix(rooch-da): validate byte length in segment

Prevent out-of-bounds errors by ensuring byte array length suffices before processing segment data. This adds a check for the minimum required length to avoid runtime panics.
  • Loading branch information
popcnt1 authored Sep 21, 2024
1 parent 91161c7 commit c482a27
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 9 deletions.
1 change: 1 addition & 0 deletions crates/rooch-da/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl Chunk for ChunkV0 {
segment_number: i as u64,
},
is_last: i == segments_count - 1, // extra info overhead is much smaller than max_block_size - max_segment_size
data_len: data.len() as u64,
// *_checksum will be filled in to_bytes method of Segment
data_checksum: 0,
checksum: 0,
Expand Down
27 changes: 21 additions & 6 deletions crates/rooch-da/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ pub trait Segment: fmt::Debug + Send {
fn is_last(&self) -> bool;
}

pub const SEGMENT_V0_DATA_OFFSET: usize = 42;
pub const SEGMENT_V0_CHECKSUM_OFFSET: usize = 34;
pub const SEGMENT_V0_DATA_OFFSET: usize = 50;
pub const SEGMENT_V0_CHECKSUM_OFFSET: usize = 42;

#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct SegmentV0 {
pub id: SegmentID,
pub is_last: bool, // is last segment in chunk
pub data_len: u64, // length of data
pub data_checksum: u64, // checksum of data, xxh3_64
pub checksum: u64, // checksum of above fields(exclude data) and version after to_bytes, xxh3_64

Expand All @@ -44,9 +45,20 @@ impl SegmentV0 {
let chunk_id = u128::from_le_bytes(bytes[1..17].try_into()?);
let segment_number = u64::from_le_bytes(bytes[17..25].try_into()?);
let is_last = bytes[25] != 0;
let data_checksum = u64::from_le_bytes(bytes[26..34].try_into()?);
let checksum = u64::from_le_bytes(bytes[34..SEGMENT_V0_DATA_OFFSET].try_into()?);
let data = bytes[SEGMENT_V0_DATA_OFFSET..].to_vec();
let data_len = u64::from_le_bytes(bytes[26..34].try_into()?);
let data_checksum = u64::from_le_bytes(bytes[34..42].try_into()?);
let checksum = u64::from_le_bytes(bytes[42..SEGMENT_V0_DATA_OFFSET].try_into()?);
// check bytes has enough length
if bytes.len() < SEGMENT_V0_DATA_OFFSET + data_len as usize {
return Err(anyhow::anyhow!(format!(
"segment_v0: bytes:{} less than exp header:{} + data:{}",
bytes.len(),
SEGMENT_V0_DATA_OFFSET,
data_len as usize
)));
}
let data =
bytes[SEGMENT_V0_DATA_OFFSET..SEGMENT_V0_DATA_OFFSET + data_len as usize].to_vec();

let exp_checksum = xxh3_64(&bytes[0..SEGMENT_V0_CHECKSUM_OFFSET]);
if exp_checksum != checksum {
Expand All @@ -64,6 +76,7 @@ impl SegmentV0 {
segment_number,
},
is_last,
data_len,
data_checksum,
checksum,
data,
Expand All @@ -73,11 +86,12 @@ impl SegmentV0 {

impl Segment for SegmentV0 {
fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(SEGMENT_V0_DATA_OFFSET + self.data.len());
let mut bytes = Vec::with_capacity(SEGMENT_V0_DATA_OFFSET + self.data_len as usize);
bytes.push(ChunkVersion::V0.into()); // version
bytes.extend_from_slice(&self.id.chunk_id.to_le_bytes());
bytes.extend_from_slice(&self.id.segment_number.to_le_bytes());
bytes.push(self.is_last as u8);
bytes.extend_from_slice(&self.data_len.to_le_bytes());
let data_checksum = xxh3_64(&self.data);
bytes.extend_from_slice(&data_checksum.to_le_bytes());
let checksum = xxh3_64(&bytes[0..SEGMENT_V0_CHECKSUM_OFFSET]);
Expand Down Expand Up @@ -179,6 +193,7 @@ mod tests {
segment_number: 12345678,
},
is_last: true,
data_len: 5,
data_checksum: 1234567890,
checksum: 12345678,
data: vec![1, 2, 3, 4, 5],
Expand Down
21 changes: 18 additions & 3 deletions crates/rooch-da/src/server/openda/actor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use crate::chunk::{Chunk, ChunkV0};
use rooch_config::da_config::{DAServerOpenDAConfig, OpenDAScheme};

use crate::messages::PutBatchInternalDAMessage;
use crate::segment::SegmentID;

pub struct DAServerOpenDAActor {
max_segment_size: usize,
operator: Operator,
}

pub const CHUNK_V0_PREFIX: &str = "chunk_v0";
pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024;
pub const DEFAULT_MAX_RETRY_TIMES: usize = 4;

Expand Down Expand Up @@ -119,9 +121,9 @@ impl DAServerOpenDAActor {
let segments = chunk.to_segments(self.max_segment_size);
for segment in segments {
let bytes = segment.to_bytes();

match self
.operator
.write(&segment.get_id().to_string(), bytes)
.write_segment(segment.get_id(), bytes, CHUNK_V0_PREFIX.to_string())
.await
{
Ok(_) => {
Expand All @@ -136,13 +138,26 @@ impl DAServerOpenDAActor {
segment.get_id(),
e,
);
return Err(e.into());
return Err(e);
}
}
}

Ok(())
}

async fn write_segment(
&self,
segment_id: SegmentID,
segment_bytes: Vec<u8>,
prefix: String,
) -> Result<()> {
let path = format!("{}/{}", prefix, segment_id);
let mut w = self.operator.writer(&path).await?;
w.write(segment_bytes).await?;
w.close().await?;
Ok(())
}
}

fn check_config_exist(
Expand Down

0 comments on commit c482a27

Please sign in to comment.