Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

[v2.0] Introduce Topics #236

Merged
merged 37 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
15eb1dc
First pass branching
DyrellC May 13, 2022
8886b36
AsRef for topic
DyrellC May 16, 2022
d79e56d
mask topic directly, include in address gen, eq failing for user
DyrellC May 18, 2022
125f314
Fix examples/merge overwrites
DyrellC May 19, 2022
c6eda71
Add anchor/latest link to key store, remove link_to from sending
DyrellC May 19, 2022
3fbf7ea
clippy/fmt/warnings
DyrellC May 19, 2022
bd41e2b
fix tests
DyrellC May 19, 2022
8515d9d
fmt
DyrellC May 19, 2022
6d904c5
fmt in nightly
DyrellC May 19, 2022
c5c104c
remove unused functions
DyrellC May 20, 2022
cd30f54
Write test to expose `BranchStore` not removing all cursors in `Branc…
May 24, 2022
27fe5ca
remove fold and use for_each in cursor removal
DyrellC May 24, 2022
ea855fb
Merge branch 'v2.0/test-topics' of https://github.com/DyrellC/streams…
DyrellC May 24, 2022
2b00e82
Insert own base branch topic
DyrellC May 25, 2022
8aa1fa6
Make Topic String
DyrellC May 26, 2022
0e12d43
Merge branch 'v2.0-dev' of https://github.com/iotaledger/streams into…
DyrellC May 31, 2022
53cb461
Check size before resizing bytes
DyrellC May 31, 2022
05c7d84
fmt/clippy
DyrellC May 31, 2022
d257605
tests update
DyrellC May 31, 2022
3990b43
Merge branch 'v2.0/move-keys' of https://github.com/DyrellC/streams i…
DyrellC May 31, 2022
21afe44
fmt
DyrellC May 31, 2022
79bd3ea
fix cursor store test
DyrellC May 31, 2022
0e222b4
Merge branch 'v2.0-dev' of https://github.com/iotaledger/streams into…
DyrellC Jun 2, 2022
4c26b18
first round review changes
DyrellC Jun 9, 2022
b068917
round two review updates
DyrellC Jun 9, 2022
10f923f
update tests
DyrellC Jun 9, 2022
f14f065
Mac check for backup/restore, Address::gen takes topic by ref
DyrellC Jun 9, 2022
1194c8e
Update streams/src/api/cursor_store.rs
DyrellC Jun 9, 2022
ef728fd
Update cursor_store for anchor and latest link set/get
DyrellC Jun 9, 2022
d301431
fmt, clippy
DyrellC Jun 9, 2022
158b3f3
Merge branch 'v2.0/test-topics' of https://github.com/DyrellC/streams…
DyrellC Jun 9, 2022
705daad
typo
DyrellC Jun 9, 2022
e514d50
base_topic -> base_branch
DyrellC Jun 9, 2022
5b837d5
clean up user from review suggestions
DyrellC Jun 9, 2022
91c0b37
revert byte changes
DyrellC Jun 9, 2022
a5f0ae1
remove resize result
DyrellC Jun 9, 2022
d191de1
rename CursorStore and BranchStore
DyrellC Jun 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions lets/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use spongos::{
};

// Local
use crate::id::Identifier;
use crate::{id::Identifier, message::Topic};

/// Abstract representation of a Message Address
///
Expand Down Expand Up @@ -141,12 +141,17 @@ impl AppAddr {
Self(bytes)
}

pub fn gen(identifier: Identifier, app_idx: usize) -> AppAddr {
pub fn gen(identifier: Identifier, base_topic: Topic) -> AppAddr {
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
let mut addr = [0u8; 40];
let id_bytes = identifier.as_bytes();
// Create spongos to squeeze topic into final 8 bytes
let mut s = Spongos::<KeccakF1600>::init();
s.absorb(base_topic);
s.commit();
let squeezed_topic: [u8; 8] = s.squeeze();
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(id_bytes.len(), 32, "identifier must be 32 bytes long");
addr[..32].copy_from_slice(id_bytes);
addr[32..].copy_from_slice(&app_idx.to_be_bytes());
addr[32..].copy_from_slice(&squeezed_topic);
Self::new(addr)
}

Expand Down Expand Up @@ -229,10 +234,11 @@ impl MsgId {
Self(bytes)
}

pub fn gen(appaddr: AppAddr, identifier: Identifier, seq_num: usize) -> MsgId {
pub fn gen(appaddr: AppAddr, identifier: Identifier, topic: Topic, seq_num: usize) -> MsgId {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Topic does not need to be taken by value, and it is not Copy now so it should be taken by ref

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not do this for Identifier as well despite the Copy? It's an extra allocation without it being necessary.

Copy link
Contributor

@arnauorriols arnauorriols Jun 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as something is Copy, you can be certain it does not incur in a heap allocation (basically because that's a hard condition for types to be able to implement Copy). Generally speaking, Copy is a marker that says that "cloning" the type is essentially the same as moving the type (a memcpy). For this reason, Copy types can keep using the old variable, even after a move to another variable (aka the copy semantics). Types that don't implement Copy have a more complex cloning (for example, because they handle resources, like the pointer to some heap allocated memory that tneeds to be freed when dropping the value), therefore move != clone anymore.

For the types that don't implement Copy, it's important not to take ownership in function parameters if it's not needed, cause you are forcing a potentially costly clone unnecessarily. For Copy types, this clone is as cheap as a move, and the extra cost of the move vs reference, even if arguable, I'd say is under the ream of "profile before assuming anything" and definitely negligible in our case. So in this case ergonomics prevail.

TL;DR: performance is not a factor in this decision. But still passing Identifier by reference would be valid if you prefer it (for coherence, or any other reason)

let mut s = Spongos::<KeccakF1600>::init();
s.absorb(appaddr);
s.absorb(identifier);
s.absorb(topic);
s.absorb(seq_num.to_be_bytes());
s.commit();
s.squeeze()
Expand Down
17 changes: 13 additions & 4 deletions lets/src/message/hdf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use alloc::boxed::Box;

use anyhow::{anyhow, ensure, Result};
use async_trait::async_trait;

Expand All @@ -18,12 +17,13 @@ use crate::{
id::Identifier,
message::{
content::{ContentSizeof, ContentUnwrap, ContentWrap},
topic::Topic,
version::{HDF_ID, STREAMS_1_VER, UTF8},
},
};

#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[allow(clippy::upper_case_acronyms)]
pub struct HDF {
encoding: u8,
Expand All @@ -38,6 +38,7 @@ pub struct HDF {
pub linked_msg_address: Option<MsgId>,
pub sequence: usize,
pub publisher: Identifier,
pub topic: Topic,
}

impl Default for HDF {
Expand All @@ -52,12 +53,13 @@ impl Default for HDF {
linked_msg_address: Default::default(),
sequence: 0,
publisher: Default::default(),
topic: Default::default(),
}
}
}

impl HDF {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier) -> Result<Self> {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: Topic) -> Result<Self> {
ensure!(
message_type >> 4 == 0,
anyhow!(
Expand All @@ -75,6 +77,7 @@ impl HDF {
linked_msg_address: None,
sequence,
publisher,
topic,
})
}

Expand Down Expand Up @@ -118,6 +121,10 @@ impl HDF {
pub fn linked_msg_address(&self) -> Option<MsgId> {
self.linked_msg_address
}

pub fn topic(&self) -> &Topic {
&self.topic
}
}

#[async_trait(?Send)]
Expand All @@ -132,6 +139,7 @@ impl ContentSizeof<HDF> for sizeof::Context {
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?;

Expand Down Expand Up @@ -168,6 +176,7 @@ where
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?;

Expand Down Expand Up @@ -217,6 +226,7 @@ where
anyhow!("first 2 bits of payload-frame-count are reserved"),
)?
.absorb(Maybe::new(&mut hdf.linked_msg_address))?
.mask(&mut hdf.topic)?
.mask(&mut hdf.publisher)?
.skip(&mut seq_num)?;

Expand All @@ -232,7 +242,6 @@ where
x[2] = payload_frame_count_bytes[1];
x[3] = payload_frame_count_bytes[2];
hdf.payload_frame_count = u32::from_be_bytes(x);

hdf.sequence = seq_num.inner();

Ok(self)
Expand Down
4 changes: 2 additions & 2 deletions lets/src/message/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{
transport::TransportMessage,
};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug)]
#[derive(Clone, PartialEq, Eq, Hash, Default, Debug)]
pub struct Message<Content> {
header: HDF,
payload: PCF<Content>,
Expand All @@ -41,7 +41,7 @@ impl<Payload> Message<Payload> {
}

pub fn header(&self) -> HDF {
self.header
self.header.clone()
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn payload(&self) -> &PCF<Payload> {
Expand Down
3 changes: 3 additions & 0 deletions lets/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mod version;
/// Linked Message with header already parsed
mod preparsed;

pub mod topic;

mod message;

pub use content::{
Expand All @@ -22,4 +24,5 @@ pub use hdf::HDF;
pub use message::Message;
pub use pcf::PCF;
pub use preparsed::PreparsedMessage;
pub use topic::Topic;
pub use transport::TransportMessage;
2 changes: 1 addition & 1 deletion lets/src/message/preparsed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl<F> PreparsedMessage<F> {
}

pub fn header(&self) -> HDF {
self.header
self.header.clone()
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn transport_msg(&self) -> &TransportMessage {
Expand Down
81 changes: 81 additions & 0 deletions lets/src/message/topic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use alloc::{
string::{String, ToString},
vec::Vec,
};
use core::{convert::TryFrom, fmt::Formatter};
use spongos::{
ddml::{
commands::{sizeof, unwrap, wrap, Mask},
io,
types::Bytes,
},
PRP,
};

#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
pub struct Topic(String);

impl Topic {
pub fn new(t: String) -> Self {
Self(t)
}
}

impl From<&str> for Topic {
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
fn from(t: &str) -> Self {
Self(t.to_string())
}
}

impl TryFrom<&[u8]> for Topic {
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
type Error = anyhow::Error;
fn try_from(t: &[u8]) -> Result<Self, Self::Error> {
let topic = String::from_utf8(t.to_vec())?;
Ok(Topic(topic))
}
}

impl core::fmt::Display for Topic {
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
write!(f, "{}", &self.0)
}
}

impl AsRef<[u8]> for Topic {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

impl AsMut<Vec<u8>> for Topic {
DyrellC marked this conversation as resolved.
Show resolved Hide resolved
fn as_mut(&mut self) -> &mut Vec<u8> {
unsafe { self.0.as_mut_vec() }
}
}
DyrellC marked this conversation as resolved.
Show resolved Hide resolved

impl Mask<&Topic> for sizeof::Context {
fn mask(&mut self, topic: &Topic) -> anyhow::Result<&mut Self> {
self.mask(Bytes::new(topic))
}
}

impl<OS, F> Mask<&Topic> for wrap::Context<OS, F>
where
F: PRP,
OS: io::OStream,
{
fn mask(&mut self, topic: &Topic) -> anyhow::Result<&mut Self> {
self.mask(Bytes::new(topic))
}
}

impl<IS, F> Mask<&mut Topic> for unwrap::Context<IS, F>
where
F: PRP,
IS: io::IStream,
{
fn mask(&mut self, topic: &mut Topic) -> anyhow::Result<&mut Self> {
self.mask(Bytes::new(topic.as_mut()))?;
Ok(self)
}
}
2 changes: 1 addition & 1 deletion spongos/src/ddml/commands/unwrap/absorb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<F: PRP, IS: io::IStream> Absorb<Bytes<&mut Vec<u8>>> for Context<IS, F> {
fn absorb(&mut self, mut bytes: Bytes<&mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.absorb(&mut size)?;
bytes.resize(size.inner());
bytes.resize(size.inner())?;
AbsorbContext::new(self).unwrapn(bytes)?;
Ok(self)
}
Expand Down
2 changes: 1 addition & 1 deletion spongos/src/ddml/commands/unwrap/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<'a, F: PRP, IS: io::IStream> Mask<Bytes<&'a mut Vec<u8>>> for Context<IS, F
fn mask(&mut self, mut bytes: Bytes<&'a mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.mask(&mut size)?;
bytes.resize(size.inner());
bytes.resize(size.inner())?;
MaskContext::new(self).unwrapn(bytes)?;
Ok(self)
}
Expand Down
2 changes: 1 addition & 1 deletion spongos/src/ddml/commands/unwrap/skip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<'a, F, IS: io::IStream> Skip<Bytes<&'a mut Vec<u8>>> for Context<IS, F> {
fn skip(&mut self, mut bytes: Bytes<&'a mut Vec<u8>>) -> Result<&mut Self> {
let mut size = Size::default();
self.skip(&mut size)?;
bytes.resize(size.inner());
bytes.resize(size.inner())?;
SkipContext::new(self).unwrapn(bytes)?;
Ok(self)
}
Expand Down
11 changes: 9 additions & 2 deletions spongos/src/ddml/types/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::{fmt, iter::FromIterator};

use alloc::{string::String, vec::Vec};
use anyhow::anyhow;

/// Variable-size array of bytes, the size is not known at compile time and is encoded in trinary
/// representation.
Expand Down Expand Up @@ -60,8 +61,14 @@ where
}

impl Bytes<&mut Vec<u8>> {
pub(crate) fn resize(&mut self, new_size: usize) {
self.0.resize(new_size, 0)
// Return an error if size exceeds u32 max size to avoid panic
// TODO: Remove anyhow error and replace with no_std compatible error handling
pub(crate) fn resize(&mut self, new_size: usize) -> anyhow::Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this limit coming from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When recovering a user we end up unmasking the stream of wrapped topics/cursors, but if you use the wrong password (as we do in the examples) it mixes up the unmasked Size value for number of topics, and the Size within the topic Byte unwrap gets messed up as well. This results in an attempt to create a Vec with a size that exceeds its max allocation availability (https://doc.rust-lang.org/stable/std/vec/struct.Vec.html#method.reserve), resulting in a panic that we have no way to catch.

One thing that threw me off was I tried to use isize::MAX and it still resulted in panics on occassion when the new size exceeded u32::MAX, so I made that the default max length. From a Streams perspective, we have no instance where Bytes should exceed 32Kb let alone u32::MAX so I thought that was an appropriate value to use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that threw me off was I tried to use isize::MAX
Well, I can imagine having some problem allocating 9223 petabytes of memory 😅

You actually found an important attack vector, as the Size of of Bytes is an user input after all. I think u32::MAX is still too high, but at least it creates an initial barrier, and u16::MAX would be too low (I wouldn't assume we'll always have the 32KB limit in the transport).

There are actually 2 different cases: the attack vector case, where an actor tries to exploit the Bytes preallocation, and the incorrect password case that causes an unwrapping of an incorrect value of the Byte's Size. About the latter, It's a bad smell that something like this can happen at all, and does not speak very good about how the decoding is implemented. And the error that we are returning here has nothing to do with the true error. I suggest 2 improvements, besides this u32::MAX limit:

  • In ContentUnwrap<State>, we should add a MAC right after absorbing the password to validate it and stop the unwrap if the password is incorrect.
  • in unwrap::Absorb<Bytes<T>>, unwrap::Skip<Bytes<T>> and unwrap::Mask<Bytes<T>> we should check that the Size is smaller than the remaining length of the context buffer. This would be a very effective way to protect about attempts to exploit the Bytes preallocation without having to set an static maximum that is too low.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Mac addition is simple enough, but the size check on stream will require a new method in the IStream trait to check against the intended length. It'll just need to move the ensure! check inside the try_advance() function so that we can ensure the length doesn't exceed size without splitting the stream.

if new_size > u32::MAX as usize {
return Err(anyhow!("{} exceeds maximum available space for Byte resize", new_size));
}
self.0.resize(new_size, 0);
Ok(())
}
}

Expand Down
Loading