Skip to content

Commit

Permalink
Merge branch 'ZcashFoundation:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 authored Nov 20, 2023
2 parents c89b341 + 3be22b2 commit 8bd40f9
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 51 deletions.
4 changes: 4 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ pub const DEFAULT_MAX_CONNS_PER_IP: usize = 1;
/// This will be used as `Config.peerset_initial_target_size` if no valid value is provided.
pub const DEFAULT_PEERSET_INITIAL_TARGET_SIZE: usize = 25;

/// The maximum number of peers we will add to the address book after each `getaddr` request.
///
/// The value is half of the product of `DEFAULT_PEERSET_INITIAL_TARGET_SIZE` and
/// `OUTBOUND_PEER_LIMIT_MULTIPLIER`. The multiplication is evaluated before the integer
/// division, so an odd product rounds down.
///
/// The peer connection code passes this constant as the response size when it answers a
/// `Peers` request from new or cached addresses.
pub const PEER_ADDR_RESPONSE_LIMIT: usize =
DEFAULT_PEERSET_INITIAL_TARGET_SIZE * OUTBOUND_PEER_LIMIT_MULTIPLIER / 2;

/// The buffer size for the peer set.
///
/// This should be greater than 1 to avoid sender contention, but also reasonably
Expand Down
156 changes: 119 additions & 37 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::{
prelude::*,
stream::Stream,
};
use rand::{thread_rng, Rng};
use rand::{seq::SliceRandom, thread_rng, Rng};
use tokio::time::{sleep, Sleep};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
Expand All @@ -27,8 +27,8 @@ use zebra_chain::{

use crate::{
constants::{
self, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
OVERLOAD_PROTECTION_INTERVAL,
self, MAX_ADDRS_IN_MESSAGE, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
OVERLOAD_PROTECTION_INTERVAL, PEER_ADDR_RESPONSE_LIMIT,
},
meta_addr::MetaAddr,
peer::{
Expand Down Expand Up @@ -137,7 +137,14 @@ impl Handler {
/// interpretable as a response, we return ownership to the caller.
///
/// Unexpected messages are left unprocessed, and may be rejected later.
fn process_message(&mut self, msg: Message) -> Option<Message> {
///
/// `addr` responses are limited to avoid peer set takeover. Any excess
/// addresses are stored in `cached_addrs`.
fn process_message(
&mut self,
msg: Message,
cached_addrs: &mut Vec<MetaAddr>,
) -> Option<Message> {
let mut ignored_msg = None;
// TODO: can this be avoided?
let tmp_state = std::mem::replace(self, Handler::Finished(Ok(Response::Nil)));
Expand All @@ -152,7 +159,24 @@ impl Handler {
Handler::Ping(req_nonce)
}
}
(Handler::Peers, Message::Addr(addrs)) => Handler::Finished(Ok(Response::Peers(addrs))),

(Handler::Peers, Message::Addr(new_addrs)) => {
// Security: This method performs security-sensitive operations, see its comments
// for details.
let response_addrs =
Handler::update_addr_cache(cached_addrs, &new_addrs, PEER_ADDR_RESPONSE_LIMIT);

debug!(
new_addrs = new_addrs.len(),
response_addrs = response_addrs.len(),
remaining_addrs = cached_addrs.len(),
PEER_ADDR_RESPONSE_LIMIT,
"responding to Peers request using new and cached addresses",
);

Handler::Finished(Ok(Response::Peers(response_addrs)))
}

// `zcashd` returns requested transactions in a single batch of messages.
// Other transaction or non-transaction messages can come before or after the batch.
// After the transaction batch, `zcashd` sends `notfound` if any transactions are missing:
Expand Down Expand Up @@ -251,6 +275,7 @@ impl Handler {
)))
}
}

// `zcashd` returns requested blocks in a single batch of messages.
// Other blocks or non-blocks messages can come before or after the batch.
// `zcashd` silently skips missing blocks, rather than sending a final `notfound` message.
Expand Down Expand Up @@ -365,16 +390,18 @@ impl Handler {
block_hashes(&items[..]).collect(),
)))
}
(Handler::FindHeaders, Message::Headers(headers)) => {
Handler::Finished(Ok(Response::BlockHeaders(headers)))
}

(Handler::MempoolTransactionIds, Message::Inv(items))
if items.iter().all(|item| item.unmined_tx_id().is_some()) =>
{
Handler::Finished(Ok(Response::TransactionIds(
transaction_ids(&items).collect(),
)))
}
(Handler::FindHeaders, Message::Headers(headers)) => {
Handler::Finished(Ok(Response::BlockHeaders(headers)))
}

// By default, messages are not responses.
(state, msg) => {
trace!(?msg, "did not interpret message as response");
Expand All @@ -385,6 +412,52 @@ impl Handler {

ignored_msg
}

/// Merges `new_addrs` into the `cached_addrs` cache, then removes and returns up to
/// `response_size` randomly selected addresses from the combined cache.
///
/// Every input may be "empty": `cached_addrs` can be an empty cache, `new_addrs` can be an
/// empty iterator or `None` when there are no new addresses, and `response_size` can be zero
/// or `None` when no response is needed.
///
/// When this returns, `cached_addrs` holds at most `MAX_ADDRS_IN_MESSAGE` of the addresses
/// that were not placed in the response.
fn update_addr_cache<'new>(
    cached_addrs: &mut Vec<MetaAddr>,
    new_addrs: impl IntoIterator<Item = &'new MetaAddr>,
    response_size: impl Into<Option<usize>>,
) -> Vec<MetaAddr> {
    // # Peer Set Reliability
    //
    // Freshly received addresses go at the end of the cache, so they are still available
    // later if the connection doesn't respond to our getaddr requests.
    cached_addrs.extend(new_addrs);

    // A missing response size means that no addresses are handed out.
    let wanted: Option<usize> = response_size.into();
    let wanted = wanted.unwrap_or(0);

    // Move the whole pool out of the cache, leaving an empty cache to refill below.
    let mut pool = std::mem::take(cached_addrs);

    // # Security
    //
    // Only a limited number of addresses is taken from each peer, so that a single peer
    // can't control our address book and outbound connections (#1869). The selection is
    // random, so the remote peer can't steer which addresses are chosen by reordering the
    // messages it sends.
    //
    // `chosen` is fully shuffled, `leftover` is only partially shuffled.
    let (chosen, leftover) = pool.partial_shuffle(&mut thread_rng(), wanted);

    // # Security
    //
    // The cache size is capped to avoid memory denial of service.
    //
    // A partial shuffle of the leftovers is good enough, because it doesn't matter which
    // excess peers get dropped. Excess peers are rare: most peers only send one large
    // unsolicited peer message when they first connect.
    let kept = leftover.len().min(MAX_ADDRS_IN_MESSAGE);
    cached_addrs.extend_from_slice(&leftover[..kept]);

    chosen.to_vec()
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -780,7 +853,7 @@ where
let request_msg = match self.state {
State::AwaitingResponse {
ref mut handler, ..
} => span.in_scope(|| handler.process_message(peer_msg)),
} => span.in_scope(|| handler.process_message(peer_msg, &mut self.cached_addrs)),
_ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
self.state,
peer_msg,
Expand Down Expand Up @@ -929,16 +1002,21 @@ where
self.client_rx
),

// Consume the cached addresses from the peer,
// to work-around a `zcashd` response rate-limit.
// Take some cached addresses from the peer connection. This address cache helps
// work-around a `zcashd` addr response rate-limit.
(AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
let cached_addrs = std::mem::take(&mut self.cached_addrs);
// Security: This method performs security-sensitive operations, see its comments
// for details.
let response_addrs = Handler::update_addr_cache(&mut self.cached_addrs, None, PEER_ADDR_RESPONSE_LIMIT);

debug!(
addrs = cached_addrs.len(),
"responding to Peers request using cached addresses",
response_addrs = response_addrs.len(),
remaining_addrs = self.cached_addrs.len(),
PEER_ADDR_RESPONSE_LIMIT,
"responding to Peers request using some cached addresses",
);

Ok(Handler::Finished(Ok(Response::Peers(cached_addrs))))
Ok(Handler::Finished(Ok(Response::Peers(response_addrs))))
}
(AwaitingRequest, Peers) => self
.peer_tx
Expand Down Expand Up @@ -1145,28 +1223,32 @@ where
// Ignored, but consumed because it is technically a protocol error.
Consumed
}
// Zebra crawls the network proactively, to prevent
// peers from inserting data into our address book.
Message::Addr(ref addrs) => {
// Workaround `zcashd`'s `getaddr` response rate-limit
if addrs.len() > 1 {
// Always refresh the cache with multi-addr messages.
debug!(%msg, "caching unsolicited multi-addr message");
self.cached_addrs = addrs.clone();
Consumed
} else if addrs.len() == 1 && self.cached_addrs.len() <= 1 {
// Only refresh a cached single addr message with another single addr.
// (`zcashd` regularly advertises its own address.)
debug!(%msg, "caching unsolicited single addr message");
self.cached_addrs = addrs.clone();
Consumed
} else {
debug!(
%msg,
"ignoring unsolicited single addr message: already cached a multi-addr message"
);
Consumed
}

// # Security
//
// Zebra crawls the network proactively, and that's the only way peers get into our
// address book. This prevents peers from filling our address book with malicious peer
// addresses.
Message::Addr(ref new_addrs) => {
// # Peer Set Reliability
//
// We keep a list of the unused peer addresses sent by each connection, to work
// around `zcashd`'s `getaddr` response rate-limit.
let no_response =
Handler::update_addr_cache(&mut self.cached_addrs, new_addrs, None);
assert_eq!(
no_response,
Vec::new(),
"peers unexpectedly taken from cache"
);

debug!(
new_addrs = new_addrs.len(),
cached_addrs = self.cached_addrs.len(),
"adding unsolicited addresses to cached addresses",
);

Consumed
}
Message::Tx(ref transaction) => Request::PushTransaction(transaction.clone()).into(),
Message::Inv(ref items) => match &items[..] {
Expand Down
44 changes: 42 additions & 2 deletions zebra-network/src/peer/connection/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::{
sink::SinkMapErr,
SinkExt, StreamExt,
};
use proptest::prelude::*;
use proptest::{collection, prelude::*};
use tracing::Span;

use zebra_chain::{
Expand All @@ -18,7 +18,12 @@ use zebra_chain::{
use zebra_test::mock_service::{MockService, PropTestAssertion};

use crate::{
peer::{connection::Connection, ClientRequest, ErrorSlot},
constants::{MAX_ADDRS_IN_MESSAGE, PEER_ADDR_RESPONSE_LIMIT},
meta_addr::MetaAddr,
peer::{
connection::{Connection, Handler},
ClientRequest, ErrorSlot,
},
protocol::external::Message,
protocol::internal::InventoryResponse,
Request, Response, SharedPeerError,
Expand Down Expand Up @@ -129,6 +134,41 @@ proptest! {
Ok(())
})?;
}

/// This test makes sure that Zebra's per-connection peer cache is updated correctly.
///
/// It checks that:
/// - the cache and the response never exceed their size limits,
/// - the response and the cache together never contain more addresses than were supplied,
/// - the response is filled before the cache, and
/// - the cache only keeps addresses that did not fit in the response.
#[test]
fn cache_is_updated_correctly(
    mut cached_addrs in collection::vec(MetaAddr::gossiped_strategy(), 0..=MAX_ADDRS_IN_MESSAGE),
    new_addrs in collection::vec(MetaAddr::gossiped_strategy(), 0..=MAX_ADDRS_IN_MESSAGE),
    response_size in 0..=PEER_ADDR_RESPONSE_LIMIT,
) {
    let _init_guard = zebra_test::init();

    // Only the number of supplied addresses matters for these checks, so record it before
    // the cache is modified, rather than cloning the whole cache.
    let total_addrs = cached_addrs.len() + new_addrs.len();

    let response = Handler::update_addr_cache(&mut cached_addrs, &new_addrs, response_size);

    prop_assert!(cached_addrs.len() <= MAX_ADDRS_IN_MESSAGE, "cache has a limited size");
    prop_assert!(response.len() <= response_size, "response has a limited size");

    prop_assert!(response.len() <= total_addrs, "no duplicate or made up addresses in response");
    prop_assert!(cached_addrs.len() <= total_addrs, "no duplicate or made up addresses in cache");
    prop_assert!(
        response.len() + cached_addrs.len() <= total_addrs,
        "no duplicate or made up addresses across response and cache"
    );

    if total_addrs >= response_size {
        // If we deduplicate addresses, this check should fail and must be changed
        prop_assert_eq!(response.len(), response_size, "response gets addresses before cache does");
    } else {
        // With no excess, every supplied address must be in the response. A plain `<` check
        // would still pass if some addresses were silently dropped.
        //
        // If we deduplicate addresses, this check should fail and must be changed
        prop_assert_eq!(response.len(), total_addrs, "response gets all addresses if there is no excess");
    }

    if total_addrs <= response_size {
        prop_assert_eq!(cached_addrs.len(), 0, "cache is empty if there are no excess addresses");
    } else {
        // If we deduplicate addresses, this check should fail and must be changed
        prop_assert_ne!(cached_addrs.len(), 0, "cache gets excess addresses");
    }
}
}

/// Creates a new [`Connection`] instance for property tests.
Expand Down
18 changes: 6 additions & 12 deletions zebra-network/src/peer_set/candidate_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ mod tests;
/// ││ ▼ ││
/// ││ Λ ││
/// ││ ╱ ╲ filter by ││
/// ││ ▕ ▏ is_ready_for_connection_attempt ││
/// ││ ╲ ╱ to remove recent `Responded`, ││
/// ││ ▕ ▏ is_ready_for_connection_attempt ││
/// ││ ╲ ╱ to remove recent `Responded`, ││
/// ││ V `AttemptPending`, and `Failed` peers ││
/// ││ │ ││
/// ││ │ try outbound connection, ││
Expand All @@ -105,7 +105,8 @@ mod tests;
/// │ │
/// │ ▼
/// │┌───────────────────────────────────────┐
/// ││ every time we receive a peer message: │
/// ││ when connection succeeds, and every │
/// ││ time we receive a peer heartbeat: │
/// └│ * update state to `Responded` │
/// │ * update last_response to now() │
/// └───────────────────────────────────────┘
Expand All @@ -120,11 +121,6 @@ mod tests;
// TODO:
// * show all possible transitions between Attempt/Responded/Failed,
// except Failed -> Responded is invalid, must go through Attempt
// * for now, seed peers go straight to handshaking and responded,
// but we'll fix that once we add the Seed state
// When we add the Seed state:
// * show that seed peers that transition to other never attempted
// states are already in the address book
pub(crate) struct CandidateSet<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Send,
Expand Down Expand Up @@ -447,10 +443,8 @@ fn validate_addrs(
// TODO:
// We should eventually implement these checks in this function:
// - Zebra should ignore peers that are older than 3 weeks (part of #1865)
// - Zebra should count back 3 weeks from the newest peer timestamp sent
// by the other peer, to compensate for clock skew
// - Zebra should limit the number of addresses it uses from a single Addrs
// response (#1869)
// - Zebra should count back 3 weeks from the newest peer timestamp sent
// by the other peer, to compensate for clock skew

let mut addrs: Vec<_> = addrs.into_iter().collect();

Expand Down

0 comments on commit 8bd40f9

Please sign in to comment.