Skip to content

Commit

Permalink
Make metadata store network layer accept generic messages
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 27, 2024
1 parent eb5b9ec commit dda4416
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 46 deletions.
40 changes: 22 additions & 18 deletions crates/metadata-store/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::network::Message;
use crate::network::NetworkMessage;
use futures::StreamExt;
use protobuf::Message as ProtobufMessage;
use raft::prelude::Message;
use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
Expand All @@ -20,8 +20,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::BoxStream;
use tracing::{debug, instrument};

use crate::network::NetworkMessage;

#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
#[error("internal error: {0}")]
Expand All @@ -31,12 +29,15 @@ pub enum ConnectionError {
}

#[derive(Clone, derive_more::Debug)]
pub struct ConnectionManager {
inner: Arc<ConnectionManagerInner>,
pub struct ConnectionManager<M> {
inner: Arc<ConnectionManagerInner<M>>,
}

impl ConnectionManager {
pub fn new(identity: u64, router: mpsc::Sender<Message>) -> Self {
impl<M> ConnectionManager<M>
where
M: Message + Send + 'static,
{
pub fn new(identity: u64, router: mpsc::Sender<M>) -> Self {
ConnectionManager {
inner: Arc::new(ConnectionManagerInner::new(identity, router)),
}
Expand Down Expand Up @@ -95,12 +96,15 @@ impl ConnectionManager {
}
}

struct ConnectionReactor {
struct ConnectionReactor<M> {
remote_peer: u64,
connection_manager: Arc<ConnectionManagerInner>,
connection_manager: Arc<ConnectionManagerInner<M>>,
}

impl ConnectionReactor {
impl<M> ConnectionReactor<M>
where
M: Message,
{
#[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))]
async fn run(self, mut incoming_rx: tonic::Streaming<NetworkMessage>) -> anyhow::Result<()> {
let mut shutdown = std::pin::pin!(cancellation_watcher());
Expand All @@ -116,9 +120,9 @@ impl ConnectionReactor {
Some(message) => {
match message {
Ok(message) => {
let message = Message::parse_from_carllerche_bytes(&message.payload)?;
let message = M::deserialize(&message.payload)?;

assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity);
assert_eq!(message.to(), self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity);

if self.connection_manager.router.send(message).await.is_err() {
// system is shutting down
Expand All @@ -145,7 +149,7 @@ impl ConnectionReactor {
}
}

impl Drop for ConnectionReactor {
impl<M> Drop for ConnectionReactor<M> {
fn drop(&mut self) {
debug!(remote_peer = %self.remote_peer, "Close connection");
self.connection_manager
Expand All @@ -157,14 +161,14 @@ impl Drop for ConnectionReactor {
}

#[derive(Debug)]
struct ConnectionManagerInner {
struct ConnectionManagerInner<M> {
identity: u64,
connections: Mutex<HashMap<u64, Connection>>,
router: mpsc::Sender<Message>,
router: mpsc::Sender<M>,
}

impl ConnectionManagerInner {
pub fn new(identity: u64, router: mpsc::Sender<Message>) -> Self {
impl<M> ConnectionManagerInner<M> {
pub fn new(identity: u64, router: mpsc::Sender<M>) -> Self {
ConnectionManagerInner {
identity,
router,
Expand Down
15 changes: 9 additions & 6 deletions crates/metadata-store/src/network/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,29 @@
use crate::network::connection_manager::ConnectionError;
use crate::network::grpc_svc::metadata_store_network_svc_server::MetadataStoreNetworkSvc;
use crate::network::grpc_svc::NetworkMessage;
use crate::network::ConnectionManager;
use crate::network::{ConnectionManager, Message};
use std::str::FromStr;
use tonic::codegen::BoxStream;
use tonic::{Request, Response, Status, Streaming};

pub const PEER_METADATA_KEY: &str = "x-restate-metadata-store-peer";

#[derive(Debug)]
pub struct MetadataStoreNetworkHandler {
connection_manager: ConnectionManager,
pub struct MetadataStoreNetworkHandler<M> {
connection_manager: ConnectionManager<M>,
}

impl MetadataStoreNetworkHandler {
pub fn new(connection_manager: ConnectionManager) -> Self {
impl<M> MetadataStoreNetworkHandler<M> {
pub fn new(connection_manager: ConnectionManager<M>) -> Self {
Self { connection_manager }
}
}

#[async_trait::async_trait]
impl MetadataStoreNetworkSvc for MetadataStoreNetworkHandler {
impl<M> MetadataStoreNetworkSvc for MetadataStoreNetworkHandler<M>
where
M: Message + Send + 'static,
{
type ConnectToStream = BoxStream<NetworkMessage>;

async fn connect_to(
Expand Down
2 changes: 1 addition & 1 deletion crates/metadata-store/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ pub use connection_manager::ConnectionManager;
pub use grpc_svc::metadata_store_network_svc_server::MetadataStoreNetworkSvcServer;
pub use grpc_svc::{NetworkMessage, FILE_DESCRIPTOR_SET};
pub use handler::MetadataStoreNetworkHandler;
pub use networking::Networking;
pub use networking::{Message, Networking};
42 changes: 25 additions & 17 deletions crates/metadata-store/src/network/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
use crate::network::connection_manager::ConnectionManager;
use crate::network::handler::PEER_METADATA_KEY;
use crate::network::NetworkMessage;
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use futures::FutureExt;
use protobuf::Message as ProtobufMessage;
use raft::prelude::Message;
use restate_core::network::net_util;
use restate_core::{ShutdownError, TaskCenter, TaskHandle, TaskKind};
use restate_types::config::{Configuration, NetworkingOptions};
use restate_types::net::AdvertisedAddress;
use std::collections::HashMap;
use std::mem;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::MetadataValue;
Expand All @@ -40,16 +37,19 @@ pub enum TrySendError<T> {
}

#[derive(derive_more::Debug)]
pub struct Networking {
connection_manager: ConnectionManager,
pub struct Networking<M> {
connection_manager: ConnectionManager<M>,
addresses: HashMap<u64, AdvertisedAddress>,
#[debug(skip)]
connection_attempts: HashMap<u64, TaskHandle<anyhow::Result<()>>>,
serde_buffer: BytesMut,
}

impl Networking {
pub fn new(connection_manager: ConnectionManager) -> Self {
impl<M> Networking<M>
where
M: Message + Clone + Send + 'static,
{
pub fn new(connection_manager: ConnectionManager<M>) -> Self {
Networking {
connection_manager,
addresses: HashMap::default(),
Expand All @@ -62,15 +62,9 @@ impl Networking {
self.addresses.insert(peer, address);
}

pub fn try_send(&mut self, message: Message) -> Result<(), TrySendError<Message>> {
let target = message.to;

pub fn try_send(&mut self, target: u64, message: M) -> Result<(), TrySendError<M>> {
if let Some(connection) = self.connection_manager.get_connection(target) {
let mut writer = mem::take(&mut self.serde_buffer).writer();
message
.write_to_writer(&mut writer)
.expect("should be able to write message");
self.serde_buffer = writer.into_inner();
message.serialize(&mut self.serde_buffer);

// todo: Maybe send message directly w/o indirection through NetworkMessage
let network_message = NetworkMessage {
Expand Down Expand Up @@ -118,7 +112,7 @@ impl Networking {
}

fn try_connecting_to(
connection_manager: ConnectionManager,
connection_manager: ConnectionManager<M>,
target: u64,
address: AdvertisedAddress,
networking_options: &NetworkingOptions,
Expand Down Expand Up @@ -158,3 +152,17 @@ impl Networking {
)
}
}

/// A message that can be sent over the network
pub trait Message {
/// The target of the message
fn to(&self) -> u64;

/// Serialize the message into the buffer
fn serialize(&self, buffer: impl BufMut);

/// Deserialize the message from the bytes
fn deserialize(bytes: &Bytes) -> anyhow::Result<Self>
where
Self: Sized;
}
22 changes: 21 additions & 1 deletion crates/metadata-store/src/raft/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ use crate::grpc::server::GrpcServer;
use crate::grpc::service_builder::GrpcServiceBuilder;
use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer;
use crate::network::{
ConnectionManager, MetadataStoreNetworkHandler, MetadataStoreNetworkSvcServer, Networking,
ConnectionManager, Message, MetadataStoreNetworkHandler, MetadataStoreNetworkSvcServer,
Networking,
};
use crate::raft::store::RaftMetadataStore;
use crate::{grpc_svc, network, Error, MetadataStoreService};
use anyhow::Context;
use assert2::let_assert;
use bytes::{BufMut, Bytes};
use futures::TryFutureExt;
use protobuf::Message as ProtobufMessage;
use restate_core::{TaskCenter, TaskKind};
use restate_types::config::{Kind, MetadataStoreOptions, RocksDbOptions};
use restate_types::health::HealthStatus;
Expand Down Expand Up @@ -89,3 +93,19 @@ impl MetadataStoreService for RaftMetadataStoreService {
Ok(())
}
}

impl Message for raft::prelude::Message {
fn to(&self) -> u64 {
self.to
}

fn serialize(&self, buffer: impl BufMut) {
let mut writer = buffer.writer();
self.write_to_writer(&mut writer)
.expect("should be able to write message");
}

fn deserialize(bytes: &Bytes) -> anyhow::Result<Self> {
ProtobufMessage::parse_from_carllerche_bytes(bytes).context("failed deserializing message")
}
}
6 changes: 3 additions & 3 deletions crates/metadata-store/src/raft/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub enum Error {
pub struct RaftMetadataStore {
_logger: slog::Logger,
raw_node: RawNode<RocksDbStorage>,
networking: Networking,
networking: Networking<Message>,
raft_rx: mpsc::Receiver<Message>,
tick_interval: time::Interval,

Expand All @@ -78,7 +78,7 @@ impl RaftMetadataStore {
pub async fn create(
raft_options: &RaftOptions,
rocksdb_options: BoxedLiveLoad<RocksDbOptions>,
mut networking: Networking,
mut networking: Networking<Message>,
raft_rx: mpsc::Receiver<Message>,
) -> Result<Self, BuildError> {
let (request_tx, request_rx) = mpsc::channel(2);
Expand Down Expand Up @@ -248,7 +248,7 @@ impl RaftMetadataStore {

fn send_messages(&mut self, messages: Vec<Message>) {
for message in messages {
if let Err(err) = self.networking.try_send(message) {
if let Err(err) = self.networking.try_send(message.to, message) {
debug!("failed sending message: {err}");
}
}
Expand Down

0 comments on commit dda4416

Please sign in to comment.