diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 4014d6b974b2..2fe879e632eb 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -191,12 +191,11 @@ async fn stream_recording_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -225,12 +224,11 @@ async fn stream_recording_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }); drop(client); @@ -340,7 +338,6 @@ async fn stream_catalog_async( re_log::debug!("Fetching catalog…"); let mut resp = client - // TODO(zehiko) add support for fetching specific columns and rows .query_catalog(QueryCatalogRequest { column_projection: None, // fetch all columns filter: None, // fetch all rows @@ -348,12 +345,11 @@ async fn stream_catalog_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }); drop(client); diff --git a/crates/store/re_log_encoding/src/codec/wire/decoder.rs b/crates/store/re_log_encoding/src/codec/wire/decoder.rs index 06b9bccdbc23..db0db4613cd1 100644 --- a/crates/store/re_log_encoding/src/codec/wire/decoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/decoder.rs @@ -1,55 +1,23 @@ -use super::MessageHeader; -use super::TransportMessageV0; use crate::codec::arrow::read_arrow_from_bytes; use crate::codec::CodecError; use re_chunk::TransportChunk; -impl MessageHeader { - pub(crate) fn decode(read: &mut impl std::io::Read) -> Result { - let mut buffer = [0_u8; Self::SIZE_BYTES]; - read.read_exact(&mut buffer) - .map_err(CodecError::HeaderDecoding)?; - - let header = u8::from_le(buffer[0]); - - Ok(Self(header)) - } -} - -impl TransportMessageV0 { - pub(crate) fn from_bytes(data: &[u8]) -> Result { - let mut reader = std::io::Cursor::new(data); - let header = MessageHeader::decode(&mut reader)?; - - match header { - MessageHeader::NO_DATA => Ok(Self::NoData), - MessageHeader::RECORD_BATCH => { - let (schema, data) = read_arrow_from_bytes(&mut reader)?; - - let tc = TransportChunk { - schema: schema.clone(), - data, - }; - - Ok(Self::RecordBatch(tc)) - } - _ => Err(CodecError::UnknownMessageHeader), - } - } -} - /// Decode transport data from a byte stream - if there's a record batch present, return it, otherwise return `None`. pub fn decode( version: re_protos::common::v0::EncoderVersion, data: &[u8], -) -> Result, CodecError> { +) -> Result { match version { re_protos::common::v0::EncoderVersion::V0 => { - let msg = TransportMessageV0::from_bytes(data)?; - match msg { - TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)), - TransportMessageV0::NoData => Ok(None), - } + let mut reader = std::io::Cursor::new(data); + let (schema, data) = read_arrow_from_bytes(&mut reader)?; + + let tc = TransportChunk { + schema: schema.clone(), + data, + }; + + Ok(tc) } } } diff --git a/crates/store/re_log_encoding/src/codec/wire/encoder.rs b/crates/store/re_log_encoding/src/codec/wire/encoder.rs index e6ae62c1e1b7..87438fd0a33c 100644 --- a/crates/store/re_log_encoding/src/codec/wire/encoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/encoder.rs @@ -1,58 +1,18 @@ -use super::MessageHeader; -use super::TransportMessageV0; use crate::codec::arrow::write_arrow_to_bytes; use crate::codec::CodecError; use re_chunk::TransportChunk; -impl MessageHeader { - pub(crate) fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> { - write - .write_all(&[self.0]) - .map_err(CodecError::HeaderEncoding)?; - - Ok(()) - } -} - -impl TransportMessageV0 { - pub(crate) fn to_bytes(&self) -> Result, CodecError> { - match self { - Self::NoData => { - let mut data: Vec = Vec::new(); - MessageHeader::NO_DATA.encode(&mut data)?; - Ok(data) - } - Self::RecordBatch(chunk) => { - let mut data: Vec = Vec::new(); - MessageHeader::RECORD_BATCH.encode(&mut data)?; - - write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; - - Ok(data) - } - } - } -} - -/// Encode a `NoData` message into a byte stream. This can be used by the remote store -/// (i.e. data producer) to signal back to the client that there's no data available. -pub fn no_data(version: re_protos::common::v0::EncoderVersion) -> Result, CodecError> { - match version { - re_protos::common::v0::EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(), - } -} - -// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead -// of sending schema in each transport message for the same stream of batches. This will require codec -// to become stateful and keep track if schema was sent / received. /// Encode a transport chunk into a byte stream. pub fn encode( version: re_protos::common::v0::EncoderVersion, - chunk: TransportChunk, + chunk: &TransportChunk, ) -> Result, CodecError> { match version { re_protos::common::v0::EncoderVersion::V0 => { - TransportMessageV0::RecordBatch(chunk).to_bytes() + let mut data: Vec = Vec::new(); + write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; + + Ok(data) } } } diff --git a/crates/store/re_log_encoding/src/codec/wire/mod.rs b/crates/store/re_log_encoding/src/codec/wire/mod.rs index 587e9e31e2ce..0bc6c702718a 100644 --- a/crates/store/re_log_encoding/src/codec/wire/mod.rs +++ b/crates/store/re_log_encoding/src/codec/wire/mod.rs @@ -4,28 +4,10 @@ pub mod encoder; pub use decoder::decode; pub use encoder::encode; -use re_chunk::TransportChunk; - -#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)] -pub struct MessageHeader(pub u8); - -impl MessageHeader { - pub const NO_DATA: Self = Self(1); - pub const RECORD_BATCH: Self = Self(2); - - pub const SIZE_BYTES: usize = 1; -} - -#[derive(Debug)] -pub enum TransportMessageV0 { - NoData, - RecordBatch(TransportChunk), -} - #[cfg(test)] mod tests { use crate::{ - codec::wire::{decode, encode, TransportMessageV0}, + codec::wire::{decode, encode}, codec::CodecError, }; use re_chunk::{Chunk, RowId}; @@ -56,35 +38,9 @@ mod tests { } #[test] - fn test_message_v0_no_data() { - let msg = TransportMessageV0::NoData; - let data = msg.to_bytes().unwrap(); - let decoded = TransportMessageV0::from_bytes(&data).unwrap(); - assert!(matches!(decoded, TransportMessageV0::NoData)); - } - - #[test] - fn test_message_v0_record_batch() { - let expected_chunk = get_test_chunk(); - - let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap()); - let data = msg.to_bytes().unwrap(); - let decoded = TransportMessageV0::from_bytes(&data).unwrap(); - - #[allow(clippy::match_wildcard_for_single_variants)] - match decoded { - TransportMessageV0::RecordBatch(transport) => { - let decoded_chunk = Chunk::from_transport(&transport).unwrap(); - assert_eq!(expected_chunk, decoded_chunk); - } - _ => panic!("unexpected message type"), - } - } - - #[test] - fn test_invalid_batch_data() { - let data = vec![2, 3, 4]; // '1' is NO_DATA message header - let decoded = TransportMessageV0::from_bytes(&data); + fn test_invalid_data() { + let data = vec![2, 3, 4]; + let decoded = decode(EncoderVersion::V0, &data); assert!(matches!( decoded.err().unwrap(), @@ -92,28 +48,16 @@ mod tests { )); } - #[test] - fn test_unknown_header() { - let data = vec![3]; - let decoded = TransportMessageV0::from_bytes(&data); - assert!(decoded.is_err()); - - assert!(matches!( - decoded.err().unwrap(), - CodecError::UnknownMessageHeader - )); - } - #[test] fn test_v0_codec() { let expected_chunk = get_test_chunk(); let encoded = encode( EncoderVersion::V0, - expected_chunk.clone().to_transport().unwrap(), + &expected_chunk.clone().to_transport().unwrap(), ) .unwrap(); - let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap(); + let decoded = decode(EncoderVersion::V0, &encoded).unwrap(); let decoded_chunk = Chunk::from_transport(&decoded).unwrap(); assert_eq!(expected_chunk, decoded_chunk); diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 414d116bb698..34d436aaf828 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -107,12 +107,11 @@ impl PyStorageNodeClient { .await .map_err(re_grpc_client::TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -156,12 +155,11 @@ impl PyStorageNodeClient { .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -207,12 +205,11 @@ impl PyStorageNodeClient { .await .map_err(|err| PyRuntimeError::new_err(err.to_string()))? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, _>>() .await @@ -272,7 +269,7 @@ impl PyStorageNodeClient { let metadata_tc = TransportChunk::from_arrow_record_batch(&metadata); - encode(EncoderVersion::V0, metadata_tc) + encode(EncoderVersion::V0, &metadata_tc) .map_err(|err| PyRuntimeError::new_err(err.to_string())) }) .transpose()? @@ -296,9 +293,7 @@ impl PyStorageNodeClient { .map_err(|err| PyRuntimeError::new_err(err.to_string()))? .into_inner(); let metadata = decode(resp.encoder_version(), &resp.payload) - .map_err(|err| PyRuntimeError::new_err(err.to_string()))? - // TODO(zehiko) this is going away soon - .ok_or(PyRuntimeError::new_err("No metadata"))?; + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; let recording_id = metadata .all_columns() @@ -344,7 +339,7 @@ impl PyStorageNodeClient { let request = UpdateCatalogRequest { metadata: Some(DataframePart { encoder_version: EncoderVersion::V0 as i32, - payload: encode(EncoderVersion::V0, metadata_tc) + payload: encode(EncoderVersion::V0, &metadata_tc) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?, }), }; @@ -434,13 +429,12 @@ impl PyStorageNodeClient { while let Some(result) = resp.next().await { let response = result.map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - let tc = decode(EncoderVersion::V0, &response.payload) - .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - - let Some(tc) = tc else { - return Err(PyRuntimeError::new_err("Stream error")); + let tc = match decode(EncoderVersion::V0, &response.payload) { + Ok(tc) => tc, + Err(err) => { + return Err(PyRuntimeError::new_err(err.to_string())); + } }; - let chunk = Chunk::from_transport(&tc) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; @@ -450,9 +444,9 @@ impl PyStorageNodeClient { } Ok(store) - })?; + }); - let handle = ChunkStoreHandle::new(store); + let handle = ChunkStoreHandle::new(store?); let cache = re_dataframe::QueryCacheHandle::new(re_dataframe::QueryCache::new(handle.clone()));