Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

[v2.0] Introduce Topics #236

Merged
merged 37 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
15eb1dc
First pass branching
DyrellC May 13, 2022
8886b36
AsRef for topic
DyrellC May 16, 2022
d79e56d
mask topic directly, include in address gen, eq failing for user
DyrellC May 18, 2022
125f314
Fix examples/merge overwrites
DyrellC May 19, 2022
c6eda71
Add anchor/latest link to key store, remove link_to from sending
DyrellC May 19, 2022
3fbf7ea
clippy/fmt/warnings
DyrellC May 19, 2022
bd41e2b
fix tests
DyrellC May 19, 2022
8515d9d
fmt
DyrellC May 19, 2022
6d904c5
fmt in nightly
DyrellC May 19, 2022
c5c104c
remove unused functions
DyrellC May 20, 2022
cd30f54
Write test to expose `BranchStore` not removing all cursors in `Branc…
May 24, 2022
27fe5ca
remove fold and use for_each in cursor removal
DyrellC May 24, 2022
ea855fb
Merge branch 'v2.0/test-topics' of https://github.com/DyrellC/streams…
DyrellC May 24, 2022
2b00e82
Insert own base branch topic
DyrellC May 25, 2022
8aa1fa6
Make Topic String
DyrellC May 26, 2022
0e12d43
Merge branch 'v2.0-dev' of https://github.com/iotaledger/streams into…
DyrellC May 31, 2022
53cb461
Check size before resizing bytes
DyrellC May 31, 2022
05c7d84
fmt/clippy
DyrellC May 31, 2022
d257605
tests update
DyrellC May 31, 2022
3990b43
Merge branch 'v2.0/move-keys' of https://github.com/DyrellC/streams i…
DyrellC May 31, 2022
21afe44
fmt
DyrellC May 31, 2022
79bd3ea
fix cursor store test
DyrellC May 31, 2022
0e222b4
Merge branch 'v2.0-dev' of https://github.com/iotaledger/streams into…
DyrellC Jun 2, 2022
4c26b18
first round review changes
DyrellC Jun 9, 2022
b068917
round two review updates
DyrellC Jun 9, 2022
10f923f
update tests
DyrellC Jun 9, 2022
f14f065
Mac check for backup/restore, Address::gen takes topic by ref
DyrellC Jun 9, 2022
1194c8e
Update streams/src/api/cursor_store.rs
DyrellC Jun 9, 2022
ef728fd
Update cursor_store for anchor and latest link set/get
DyrellC Jun 9, 2022
d301431
fmt, clippy
DyrellC Jun 9, 2022
158b3f3
Merge branch 'v2.0/test-topics' of https://github.com/DyrellC/streams…
DyrellC Jun 9, 2022
705daad
typo
DyrellC Jun 9, 2022
e514d50
base_topic -> base_branch
DyrellC Jun 9, 2022
5b837d5
clean up user from review suggestions
DyrellC Jun 9, 2022
91c0b37
revert byte changes
DyrellC Jun 9, 2022
a5f0ae1
remove resize result
DyrellC Jun 9, 2022
d191de1
rename CursorStore and BranchStore
DyrellC Jun 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions lets/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use spongos::{
};

// Local
use crate::id::Identifier;
use crate::{id::Identifier, message::Topic};

/// Abstract representation of a Message Address
///
Expand Down Expand Up @@ -141,12 +141,14 @@ impl AppAddr {
Self(bytes)
}

pub fn gen(identifier: Identifier, app_idx: usize) -> AppAddr {
pub fn gen(identifier: Identifier, base_topic: &Topic) -> AppAddr {
let mut addr = [0u8; 40];
let id_bytes = identifier.as_bytes();
// Create spongos to squeeze topic into final 8 bytes
let squeezed_topic: [u8; 8] = Spongos::<KeccakF1600>::init().sponge(base_topic);
assert_eq!(id_bytes.len(), 32, "identifier must be 32 bytes long");
addr[..32].copy_from_slice(id_bytes);
addr[32..].copy_from_slice(&app_idx.to_be_bytes());
addr[32..].copy_from_slice(&squeezed_topic);
Self::new(addr)
}

Expand Down Expand Up @@ -229,10 +231,11 @@ impl MsgId {
Self(bytes)
}

pub fn gen(appaddr: AppAddr, identifier: Identifier, seq_num: usize) -> MsgId {
pub fn gen(appaddr: AppAddr, identifier: Identifier, topic: &Topic, seq_num: usize) -> MsgId {
let mut s = Spongos::<KeccakF1600>::init();
s.absorb(appaddr);
s.absorb(identifier);
s.absorb(topic);
s.absorb(seq_num.to_be_bytes());
s.commit();
s.squeeze()
Expand Down
17 changes: 13 additions & 4 deletions lets/src/message/hdf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use alloc::boxed::Box;

use anyhow::{anyhow, ensure, Result};
use async_trait::async_trait;

Expand All @@ -18,12 +17,13 @@ use crate::{
id::Identifier,
message::{
content::{ContentSizeof, ContentUnwrap, ContentWrap},
topic::Topic,
version::{HDF_ID, STREAMS_1_VER, UTF8},
},
};

#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[allow(clippy::upper_case_acronyms)]
pub struct HDF {
encoding: u8,
Expand All @@ -38,6 +38,7 @@ pub struct HDF {
pub linked_msg_address: Option<MsgId>,
pub sequence: usize,
pub publisher: Identifier,
pub topic: Topic,
}

impl Default for HDF {
Expand All @@ -52,12 +53,13 @@ impl Default for HDF {
linked_msg_address: Default::default(),
sequence: 0,
publisher: Default::default(),
topic: Default::default(),
}
}
}

impl HDF {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier) -> Result<Self> {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: Topic) -> Result<Self> {
ensure!(
message_type >> 4 == 0,
anyhow!(
Expand All @@ -75,6 +77,7 @@ impl HDF {
linked_msg_address: None,
sequence,
publisher,
topic,
})
}

Expand Down Expand Up @@ -118,6 +121,10 @@ impl HDF {
pub fn linked_msg_address(&self) -> Option<MsgId> {
self.linked_msg_address
}

pub fn topic(&self) -> &Topic {
&self.topic
}
}

#[async_trait(?Send)]
Expand All @@ -132,6 +139,7 @@ impl ContentSizeof<HDF> for sizeof::Context {
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?;

Expand Down Expand Up @@ -168,6 +176,7 @@ where
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?;

Expand Down Expand Up @@ -217,6 +226,7 @@ where
anyhow!("first 2 bits of payload-frame-count are reserved"),
)?
.absorb(Maybe::new(&mut hdf.linked_msg_address))?
.mask(&mut hdf.topic)?
.mask(&mut hdf.publisher)?
.skip(&mut seq_num)?;

Expand All @@ -232,7 +242,6 @@ where
x[2] = payload_frame_count_bytes[1];
x[3] = payload_frame_count_bytes[2];
hdf.payload_frame_count = u32::from_be_bytes(x);

hdf.sequence = seq_num.inner();

Ok(self)
Expand Down
6 changes: 3 additions & 3 deletions lets/src/message/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{
transport::TransportMessage,
};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug)]
#[derive(Clone, PartialEq, Eq, Hash, Default, Debug)]
pub struct Message<Content> {
header: HDF,
payload: PCF<Content>,
Expand All @@ -40,8 +40,8 @@ impl<Payload> Message<Payload> {
self
}

pub fn header(&self) -> HDF {
self.header
pub fn header(&self) -> &HDF {
&self.header
}

pub fn payload(&self) -> &PCF<Payload> {
Expand Down
3 changes: 3 additions & 0 deletions lets/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mod version;
/// Linked Message with header already parsed
mod preparsed;

pub mod topic;

mod message;

pub use content::{
Expand All @@ -22,4 +24,5 @@ pub use hdf::HDF;
pub use message::Message;
pub use pcf::PCF;
pub use preparsed::PreparsedMessage;
pub use topic::Topic;
pub use transport::TransportMessage;
4 changes: 2 additions & 2 deletions lets/src/message/preparsed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl<F> PreparsedMessage<F> {
}
}

pub fn header(&self) -> HDF {
self.header
pub fn header(&self) -> &HDF {
&self.header
}

pub fn transport_msg(&self) -> &TransportMessage {
Expand Down
94 changes: 94 additions & 0 deletions lets/src/message/topic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use alloc::{
string::{String, ToString},
vec::Vec,
};
use core::{
convert::{TryFrom, TryInto},
fmt::Formatter,
};
use spongos::{
ddml::{
commands::{sizeof, unwrap, wrap, Mask},
io,
types::Bytes,
},
PRP,
};

#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
pub struct Topic(String);

impl Topic {
pub fn new(t: String) -> Self {
Self(t)
}
}

impl From<&str> for Topic {
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
fn from(t: &str) -> Self {
Self(t.to_string())
}
}

impl From<String> for Topic {
fn from(t: String) -> Self {
Self(t)
}
}

impl TryFrom<&[u8]> for Topic {
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
type Error = anyhow::Error;
fn try_from(t: &[u8]) -> Result<Self, Self::Error> {
let topic = String::from_utf8(t.to_vec())?;
Ok(Topic(topic))
}
}

impl TryFrom<Vec<u8>> for Topic {
type Error = anyhow::Error;
fn try_from(t: Vec<u8>) -> Result<Self, Self::Error> {
let topic = String::from_utf8(t)?;
Ok(Topic(topic))
}
}

impl core::fmt::Display for Topic {
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
write!(f, "{}", &self.0)
}
}

impl AsRef<[u8]> for Topic {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

impl Mask<&Topic> for sizeof::Context {
fn mask(&mut self, topic: &Topic) -> anyhow::Result<&mut Self> {
self.mask(Bytes::new(topic))
}
}

impl<OS, F> Mask<&Topic> for wrap::Context<OS, F>
where
F: PRP,
OS: io::OStream,
{
fn mask(&mut self, topic: &Topic) -> anyhow::Result<&mut Self> {
self.mask(Bytes::new(topic))
}
}

impl<IS, F> Mask<&mut Topic> for unwrap::Context<IS, F>
where
F: PRP,
IS: io::IStream,
{
fn mask(&mut self, topic: &mut Topic) -> anyhow::Result<&mut Self> {
let mut topic_bytes = topic.as_ref().to_vec();
self.mask(Bytes::new(&mut topic_bytes))?;
*topic = topic_bytes.try_into()?;
Ok(self)
}
}
1 change: 1 addition & 0 deletions spongos/src/ddml/commands/unwrap/absorb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl<F: PRP, IS: io::IStream> Absorb<Bytes<&mut Vec<u8>>> for Context<IS, F> {
fn absorb(&mut self, mut bytes: Bytes<&mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.absorb(&mut size)?;
self.stream.ensure_size(size.inner())?;
bytes.resize(size.inner());
AbsorbContext::new(self).unwrapn(bytes)?;
Ok(self)
Expand Down
1 change: 1 addition & 0 deletions spongos/src/ddml/commands/unwrap/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl<'a, F: PRP, IS: io::IStream> Mask<Bytes<&'a mut Vec<u8>>> for Context<IS, F
fn mask(&mut self, mut bytes: Bytes<&'a mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.mask(&mut size)?;
self.stream.ensure_size(size.inner())?;
bytes.resize(size.inner());
MaskContext::new(self).unwrapn(bytes)?;
Ok(self)
Expand Down
1 change: 1 addition & 0 deletions spongos/src/ddml/commands/unwrap/skip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl<'a, F, IS: io::IStream> Skip<Bytes<&'a mut Vec<u8>>> for Context<IS, F> {
fn skip(&mut self, mut bytes: Bytes<&'a mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.skip(&mut size)?;
self.stream.ensure_size(size.inner())?;
bytes.resize(size.inner());
SkipContext::new(self).unwrapn(bytes)?;
Ok(self)
Expand Down
14 changes: 13 additions & 1 deletion spongos/src/ddml/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub trait OStream {

/// Read
pub trait IStream {
/// Ensure there are enough bytes left in stream for advancement
fn ensure_size(&self, n: usize) -> Result<()>;

/// Try get n bytes from the stream, returning a slice to the buffer.
fn try_advance(&mut self, n: usize) -> Result<&[u8]>;

Expand Down Expand Up @@ -50,8 +53,13 @@ where
}

impl IStream for &[u8] {
fn try_advance(&mut self, n: usize) -> Result<&[u8]> {
fn ensure_size(&self, n: usize) -> Result<()> {
ensure!(n <= self.len(), StreamAllocationExceededIn(n, self.len()));
Ok(())
}

fn try_advance(&mut self, n: usize) -> Result<&[u8]> {
self.ensure_size(n)?;
let (head, tail) = self.split_at(n);
*self = tail;
Ok(head)
Expand All @@ -66,6 +74,10 @@ impl<T> IStream for &mut T
where
T: IStream,
{
fn ensure_size(&self, n: usize) -> Result<()> {
self.deref().ensure_size(n)
}

fn try_advance(&mut self, n: usize) -> Result<&[u8]> {
self.deref_mut().try_advance(n)
}
Expand Down
Loading