Skip to content

Commit

Permalink
feat: adding subscribe and get op to network monitor (#1372)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexisbatyk authored Jan 14, 2025
1 parent d09a880 commit d4804dc
Show file tree
Hide file tree
Showing 35 changed files with 5,085 additions and 1,504 deletions.
101 changes: 100 additions & 1 deletion crates/core/src/generated/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub enum ContractChange<'a> {
PutFailure(topology::PutFailure<'a>),
BroadcastEmitted(topology::BroadcastEmitted<'a>),
BroadcastReceived(topology::BroadcastReceived<'a>),
GetContract(topology::GetContract<'a>),
SubscribedToContract(topology::SubscribedToContract<'a>),
}

// TODO: Change this to EventWrapper
Expand Down Expand Up @@ -109,7 +111,7 @@ impl ContractChange<'_> {
contract: impl AsRef<str>,
requester: impl AsRef<str>,
target: impl AsRef<str>,
_timestamp: u64,
timestamp: u64,
) -> Vec<u8> {
let mut buf = flatbuffers::FlatBufferBuilder::new();
let transaction = buf.create_string(transaction.as_ref());
Expand All @@ -123,6 +125,7 @@ impl ContractChange<'_> {
key: Some(contract),
requester: Some(requester),
target: Some(target),
timestamp,
},
);
let msg = topology::ContractChange::create(
Expand Down Expand Up @@ -221,6 +224,82 @@ impl ContractChange<'_> {
buf.finish_minimal(msg);
buf.finished_data().to_vec()
}

pub fn get_contract_msg(
requester: impl AsRef<str>,
target: impl AsRef<str>,
transaction: impl AsRef<str>,
contract_key: impl AsRef<str>,
contract_location: f64,
timestamp: u64,
) -> Vec<u8> {
let mut buf = flatbuffers::FlatBufferBuilder::new();
let requester = buf.create_string(requester.as_ref());
let target = buf.create_string(target.as_ref());
let transaction = buf.create_string(transaction.as_ref());
let contract_key = buf.create_string(contract_key.as_ref());
let get_contract = topology::GetContract::create(
&mut buf,
&topology::GetContractArgs {
requester: Some(requester),
target: Some(target),
transaction: Some(transaction),
key: Some(contract_key),
contract_location,
timestamp,
},
);

let msg = topology::ContractChange::create(
&mut buf,
&topology::ContractChangeArgs {
contract_id: Some(contract_key),
change_type: topology::ContractChangeType::GetContract,
change: Some(get_contract.as_union_value()),
},
);
buf.finish_minimal(msg);
buf.finished_data().to_vec()
}

pub fn subscribed_msg(
requester: impl AsRef<str>,
transaction: impl AsRef<str>,
contract_key: impl AsRef<str>,
contract_location: f64,
at_peer: impl AsRef<str>,
at_peer_location: f64,
timestamp: u64,
) -> Vec<u8> {
let mut buf = flatbuffers::FlatBufferBuilder::new();
let requester = buf.create_string(requester.as_ref());
let transaction = buf.create_string(transaction.as_ref());
let contract_key = buf.create_string(contract_key.as_ref());
let at_peer = buf.create_string(at_peer.as_ref());
let subscribed = topology::SubscribedToContract::create(
&mut buf,
&topology::SubscribedToContractArgs {
requester: Some(requester),
transaction: Some(transaction),
key: Some(contract_key),
contract_location,
at_peer: Some(at_peer),
at_peer_location,
timestamp,
},
);

let msg = topology::ContractChange::create(
&mut buf,
&topology::ContractChangeArgs {
contract_id: Some(contract_key),
change_type: topology::ContractChangeType::SubscribedToContract,
change: Some(subscribed.as_union_value()),
},
);
buf.finish_minimal(msg);
buf.finished_data().to_vec()
}
}

impl PeerChange<'_> {
Expand Down Expand Up @@ -435,6 +514,26 @@ impl<'a> TryFromFbs<'a> for ContractChange<'a> {
})?;
Ok(Self::BroadcastReceived(req))
}
topology::ContractChangeType::GetContract => {
let req = req.change_as_get_contract().ok_or_else(|| {
flatbuffers::InvalidFlatbuffer::InconsistentUnion {
field: "change_type",
field_type: "ContractChangeType",
error_trace: Default::default(),
}
})?;
Ok(Self::GetContract(req))
}
topology::ContractChangeType::SubscribedToContract => {
let req = req.change_as_subscribed_to_contract().ok_or_else(|| {
flatbuffers::InvalidFlatbuffer::InconsistentUnion {
field: "change_type",
field_type: "ContractChangeType",
error_trace: Default::default(),
}
})?;
Ok(Self::SubscribedToContract(req))
}
_ => unreachable!("Invalid contract change type"),
}
}
Expand Down
Loading

0 comments on commit d4804dc

Please sign in to comment.