Skip to content

Commit

Permalink
Feed request records to topology manager
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Nov 6, 2023
1 parent e28c6df commit 007ceab
Show file tree
Hide file tree
Showing 22 changed files with 246 additions and 167 deletions.
6 changes: 3 additions & 3 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl Executor<Runtime> {
let key = contract.key();
let params = contract.params();

if self.get_local_contract(&key.id()).await.is_ok() {
if self.get_local_contract(key.id()).await.is_ok() {
// already existing contract, just try to merge states
return self
.perform_contract_update(key, UpdateData::State(state.into()))
Expand Down Expand Up @@ -963,7 +963,7 @@ impl Executor<Runtime> {
{
if self.mode == OperationMode::Local {
return Err(ExecutorError::request(RequestError::ContractError(
CoreContractError::MissingRelated { key: key.id() },
CoreContractError::MissingRelated { key: *key.id() },
)));
}
}
Expand Down Expand Up @@ -1109,7 +1109,7 @@ impl Executor<Runtime> {
}
if iterations == DEPENDENCY_CYCLE_LIMIT_GUARD {
return Err(ExecutorError::request(CoreContractError::MissingRelated {
key: original_key.id(),
key: *original_key.id(),
}));
}
Ok(())
Expand Down
11 changes: 7 additions & 4 deletions crates/core/src/contract/storages/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ impl StateStorage for RocksDb {

async fn store(&mut self, key: ContractKey, state: WrappedState) -> Result<(), Self::Error> {
self.0
.put([key.bytes(), RocksDb::STATE_SUFFIX].concat(), state)?;
.put([key.as_bytes(), RocksDb::STATE_SUFFIX].concat(), state)?;

Ok(())
}

async fn get(&self, key: &ContractKey) -> Result<Option<WrappedState>, Self::Error> {
match self.0.get([key.bytes(), RocksDb::STATE_SUFFIX].concat()) {
match self.0.get([key.as_bytes(), RocksDb::STATE_SUFFIX].concat()) {
Ok(result) => {
if let Some(r) = result.map(|r| Some(WrappedState::new(r))) {
Ok(r)
Expand Down Expand Up @@ -69,7 +69,7 @@ impl StateStorage for RocksDb {
params: Parameters<'static>,
) -> Result<(), Self::Error> {
self.0
.put([key.bytes(), RocksDb::PARAMS_SUFFIX].concat(), params)?;
.put([key.as_bytes(), RocksDb::PARAMS_SUFFIX].concat(), params)?;

Ok(())
}
Expand All @@ -78,7 +78,10 @@ impl StateStorage for RocksDb {
&'a self,
key: &'a ContractKey,
) -> Result<Option<Parameters<'static>>, Self::Error> {
match self.0.get([key.bytes(), RocksDb::PARAMS_SUFFIX].concat()) {
match self
.0
.get([key.as_bytes(), RocksDb::PARAMS_SUFFIX].concat())
{
Ok(result) => Ok(result
.map(|r| Some(Parameters::from(r)))
.expect("vec bytes")),
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/contract/storages/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl StateStorage for Pool {
ON CONFLICT(contract) DO UPDATE SET state = excluded.state
",
)
.bind(key.bytes())
.bind(key.as_bytes())
.bind(state.as_ref())
.execute(&self.0)
.await?;
Expand All @@ -75,7 +75,7 @@ impl StateStorage for Pool {

async fn get(&self, key: &ContractKey) -> Result<Option<WrappedState>, Self::Error> {
match sqlx::query("SELECT state FROM states WHERE contract = ?")
.bind(key.bytes())
.bind(key.as_bytes())
.map(|row: SqliteRow| Some(WrappedState::new(row.get("state"))))
.fetch_one(&self.0)
.await
Expand All @@ -97,7 +97,7 @@ impl StateStorage for Pool {
ON CONFLICT(contract) DO UPDATE SET params = excluded.params
",
)
.bind(key.bytes())
.bind(key.as_bytes())
.bind(params.as_ref())
.execute(&self.0)
.await?;
Expand All @@ -109,7 +109,7 @@ impl StateStorage for Pool {
key: &'a ContractKey,
) -> Result<Option<Parameters<'static>>, Self::Error> {
match sqlx::query("SELECT params FROM states WHERE contract = ?")
.bind(key.bytes())
.bind(key.as_bytes())
.map(|row: SqliteRow| Some(Parameters::from(row.get::<Vec<u8>, _>("params"))))
.fetch_one(&self.0)
.await
Expand Down
22 changes: 18 additions & 4 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
operations::{
connect::ConnectMsg, get::GetMsg, put::PutMsg, subscribe::SubscribeMsg, update::UpdateMsg,
},
ring::PeerKeyLocation,
ring::{Location, PeerKeyLocation},
};
pub(crate) use sealed_msg_type::{TransactionType, TransactionTypeId};

Expand Down Expand Up @@ -43,7 +43,7 @@ impl Transaction {
// Self { id }
}

pub fn tx_type(&self) -> TransactionType {
pub fn transaction_type(&self) -> TransactionType {
let id_byte = (self.id.0 & 0xFFu128) as u8;
match id_byte {
0 => TransactionType::Connect,
Expand Down Expand Up @@ -222,6 +222,8 @@ pub(crate) trait InnerMessage: Into<Message> {
fn target(&self) -> Option<&PeerKeyLocation>;

fn terminal(&self) -> bool;

fn requested_location(&self) -> Option<Location>;
}

/// Internal node events emitted to the event loop.
Expand Down Expand Up @@ -294,6 +296,18 @@ impl Message {
}
}

pub fn requested_location(&self) -> Option<Location> {
use Message::*;
match self {
Connect(op) => op.requested_location(),
Put(op) => op.requested_location(),
Get(op) => op.requested_location(),
Subscribe(op) => op.requested_location(),
Update(op) => op.requested_location(),
Aborted(_) => None,
}
}

pub fn track_stats(&self) -> bool {
use Message::*;
!matches!(self, Connect(_) | Subscribe(_) | Aborted(_))
Expand Down Expand Up @@ -325,9 +339,9 @@ mod tests {
let ts_0 = Ulid::new();
std::thread::sleep(Duration::from_millis(1));
let tx = Transaction::update(TransactionType::Connect, Ulid::new());
assert_eq!(tx.tx_type(), TransactionType::Connect);
assert_eq!(tx.transaction_type(), TransactionType::Connect);
let tx = Transaction::update(TransactionType::Subscribe, Ulid::new());
assert_eq!(tx.tx_type(), TransactionType::Subscribe);
assert_eq!(tx.transaction_type(), TransactionType::Subscribe);
std::thread::sleep(Duration::from_millis(1));
let ts_1 = Ulid::new();
assert!(
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ where
tracing::warn!("Performing a new join, attempt {}", backoff.retries() + 1);
if backoff.sleep().await.is_none() {
tracing::error!("Max number of retries reached");
return Err(OpError::MaxRetriesExceeded(tx_id, tx_id.tx_type()));
return Err(OpError::MaxRetriesExceeded(tx_id, tx_id.transaction_type()));
}
op.backoff = Some(backoff);
}
Expand Down Expand Up @@ -696,7 +696,7 @@ async fn handle_cancelled_op<CM>(
where
CM: NetworkBridge + Send + Sync,
{
if let TransactionType::Connect = tx.tx_type() {
if let TransactionType::Connect = tx.transaction_type() {
// the attempt to join the network failed, this could be a fatal error since the node
// is useless without connecting to the network, we will retry with exponential backoff
match op_storage.pop(&tx) {
Expand All @@ -712,7 +712,7 @@ where
}
}
Ok(Some(OpEnum::Connect(_))) => {
return Err(OpError::MaxRetriesExceeded(tx, tx.tx_type()));
return Err(OpError::MaxRetriesExceeded(tx, tx.transaction_type()));
}
_ => {}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/node/in_memory_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl NodeInMemory {
};

if let Ok(Either::Left(Message::Aborted(tx))) = msg {
let tx_type = tx.tx_type();
let tx_type = tx.transaction_type();
let res = handle_cancelled_op(
tx,
self.peer_key,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/node/network_bridge/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl NetworkBridge for MemoryConnManager {
.expect("unique lock")
.register_events(EventLog::from_outbound_msg(&msg, &self.op_manager))
.await;
self.op_manager.sending_transaction(target, msg.id());
self.op_manager.sending_transaction(target, &msg);
let msg = bincode::serialize(&msg)?;
self.transport.send(*target, msg);
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl NetworkBridge for P2pBridge {
.try_lock()
.expect("single reference")
.register_events(EventLog::from_outbound_msg(&msg, &self.op_manager));
self.op_manager.sending_transaction(target, msg.id());
self.op_manager.sending_transaction(target, &msg);
self.ev_listener_tx
.send(Left((*target, Box::new(msg))))
.await
Expand Down Expand Up @@ -431,7 +431,7 @@ impl P2pConnManager {
let cb = self.bridge.clone();
match msg {
Message::Aborted(tx) => {
let tx_type = tx.tx_type();
let tx_type = tx.transaction_type();
let res = handle_cancelled_op(
tx,
op_manager.ring.peer_key,
Expand Down
23 changes: 14 additions & 9 deletions crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,27 +131,27 @@ impl OpManager {
match op {
OpEnum::Connect(op) => {
#[cfg(debug_assertions)]
check_id_op!(id.tx_type(), TransactionType::Connect);
check_id_op!(id.transaction_type(), TransactionType::Connect);
self.ops.connect.insert(id, *op);
}
OpEnum::Put(op) => {
#[cfg(debug_assertions)]
check_id_op!(id.tx_type(), TransactionType::Put);
check_id_op!(id.transaction_type(), TransactionType::Put);
self.ops.put.insert(id, op);
}
OpEnum::Get(op) => {
#[cfg(debug_assertions)]
check_id_op!(id.tx_type(), TransactionType::Get);
check_id_op!(id.transaction_type(), TransactionType::Get);
self.ops.get.insert(id, op);
}
OpEnum::Subscribe(op) => {
#[cfg(debug_assertions)]
check_id_op!(id.tx_type(), TransactionType::Subscribe);
check_id_op!(id.transaction_type(), TransactionType::Subscribe);
self.ops.subscribe.insert(id, op);
}
OpEnum::Update(op) => {
#[cfg(debug_assertions)]
check_id_op!(id.tx_type(), TransactionType::Update);
check_id_op!(id.transaction_type(), TransactionType::Update);
self.ops.update.insert(id, op);
}
}
Expand All @@ -169,7 +169,7 @@ impl OpManager {
}
return Err(OpNotAvailable::Running);
}
let op = match id.tx_type() {
let op = match id.transaction_type() {
TransactionType::Connect => self
.ops
.connect
Expand Down Expand Up @@ -201,7 +201,12 @@ impl OpManager {
}

/// Notify the operation manager that a transaction is being transacted over the network.
pub fn sending_transaction(&self, peer: &PeerKey, transaction: &Transaction) {
pub fn sending_transaction(&self, peer: &PeerKey, msg: &Message) {
let transaction = msg.id();
if let Some(loc) = msg.requested_location() {
self.ring
.record_request(loc, transaction.transaction_type());
}
self.ring
.live_tx_tracker
.add_transaction(*peer, *transaction);
Expand All @@ -227,7 +232,7 @@ async fn garbage_cleanup_task(
if ops.completed.remove(&tx).is_some() {
continue;
}
let still_waiting = match tx.tx_type() {
let still_waiting = match tx.transaction_type() {
TransactionType::Connect => ops.connect.remove(&tx).is_none(),
TransactionType::Put => ops.put.remove(&tx).is_none(),
TransactionType::Get => ops.get.remove(&tx).is_none(),
Expand Down Expand Up @@ -256,7 +261,7 @@ async fn garbage_cleanup_task(
if ops.completed.remove(&tx).is_some() {
continue;
}
let removed = match tx.tx_type() {
let removed = match tx.transaction_type() {
TransactionType::Connect => ops.connect.remove(&tx).is_some(),
TransactionType::Put => ops.put.remove(&tx).is_some(),
TransactionType::Get => ops.get.remove(&tx).is_some(),
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,10 @@ mod messages {
} | Connected { .. }
)
}

fn requested_location(&self) -> Option<Location> {
self.target().and_then(|pkloc| pkloc.location)
}
}

impl ConnectMsg {
Expand Down
13 changes: 12 additions & 1 deletion crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,10 @@ impl Operation for GetOp {
"Failed getting a value for contract {}, reached max retries",
key
);
return Err(OpError::MaxRetriesExceeded(*id, id.tx_type()));
return Err(OpError::MaxRetriesExceeded(
*id,
id.transaction_type(),
));
}
}
Some(GetState::ReceivedRequest) => {
Expand Down Expand Up @@ -809,6 +812,14 @@ mod messages {
use GetMsg::*;
matches!(self, ReturnGet { .. })
}

fn requested_location(&self) -> Option<Location> {
match self {
GetMsg::RequestGet { key, .. } => Some(Location::from(key.id())),
GetMsg::SeekNode { key, .. } => Some(Location::from(key.id())),
GetMsg::ReturnGet { key, .. } => Some(Location::from(key.id())),
}
}
}

impl GetMsg {
Expand Down
19 changes: 11 additions & 8 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,12 +806,6 @@ mod messages {

#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum PutMsg {
/// Initialize the put operation by routing the value
RouteValue {
id: Transaction,
htl: usize,
target: PeerKeyLocation,
},
/// Internal node instruction to find a route to the target node.
RequestPut {
id: Transaction,
Expand Down Expand Up @@ -876,7 +870,6 @@ mod messages {
fn id(&self) -> &Transaction {
match self {
Self::SeekNode { id, .. } => id,
Self::RouteValue { id, .. } => id,
Self::RequestPut { id, .. } => id,
Self::Broadcasting { id, .. } => id,
Self::SuccessfulUpdate { id, .. } => id,
Expand All @@ -901,6 +894,17 @@ mod messages {
SuccessfulUpdate { .. } | SeekNode { .. } | PutForward { .. }
)
}

fn requested_location(&self) -> Option<Location> {
match self {
Self::SeekNode { contract, .. } => Some(Location::from(contract.id())),
Self::RequestPut { contract, .. } => Some(Location::from(contract.id())),
Self::Broadcasting { key, .. } => Some(Location::from(key.id())),
Self::PutForward { contract, .. } => Some(Location::from(contract.id())),
Self::BroadcastTo { key, .. } => Some(Location::from(key.id())),
_ => None,
}
}
}

impl PutMsg {
Expand All @@ -918,7 +922,6 @@ mod messages {
let id = self.id();
match self {
Self::SeekNode { .. } => write!(f, "SeekNode(id: {id})"),
Self::RouteValue { .. } => write!(f, "RouteValue(id: {id})"),
Self::RequestPut { .. } => write!(f, "RequestPut(id: {id})"),
Self::Broadcasting { .. } => write!(f, "Broadcasting(id: {id})"),
Self::SuccessfulUpdate { .. } => write!(f, "SusscessfulUpdate(id: {id})"),
Expand Down
Loading

0 comments on commit 007ceab

Please sign in to comment.