Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement resource discovery by "Flood with Random Walk" algorithm #1711

Merged
merged 26 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0d0cb67
node-data: Extend definition of GetData struct
May 7, 2024
fb00a9f
node: Impl network::flood_request method
May 7, 2024
46a0a63
node-data: Use absolute instead of relative ttl
May 7, 2024
fb6f9af
node: Handle new fields of GetData message
May 7, 2024
82e4dc1
node: Apply flood_request usage in chain
May 7, 2024
7cb107b
node-data: Rename GetData to GetResource to be aligned with spec
May 8, 2024
1713bfc
node: Rename GetData to GetResource to be aligned with spec
May 8, 2024
164f631
node: Return an error to instruct the caller to rebroadcast the request
May 10, 2024
2fdf353
node-data: Support GetResource request for a candidate block from hash
May 13, 2024
3f96596
node: Handle InvType::CandidateFromHash resource type
May 13, 2024
b1a41dd
node: Request candidate if block is not found
May 13, 2024
33652d8
node-data: Add hops_limit in GetRequest
May 13, 2024
d550606
node: Handle GetResource::hops_limit field
May 13, 2024
cbc9c98
node: Insert and use certificates from Certificates cache
May 14, 2024
6b0c61f
node: Ensure GetResource msg is not expired before processing it
May 14, 2024
d8461a8
node: Update flood_request doc comment
May 14, 2024
bc7de24
Merge branch 'master' into fix-1528
goshawk-3 May 14, 2024
0ee3196
node: Move certificates HashMap from Chain to FSM components
May 15, 2024
caed8ea
node: Remove expired certificates
May 15, 2024
e977c6b
node-data: Print in hash fields in hex format
May 16, 2024
447df74
node: Pass requester addr in GetResource on handling GetInv
May 16, 2024
fec2369
node: Address issues and PR comments
May 21, 2024
e29ce2e
node: Resend a flood request to a randomly selected alive peer
May 21, 2024
8370805
node: Send a flood request to 8 randomly selected peers
May 21, 2024
88c3197
node-data: Update comment
May 21, 2024
7946a36
node: Use proper hops_limit config
May 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 194 additions & 19 deletions node-data/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl Serializable for Message {
Payload::GetMempool(p) => p.write(w),
Payload::GetInv(p) => p.write(w),
Payload::GetBlocks(p) => p.write(w),
Payload::GetData(p) => p.write(w),
Payload::GetResource(p) => p.write(w),
Payload::Ratification(p) => p.write(w),
Payload::Empty | Payload::ValidationResult(_) => Ok(()), /* internal message, not sent on the wire */
}
Expand Down Expand Up @@ -138,8 +138,8 @@ impl Serializable for Message {
Topics::GetCandidate => {
Message::new_get_candidate(payload::GetCandidate::read(r)?)
}
Topics::GetData => {
Message::new_get_data(payload::GetData::read(r)?)
Topics::GetResource => {
Message::new_get_resource(payload::GetResource::read(r)?)
}
Topics::GetBlocks => {
Message::new_get_blocks(payload::GetBlocks::read(r)?)
Expand Down Expand Up @@ -237,11 +237,11 @@ impl Message {
}
}

/// Creates topics.GetData message
pub fn new_get_data(p: payload::GetData) -> Message {
/// Creates topics.GetResource message
pub fn new_get_resource(p: payload::GetResource) -> Message {
Self {
topic: Topics::GetData,
payload: Payload::GetData(p),
topic: Topics::GetResource,
payload: Payload::GetResource(p),
..Default::default()
}
}
Expand Down Expand Up @@ -372,7 +372,7 @@ pub enum Payload {
GetMempool(payload::GetMempool),
GetInv(payload::Inv),
GetBlocks(payload::GetBlocks),
GetData(payload::GetData),
GetResource(payload::GetResource),
CandidateResp(Box<payload::GetCandidateResp>),

// Internal messages payload
Expand All @@ -388,6 +388,10 @@ pub mod payload {
use crate::Serializable;
use std::fmt;
use std::io::{self, Read, Write};
use std::net::{
Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6,
};
use std::time::{SystemTime, UNIX_EPOCH};

use super::{ConsensusHeader, SignInfo};

Expand Down Expand Up @@ -744,13 +748,18 @@ pub mod payload {

#[derive(Clone, Default, Debug, Copy)]
pub enum InvType {
/// A transaction fetched by tx_id
MempoolTx,
#[default]
/// A full block fetched by block hash
BlockFromHash,
/// A full block fetched by block height
BlockFromHeight,
/// A candidate block fetched by block hash, Cert is None
CandidateFromHash,
}

#[derive(Debug, Clone, Copy)]
#[derive(Clone, Copy)]
pub enum InvParam {
Hash([u8; 32]),
Height(u64),
Expand All @@ -762,6 +771,15 @@ pub mod payload {
}
}

impl fmt::Debug for InvParam {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Hash(hash) => write!(f, "Hash: {}", to_str(hash)),
Self::Height(height) => write!(f, "Height: {}", height),
}
}
}

#[derive(Default, Debug, Clone, Copy)]
pub struct InvVect {
pub inv_type: InvType,
Expand All @@ -771,9 +789,17 @@ pub mod payload {
#[derive(Default, Debug, Clone)]
pub struct Inv {
pub inv_list: Vec<InvVect>,
pub max_entries: u16,
}

impl Inv {
pub fn new(max_entries: u16) -> Self {
Self {
inv_list: Default::default(),
max_entries,
}
}

pub fn add_tx_id(&mut self, id: [u8; 32]) {
self.inv_list.push(InvVect {
inv_type: InvType::MempoolTx,
Expand All @@ -794,6 +820,13 @@ pub mod payload {
param: InvParam::Height(height),
});
}

pub fn add_candidate_from_hash(&mut self, hash: [u8; 32]) {
self.inv_list.push(InvVect {
inv_type: InvType::CandidateFromHash,
param: InvParam::Hash(hash),
});
}
}

impl Serializable for Inv {
Expand All @@ -812,6 +845,7 @@ pub mod payload {
};
}

w.write_all(&self.max_entries.to_le_bytes())?;
Ok(())
}

Expand All @@ -829,6 +863,7 @@ pub mod payload {
0 => InvType::MempoolTx,
1 => InvType::BlockFromHash,
2 => InvType::BlockFromHeight,
3 => InvType::CandidateFromHash,
_ => {
return Err(io::Error::from(io::ErrorKind::InvalidData))
}
Expand All @@ -846,18 +881,28 @@ pub mod payload {
InvType::BlockFromHeight => {
inv.add_block_from_height(Self::read_u64_le(r)?);
}
InvType::CandidateFromHash => {
inv.add_candidate_from_hash(Self::read_bytes(r)?);
}
}
}

inv.max_entries = Self::read_u16_le(r)?;
Ok(inv)
}
}

#[derive(Debug, Clone, Default)]
#[derive(Clone, Default)]
pub struct GetBlocks {
pub locator: [u8; 32],
}

impl fmt::Debug for GetBlocks {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "GetBlocks, locator: {}", to_str(&self.locator))
}
}

impl Serializable for GetBlocks {
fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
w.write_all(&self.locator[..])
Expand All @@ -872,25 +917,155 @@ pub mod payload {
}
}

#[derive(Default, Debug, Clone)]
pub struct GetData {
pub inner: Inv,
#[derive(Debug, Clone)]
pub struct GetResource {
/// Inventory/Resource to search for
inventory: Inv,

/// (requester) Address to which the resource is sent back, if found
requester_addr: SocketAddr,

/// Limits request lifespan by absolute (epoch) time
ttl_as_sec: u64,
goshawk-3 marked this conversation as resolved.
Show resolved Hide resolved

/// Limits request lifespan by number of hops
hops_limit: u16,
}

impl GetResource {
pub fn new(
inventory: Inv,
requester_addr: SocketAddr,
ttl_as_sec: u64,
hops_limit: u16,
) -> Self {
Self {
inventory,
requester_addr,
ttl_as_sec,
hops_limit,
}
}

pub fn clone_with_hop_decrement(&self) -> Option<Self> {
if self.hops_limit == 1 {
return None;
}
let mut req = self.clone();
req.hops_limit -= 1;
Some(req)
}

pub fn get_addr(&self) -> SocketAddr {
self.requester_addr
}

pub fn get_inv(&self) -> &Inv {
&self.inventory
}

pub fn is_expired(&self) -> bool {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
> self.ttl_as_sec
}
}

impl Serializable for GetData {
impl Serializable for GetResource {
fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
self.inner.write(w)
self.inventory.write(w)?;
self.requester_addr.write(w)?;
w.write_all(&self.ttl_as_sec.to_le_bytes()[..])?;
w.write_all(&self.hops_limit.to_le_bytes()[..])
}

fn read<R: Read>(r: &mut R) -> io::Result<Self>
where
Self: Sized,
{
Ok(GetData {
inner: Inv::read(r)?,
let inner = Inv::read(r)?;
let requester_addr = SocketAddr::read(r)?;

let mut buf = [0u8; 8];
r.read_exact(&mut buf)?;
let ttl_as_sec = u64::from_le_bytes(buf);

let mut buf = [0u8; 2];
r.read_exact(&mut buf)?;
let hops_limit = u16::from_le_bytes(buf);

Ok(GetResource {
inventory: inner,
requester_addr,
ttl_as_sec,
hops_limit,
})
}
}

impl Serializable for SocketAddr {
fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
match self {
SocketAddr::V4(addr_v4) => {
w.write_all(&[4])?;
w.write_all(&addr_v4.ip().octets())?;
w.write_all(&addr_v4.port().to_le_bytes())?;
}
SocketAddr::V6(addr_v6) => {
w.write_all(&[6])?;
w.write_all(&addr_v6.ip().octets())?;
w.write_all(&addr_v6.port().to_le_bytes())?;
}
}
Ok(())
}

fn read<R: Read>(r: &mut R) -> io::Result<Self>
where
Self: Sized,
{
let mut ip_type = [0u8; 1];
r.read_exact(&mut ip_type)?;

let ip = match ip_type[0] {
4 => {
let mut octets = [0u8; 4];
r.read_exact(&mut octets)?;

let mut port_bytes = [0u8; 2];
r.read_exact(&mut port_bytes)?;

SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::from(octets),
u16::from_le_bytes(port_bytes),
))
}
6 => {
let mut octets = [0u8; 16];
r.read_exact(&mut octets)?;

let mut port_bytes = [0u8; 2];
r.read_exact(&mut port_bytes)?;

SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::from(octets),
u16::from_le_bytes(port_bytes),
0,
0,
))
}
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid IP type",
))
}
};
Ok(ip)
}
}
}

macro_rules! map_topic {
Expand All @@ -905,7 +1080,7 @@ macro_rules! map_topic {
#[cfg_attr(any(feature = "faker", test), derive(fake::Dummy))]
pub enum Topics {
// Data exchange topics.
GetData = 8,
GetResource = 8,
GetBlocks = 9,
GetMempool = 13, // NB: This is aliased as Mempool in the golang impl
GetInv = 14, // NB: This is aliased as Inv in the golang impl
Expand Down Expand Up @@ -942,7 +1117,7 @@ impl Topics {

impl From<u8> for Topics {
fn from(v: u8) -> Self {
map_topic!(v, Topics::GetData);
map_topic!(v, Topics::GetResource);
map_topic!(v, Topics::GetBlocks);
map_topic!(v, Topics::Tx);
map_topic!(v, Topics::Block);
Expand Down
2 changes: 2 additions & 0 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,13 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
Payload::Candidate(_)
| Payload::Validation(_)
| Payload::Ratification(_) => {
let acc = self.acceptor.as_ref().expect("initialize is called");
if let Err(e) = acc.read().await.reroute_msg(msg).await {
warn!("msg discarded: {e}");
}
},
Payload::Quorum(payload) => {
let acc = self.acceptor.as_ref().expect("initialize is called");
if let Err(e) = acc.read().await.reroute_msg(msg.clone()).await {
warn!("msg discarded: {e}");
}
Expand Down
4 changes: 1 addition & 3 deletions node/src/chain/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ impl<DB: database::DB, N: Network> dusk_consensus::commons::Database
{
fn store_candidate_block(&mut self, b: Block) {
tracing::trace!("store candidate block: {:?}", b);

match self.db.try_read() {
Ok(db) => {
if let Err(e) = db.update(|t| t.store_candidate_block(b)) {
Expand All @@ -219,7 +218,6 @@ impl<DB: database::DB, N: Network> dusk_consensus::commons::Database
) -> anyhow::Result<Block> {
// Make an attempt to fetch the candidate block from local storage
let res = self.db.read().await.view(|t| t.fetch_candidate_block(h))?;

if let Some(b) = res {
return Ok(b);
}
Expand All @@ -229,7 +227,7 @@ impl<DB: database::DB, N: Network> dusk_consensus::commons::Database

// For redundancy reasons, we send the GetCandidate request to multiple
// network peers
let request = Message::new_get_candidate(GetCandidate { hash: *h });
let request = Message::new_get_candidate(GetCandidate { hash: *h }); // TODO: Use GetResource
let res = self
.network
.write()
Expand Down
Loading
Loading