From d6cf7d1a3e1b33743cbad2519c915b4a051d7ecd Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 27 Nov 2024 17:22:46 +0100 Subject: [PATCH] Generify RaftMetadataStoreSvc into MetadataStoreNetworkSvc --- crates/metadata-store/build.rs | 4 +-- ...proto => metadata_store_network_svc.proto} | 12 +++---- crates/metadata-store/src/lib.rs | 2 +- .../src/network/connection_manager.rs | 21 +++++------ crates/metadata-store/src/network/grpc_svc.rs | 6 ++-- crates/metadata-store/src/network/handler.rs | 36 +++++++++---------- crates/metadata-store/src/network/mod.rs | 8 ++--- .../metadata-store/src/network/networking.rs | 20 +++++------ crates/metadata-store/src/raft/service.rs | 8 +++-- crates/metadata-store/src/raft/store.rs | 2 +- 10 files changed, 61 insertions(+), 58 deletions(-) rename crates/metadata-store/proto/{raft_metadata_store_svc.proto => metadata_store_network_svc.proto} (59%) diff --git a/crates/metadata-store/build.rs b/crates/metadata-store/build.rs index 852900657..2e16327e7 100644 --- a/crates/metadata-store/build.rs +++ b/crates/metadata-store/build.rs @@ -23,9 +23,9 @@ fn main() -> Result<(), Box> { tonic_build::configure() .bytes(["."]) - .file_descriptor_set_path(out_dir.join("raft_metadata_store_svc.bin")) + .file_descriptor_set_path(out_dir.join("metadata_store_network_svc.bin")) .protoc_arg("--experimental_allow_proto3_optional") - .compile_protos(&["./proto/raft_metadata_store_svc.proto"], &["proto"])?; + .compile_protos(&["./proto/metadata_store_network_svc.proto"], &["proto"])?; Ok(()) } diff --git a/crates/metadata-store/proto/raft_metadata_store_svc.proto b/crates/metadata-store/proto/metadata_store_network_svc.proto similarity index 59% rename from crates/metadata-store/proto/raft_metadata_store_svc.proto rename to crates/metadata-store/proto/metadata_store_network_svc.proto index 84a4f4f07..ea6c1aa3e 100644 --- a/crates/metadata-store/proto/raft_metadata_store_svc.proto +++ b/crates/metadata-store/proto/metadata_store_network_svc.proto @@ -11,14 +11,14 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; -package dev.restate.raft_metadata_store_svc; +package dev.restate.metadata_store_network_svc; -// Grpc service definition for the RaftMetadataStore implementation. -service RaftMetadataStoreSvc { - rpc Raft(stream RaftMessage) returns (stream RaftMessage); +// Grpc service definition for the metadata store network implementation. +service MetadataStoreNetworkSvc { + rpc ConnectTo(stream NetworkMessage) returns (stream NetworkMessage); } -message RaftMessage { - bytes message = 1; +message NetworkMessage { + bytes payload = 1; } diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index 3d360416d..fc2eaefe8 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -11,9 +11,9 @@ mod grpc; mod grpc_svc; pub mod local; +mod network; pub mod raft; mod util; -mod network; use bytestring::ByteString; use restate_core::metadata_store::VersionedValue; diff --git a/crates/metadata-store/src/network/connection_manager.rs b/crates/metadata-store/src/network/connection_manager.rs index b515d7306..1fb1769e1 100644 --- a/crates/metadata-store/src/network/connection_manager.rs +++ b/crates/metadata-store/src/network/connection_manager.rs @@ -19,7 +19,8 @@ use tokio::sync::mpsc::error::TrySendError; use tokio_stream::wrappers::ReceiverStream; use tonic::codegen::BoxStream; use tracing::{debug, instrument}; -use crate::network::grpc_svc::RaftMessage; + +use crate::network::NetworkMessage; #[derive(Debug, thiserror::Error)] pub enum ConnectionError { @@ -48,8 +49,8 @@ impl ConnectionManager { pub fn accept_connection( &self, raft_peer: u64, - incoming_rx: tonic::Streaming, - ) -> Result, ConnectionError> { + incoming_rx: tonic::Streaming, + ) -> Result, ConnectionError> { let (outgoing_tx, outgoing_rx) = mpsc::channel(128); self.run_connection(raft_peer, outgoing_tx, incoming_rx)?; @@ -62,8 +63,8 @@ impl ConnectionManager { pub fn run_connection( &self, remote_peer: u64, - outgoing_tx: mpsc::Sender, - incoming_rx: tonic::Streaming, + outgoing_tx: mpsc::Sender, + incoming_rx: tonic::Streaming, ) -> Result<(), ConnectionError> { let mut guard = self.inner.connections.lock().unwrap(); @@ -101,7 +102,7 @@ struct ConnectionReactor { impl ConnectionReactor { #[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))] - async fn run(self, mut incoming_rx: tonic::Streaming) -> anyhow::Result<()> { + async fn run(self, mut incoming_rx: tonic::Streaming) -> anyhow::Result<()> { let mut shutdown = std::pin::pin!(cancellation_watcher()); debug!("Run connection reactor"); @@ -115,7 +116,7 @@ impl ConnectionReactor { Some(message) => { match message { Ok(message) => { - let message = Message::parse_from_carllerche_bytes(&message.message)?; + let message = Message::parse_from_carllerche_bytes(&message.payload)?; assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity); @@ -174,15 +175,15 @@ impl ConnectionManagerInner { #[derive(Debug, Clone)] pub struct Connection { - tx: mpsc::Sender, + tx: mpsc::Sender, } impl Connection { - pub fn new(tx: mpsc::Sender) -> Self { + pub fn new(tx: mpsc::Sender) -> Self { Connection { tx } } - pub fn try_send(&self, message: RaftMessage) -> Result<(), TrySendError> { + pub fn try_send(&self, message: NetworkMessage) -> Result<(), TrySendError> { self.tx.try_send(message) } } diff --git a/crates/metadata-store/src/network/grpc_svc.rs b/crates/metadata-store/src/network/grpc_svc.rs index a9064c261..20fc64f48 100644 --- a/crates/metadata-store/src/network/grpc_svc.rs +++ b/crates/metadata-store/src/network/grpc_svc.rs @@ -1,4 +1,4 @@ -// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. // All rights reserved. // // Use of this software is governed by the Business Source License @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -tonic::include_proto!("dev.restate.raft_metadata_store_svc"); +tonic::include_proto!("dev.restate.metadata_store_network_svc"); pub const FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("raft_metadata_store_svc"); + tonic::include_file_descriptor_set!("metadata_store_network_svc"); diff --git a/crates/metadata-store/src/network/handler.rs b/crates/metadata-store/src/network/handler.rs index 1e7812e4c..ccf9ecfd1 100644 --- a/crates/metadata-store/src/network/handler.rs +++ b/crates/metadata-store/src/network/handler.rs @@ -8,52 +8,52 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::network::connection_manager::ConnectionError; +use crate::network::grpc_svc::metadata_store_network_svc_server::MetadataStoreNetworkSvc; +use crate::network::grpc_svc::NetworkMessage; +use crate::network::ConnectionManager; use std::str::FromStr; use tonic::codegen::BoxStream; use tonic::{Request, Response, Status, Streaming}; -use crate::network::connection_manager::ConnectionError; -use crate::network::ConnectionManager; -use crate::network::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc; -use crate::network::grpc_svc::RaftMessage; -pub const RAFT_PEER_METADATA_KEY: &str = "x-restate-raft-peer"; +pub const PEER_METADATA_KEY: &str = "x-restate-metadata-store-peer"; #[derive(Debug)] -pub struct RaftMetadataStoreHandler { +pub struct MetadataStoreNetworkHandler { connection_manager: ConnectionManager, } -impl RaftMetadataStoreHandler { +impl MetadataStoreNetworkHandler { pub fn new(connection_manager: ConnectionManager) -> Self { Self { connection_manager } } } #[async_trait::async_trait] -impl RaftMetadataStoreSvc for RaftMetadataStoreHandler { - type RaftStream = BoxStream; +impl MetadataStoreNetworkSvc for MetadataStoreNetworkHandler { + type ConnectToStream = BoxStream; - async fn raft( + async fn connect_to( &self, - request: Request>, - ) -> Result, Status> { - let raft_peer_metadata = + request: Request>, + ) -> Result, Status> { + let peer_metadata = request .metadata() - .get(RAFT_PEER_METADATA_KEY) + .get(PEER_METADATA_KEY) .ok_or(Status::invalid_argument(format!( "'{}' is missing", - RAFT_PEER_METADATA_KEY + PEER_METADATA_KEY )))?; - let raft_peer = u64::from_str( - raft_peer_metadata + let peer = u64::from_str( + peer_metadata .to_str() .map_err(|err| Status::invalid_argument(err.to_string()))?, ) .map_err(|err| Status::invalid_argument(err.to_string()))?; let outgoing_rx = self .connection_manager - .accept_connection(raft_peer, request.into_inner())?; + .accept_connection(peer, request.into_inner())?; Ok(Response::new(outgoing_rx)) } } diff --git a/crates/metadata-store/src/network/mod.rs b/crates/metadata-store/src/network/mod.rs index 8e8bf6771..2ab1d7567 100644 --- a/crates/metadata-store/src/network/mod.rs +++ b/crates/metadata-store/src/network/mod.rs @@ -14,7 +14,7 @@ mod handler; mod networking; pub use connection_manager::ConnectionManager; -pub use grpc_svc::FILE_DESCRIPTOR_SET; -pub use grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvcServer; -pub use handler::RaftMetadataStoreHandler; -pub use networking::Networking; \ No newline at end of file +pub use grpc_svc::metadata_store_network_svc_server::MetadataStoreNetworkSvcServer; +pub use grpc_svc::{NetworkMessage, FILE_DESCRIPTOR_SET}; +pub use handler::MetadataStoreNetworkHandler; +pub use networking::Networking; diff --git a/crates/metadata-store/src/network/networking.rs b/crates/metadata-store/src/network/networking.rs index cb14f719c..d12636eb5 100644 --- a/crates/metadata-store/src/network/networking.rs +++ b/crates/metadata-store/src/network/networking.rs @@ -8,6 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::network::connection_manager::ConnectionManager; +use crate::network::handler::PEER_METADATA_KEY; +use crate::network::NetworkMessage; use bytes::{BufMut, BytesMut}; use futures::FutureExt; use protobuf::Message as ProtobufMessage; @@ -23,9 +26,6 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::metadata::MetadataValue; use tonic::IntoStreamingRequest; use tracing::{debug, trace}; -use crate::network::connection_manager::ConnectionManager; -use crate::network::grpc_svc::RaftMessage; -use crate::network::handler::RAFT_PEER_METADATA_KEY; #[derive(Debug, thiserror::Error)] pub enum TrySendError { @@ -70,13 +70,13 @@ impl Networking { .expect("should be able to write message"); self.serde_buffer = writer.into_inner(); - // todo: Maybe send message directly w/o indirection through RaftMessage - let raft_message = RaftMessage { - message: self.serde_buffer.split().freeze(), + // todo: Maybe send message directly w/o indirection through NetworkMessage + let network_message = NetworkMessage { + payload: self.serde_buffer.split().freeze(), }; connection - .try_send(raft_message) + .try_send(network_message) .map_err(|_err| TrySendError::Send(message))?; } else if let Some(address) = self.addresses.get(&target) { if let Some(task_handle) = self.connection_attempts.remove(&target) { @@ -131,17 +131,17 @@ impl Networking { ); async move { - let mut raft_client = crate::network::grpc_svc::raft_metadata_store_svc_client::RaftMetadataStoreSvcClient::new(channel); + let mut network_client = crate::network::grpc_svc::metadata_store_network_svc_client::MetadataStoreNetworkSvcClient::new(channel); let (outgoing_tx, outgoing_rx) = mpsc::channel(128); let mut request = ReceiverStream::new(outgoing_rx).into_streaming_request(); // send our identity alongside with the request to the target request.metadata_mut().insert( - RAFT_PEER_METADATA_KEY, + PEER_METADATA_KEY, MetadataValue::try_from(connection_manager.identity().to_string())?, ); - let incoming_rx = raft_client.raft(request).await?; + let incoming_rx = network_client.connect_to(request).await?; connection_manager.run_connection( target, diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs index 9015bb371..7be720180 100644 --- a/crates/metadata-store/src/raft/service.rs +++ b/crates/metadata-store/src/raft/service.rs @@ -12,6 +12,9 @@ use crate::grpc::handler::MetadataStoreHandler; use crate::grpc::server::GrpcServer; use crate::grpc::service_builder::GrpcServiceBuilder; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::network::{ + ConnectionManager, MetadataStoreNetworkHandler, MetadataStoreNetworkSvcServer, Networking, +}; use crate::raft::store::RaftMetadataStore; use crate::{grpc_svc, network, Error, MetadataStoreService}; use assert2::let_assert; @@ -22,7 +25,6 @@ use restate_types::health::HealthStatus; use restate_types::live::BoxedLiveLoad; use restate_types::protobuf::common::MetadataServerStatus; use tokio::sync::mpsc; -use crate::network::{ConnectionManager, Networking, RaftMetadataStoreHandler, RaftMetadataStoreSvcServer}; pub struct RaftMetadataStoreService { health_status: HealthStatus, @@ -69,8 +71,8 @@ impl MetadataStoreService for RaftMetadataStoreService { ))); builder.register_file_descriptor_set_for_reflection(network::FILE_DESCRIPTOR_SET); - builder.add_service(RaftMetadataStoreSvcServer::new( - RaftMetadataStoreHandler::new(connection_manager), + builder.add_service(MetadataStoreNetworkSvcServer::new( + MetadataStoreNetworkHandler::new(connection_manager), )); let grpc_server = diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs index 737dca812..ec53acc75 100644 --- a/crates/metadata-store/src/raft/store.rs +++ b/crates/metadata-store/src/raft/store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::network::Networking; use crate::raft::storage; use crate::raft::storage::RocksDbStorage; use crate::{ @@ -34,7 +35,6 @@ use tokio::time::MissedTickBehavior; use tracing::{debug, info, warn}; use tracing_slog::TracingSlogDrain; use ulid::Ulid; -use crate::network::Networking; #[derive(Debug, thiserror::Error)] pub enum BuildError {