Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #236 from DyrellC/v2.0/test-topics
Browse files Browse the repository at this point in the history
[v2.0] Introduce Topics
  • Loading branch information
DyrellC authored Jun 14, 2022
2 parents 95c8646 + d191de1 commit 35ea66f
Show file tree
Hide file tree
Showing 18 changed files with 776 additions and 242 deletions.
11 changes: 7 additions & 4 deletions lets/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use spongos::{
};

// Local
use crate::id::Identifier;
use crate::{id::Identifier, message::Topic};

/// Abstract representation of a Message Address
///
Expand Down Expand Up @@ -141,12 +141,14 @@ impl AppAddr {
Self(bytes)
}

pub fn gen(identifier: Identifier, app_idx: usize) -> AppAddr {
pub fn gen(identifier: Identifier, base_topic: &Topic) -> AppAddr {
let mut addr = [0u8; 40];
let id_bytes = identifier.as_bytes();
// Create spongos to squeeze topic into final 8 bytes
let squeezed_topic: [u8; 8] = Spongos::<KeccakF1600>::init().sponge(base_topic);
assert_eq!(id_bytes.len(), 32, "identifier must be 32 bytes long");
addr[..32].copy_from_slice(id_bytes);
addr[32..].copy_from_slice(&app_idx.to_be_bytes());
addr[32..].copy_from_slice(&squeezed_topic);
Self::new(addr)
}

Expand Down Expand Up @@ -229,10 +231,11 @@ impl MsgId {
Self(bytes)
}

pub fn gen(appaddr: AppAddr, identifier: Identifier, seq_num: usize) -> MsgId {
pub fn gen(appaddr: AppAddr, identifier: Identifier, topic: &Topic, seq_num: usize) -> MsgId {
let mut s = Spongos::<KeccakF1600>::init();
s.absorb(appaddr);
s.absorb(identifier);
s.absorb(topic);
s.absorb(seq_num.to_be_bytes());
s.commit();
s.squeeze()
Expand Down
17 changes: 13 additions & 4 deletions lets/src/message/hdf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use alloc::boxed::Box;

use anyhow::{anyhow, ensure, Result};
use async_trait::async_trait;

Expand All @@ -18,12 +17,13 @@ use crate::{
id::Identifier,
message::{
content::{ContentSizeof, ContentUnwrap, ContentWrap},
topic::Topic,
version::{HDF_ID, STREAMS_1_VER, UTF8},
},
};

#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[allow(clippy::upper_case_acronyms)]
pub struct HDF {
encoding: u8,
Expand All @@ -38,6 +38,7 @@ pub struct HDF {
pub linked_msg_address: Option<MsgId>,
pub sequence: usize,
pub publisher: Identifier,
pub topic: Topic,
}

impl Default for HDF {
Expand All @@ -52,12 +53,13 @@ impl Default for HDF {
linked_msg_address: Default::default(),
sequence: 0,
publisher: Default::default(),
topic: Default::default(),
}
}
}

impl HDF {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier) -> Result<Self> {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: Topic) -> Result<Self> {
ensure!(
message_type >> 4 == 0,
anyhow!(
Expand All @@ -75,6 +77,7 @@ impl HDF {
linked_msg_address: None,
sequence,
publisher,
topic,
})
}

Expand Down Expand Up @@ -118,6 +121,10 @@ impl HDF {
pub fn linked_msg_address(&self) -> Option<MsgId> {
self.linked_msg_address
}

pub fn topic(&self) -> &Topic {
&self.topic
}
}

#[async_trait(?Send)]
Expand All @@ -132,6 +139,7 @@ impl ContentSizeof<HDF> for sizeof::Context {
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?;

Expand Down Expand Up @@ -168,6 +176,7 @@ where
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?;

Expand Down Expand Up @@ -217,6 +226,7 @@ where
anyhow!("first 2 bits of payload-frame-count are reserved"),
)?
.absorb(Maybe::new(&mut hdf.linked_msg_address))?
.mask(&mut hdf.topic)?
.mask(&mut hdf.publisher)?
.skip(&mut seq_num)?;

Expand All @@ -232,7 +242,6 @@ where
x[2] = payload_frame_count_bytes[1];
x[3] = payload_frame_count_bytes[2];
hdf.payload_frame_count = u32::from_be_bytes(x);

hdf.sequence = seq_num.inner();

Ok(self)
Expand Down
6 changes: 3 additions & 3 deletions lets/src/message/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{
transport::TransportMessage,
};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug)]
#[derive(Clone, PartialEq, Eq, Hash, Default, Debug)]
pub struct Message<Content> {
header: HDF,
payload: PCF<Content>,
Expand All @@ -40,8 +40,8 @@ impl<Payload> Message<Payload> {
self
}

pub fn header(&self) -> HDF {
self.header
pub fn header(&self) -> &HDF {
&self.header
}

pub fn payload(&self) -> &PCF<Payload> {
Expand Down
3 changes: 3 additions & 0 deletions lets/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mod version;
/// Linked Message with header already parsed
mod preparsed;

pub mod topic;

mod message;

pub use content::{
Expand All @@ -22,4 +24,5 @@ pub use hdf::HDF;
pub use message::Message;
pub use pcf::PCF;
pub use preparsed::PreparsedMessage;
pub use topic::Topic;
pub use transport::TransportMessage;
4 changes: 2 additions & 2 deletions lets/src/message/preparsed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl<F> PreparsedMessage<F> {
}
}

pub fn header(&self) -> HDF {
self.header
pub fn header(&self) -> &HDF {
&self.header
}

pub fn transport_msg(&self) -> &TransportMessage {
Expand Down
94 changes: 94 additions & 0 deletions lets/src/message/topic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use alloc::{
string::{String, ToString},
vec::Vec,
};
use core::{
convert::{TryFrom, TryInto},
fmt::Formatter,
};
use spongos::{
ddml::{
commands::{sizeof, unwrap, wrap, Mask},
io,
types::Bytes,
},
PRP,
};

#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
pub struct Topic(String);

impl Topic {
pub fn new(t: String) -> Self {
Self(t)
}
}

impl From<&str> for Topic {
fn from(t: &str) -> Self {
Self(t.to_string())
}
}

impl From<String> for Topic {
fn from(t: String) -> Self {
Self(t)
}
}

impl TryFrom<&[u8]> for Topic {
type Error = anyhow::Error;
fn try_from(t: &[u8]) -> Result<Self, Self::Error> {
let topic = String::from_utf8(t.to_vec())?;
Ok(Topic(topic))
}
}

impl TryFrom<Vec<u8>> for Topic {
type Error = anyhow::Error;
fn try_from(t: Vec<u8>) -> Result<Self, Self::Error> {
let topic = String::from_utf8(t)?;
Ok(Topic(topic))
}
}

impl core::fmt::Display for Topic {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
write!(f, "{}", &self.0)
}
}

impl AsRef<[u8]> for Topic {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

impl Mask<&Topic> for sizeof::Context {
fn mask(&mut self, topic: &Topic) -> anyhow::Result<&mut Self> {
self.mask(Bytes::new(topic))
}
}

impl<OS, F> Mask<&Topic> for wrap::Context<OS, F>
where
F: PRP,
OS: io::OStream,
{
fn mask(&mut self, topic: &Topic) -> anyhow::Result<&mut Self> {
self.mask(Bytes::new(topic))
}
}

impl<IS, F> Mask<&mut Topic> for unwrap::Context<IS, F>
where
F: PRP,
IS: io::IStream,
{
fn mask(&mut self, topic: &mut Topic) -> anyhow::Result<&mut Self> {
let mut topic_bytes = topic.as_ref().to_vec();
self.mask(Bytes::new(&mut topic_bytes))?;
*topic = topic_bytes.try_into()?;
Ok(self)
}
}
1 change: 1 addition & 0 deletions spongos/src/ddml/commands/unwrap/absorb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl<F: PRP, IS: io::IStream> Absorb<Bytes<&mut Vec<u8>>> for Context<IS, F> {
fn absorb(&mut self, mut bytes: Bytes<&mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.absorb(&mut size)?;
self.stream.ensure_size(size.inner())?;
bytes.resize(size.inner());
AbsorbContext::new(self).unwrapn(bytes)?;
Ok(self)
Expand Down
1 change: 1 addition & 0 deletions spongos/src/ddml/commands/unwrap/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl<'a, F: PRP, IS: io::IStream> Mask<Bytes<&'a mut Vec<u8>>> for Context<IS, F
fn mask(&mut self, mut bytes: Bytes<&'a mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.mask(&mut size)?;
self.stream.ensure_size(size.inner())?;
bytes.resize(size.inner());
MaskContext::new(self).unwrapn(bytes)?;
Ok(self)
Expand Down
1 change: 1 addition & 0 deletions spongos/src/ddml/commands/unwrap/skip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl<'a, F, IS: io::IStream> Skip<Bytes<&'a mut Vec<u8>>> for Context<IS, F> {
fn skip(&mut self, mut bytes: Bytes<&'a mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.skip(&mut size)?;
self.stream.ensure_size(size.inner())?;
bytes.resize(size.inner());
SkipContext::new(self).unwrapn(bytes)?;
Ok(self)
Expand Down
14 changes: 13 additions & 1 deletion spongos/src/ddml/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub trait OStream {

/// Read
pub trait IStream {
/// Ensure there are enough bytes left in stream for advancement
fn ensure_size(&self, n: usize) -> Result<()>;

/// Try get n bytes from the stream, returning a slice to the buffer.
fn try_advance(&mut self, n: usize) -> Result<&[u8]>;

Expand Down Expand Up @@ -50,8 +53,13 @@ where
}

impl IStream for &[u8] {
fn try_advance(&mut self, n: usize) -> Result<&[u8]> {
fn ensure_size(&self, n: usize) -> Result<()> {
ensure!(n <= self.len(), StreamAllocationExceededIn(n, self.len()));
Ok(())
}

fn try_advance(&mut self, n: usize) -> Result<&[u8]> {
self.ensure_size(n)?;
let (head, tail) = self.split_at(n);
*self = tail;
Ok(head)
Expand All @@ -66,6 +74,10 @@ impl<T> IStream for &mut T
where
T: IStream,
{
fn ensure_size(&self, n: usize) -> Result<()> {
self.deref().ensure_size(n)
}

fn try_advance(&mut self, n: usize) -> Result<&[u8]> {
self.deref_mut().try_advance(n)
}
Expand Down
Loading

0 comments on commit 35ea66f

Please sign in to comment.