Skip to content

Commit

Permalink
Generify RaftMetadataStoreSvc into MetadataStoreNetworkSvc
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 27, 2024
1 parent 9ebbcf6 commit d6cf7d1
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 58 deletions.
4 changes: 2 additions & 2 deletions crates/metadata-store/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("raft_metadata_store_svc.bin"))
.file_descriptor_set_path(out_dir.join("metadata_store_network_svc.bin"))
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["./proto/raft_metadata_store_svc.proto"], &["proto"])?;
.compile_protos(&["./proto/metadata_store_network_svc.proto"], &["proto"])?;

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ syntax = "proto3";

import "google/protobuf/empty.proto";

package dev.restate.raft_metadata_store_svc;
package dev.restate.metadata_store_network_svc;

// Grpc service definition for the RaftMetadataStore implementation.
service RaftMetadataStoreSvc {
rpc Raft(stream RaftMessage) returns (stream RaftMessage);
// Grpc service definition for the metadata store network implementation.
service MetadataStoreNetworkSvc {
rpc ConnectTo(stream NetworkMessage) returns (stream NetworkMessage);
}

message RaftMessage {
bytes message = 1;
message NetworkMessage {
bytes payload = 1;
}

2 changes: 1 addition & 1 deletion crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
mod grpc;
mod grpc_svc;
pub mod local;
mod network;
pub mod raft;
mod util;
mod network;

use bytestring::ByteString;
use restate_core::metadata_store::VersionedValue;
Expand Down
21 changes: 11 additions & 10 deletions crates/metadata-store/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::BoxStream;
use tracing::{debug, instrument};
use crate::network::grpc_svc::RaftMessage;

use crate::network::NetworkMessage;

#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
Expand Down Expand Up @@ -48,8 +49,8 @@ impl ConnectionManager {
pub fn accept_connection(
&self,
raft_peer: u64,
incoming_rx: tonic::Streaming<RaftMessage>,
) -> Result<BoxStream<RaftMessage>, ConnectionError> {
incoming_rx: tonic::Streaming<NetworkMessage>,
) -> Result<BoxStream<NetworkMessage>, ConnectionError> {
let (outgoing_tx, outgoing_rx) = mpsc::channel(128);
self.run_connection(raft_peer, outgoing_tx, incoming_rx)?;

Expand All @@ -62,8 +63,8 @@ impl ConnectionManager {
pub fn run_connection(
&self,
remote_peer: u64,
outgoing_tx: mpsc::Sender<RaftMessage>,
incoming_rx: tonic::Streaming<RaftMessage>,
outgoing_tx: mpsc::Sender<NetworkMessage>,
incoming_rx: tonic::Streaming<NetworkMessage>,
) -> Result<(), ConnectionError> {
let mut guard = self.inner.connections.lock().unwrap();

Expand Down Expand Up @@ -101,7 +102,7 @@ struct ConnectionReactor {

impl ConnectionReactor {
#[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))]
async fn run(self, mut incoming_rx: tonic::Streaming<RaftMessage>) -> anyhow::Result<()> {
async fn run(self, mut incoming_rx: tonic::Streaming<NetworkMessage>) -> anyhow::Result<()> {
let mut shutdown = std::pin::pin!(cancellation_watcher());
debug!("Run connection reactor");

Expand All @@ -115,7 +116,7 @@ impl ConnectionReactor {
Some(message) => {
match message {
Ok(message) => {
let message = Message::parse_from_carllerche_bytes(&message.message)?;
let message = Message::parse_from_carllerche_bytes(&message.payload)?;

assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity);

Expand Down Expand Up @@ -174,15 +175,15 @@ impl ConnectionManagerInner {

#[derive(Debug, Clone)]
pub struct Connection {
tx: mpsc::Sender<RaftMessage>,
tx: mpsc::Sender<NetworkMessage>,
}

impl Connection {
pub fn new(tx: mpsc::Sender<RaftMessage>) -> Self {
pub fn new(tx: mpsc::Sender<NetworkMessage>) -> Self {
Connection { tx }
}

pub fn try_send(&self, message: RaftMessage) -> Result<(), TrySendError<RaftMessage>> {
pub fn try_send(&self, message: NetworkMessage) -> Result<(), TrySendError<NetworkMessage>> {
self.tx.try_send(message)
}
}
6 changes: 3 additions & 3 deletions crates/metadata-store/src/network/grpc_svc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
Expand All @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

tonic::include_proto!("dev.restate.raft_metadata_store_svc");
tonic::include_proto!("dev.restate.metadata_store_network_svc");

pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("raft_metadata_store_svc");
tonic::include_file_descriptor_set!("metadata_store_network_svc");
36 changes: 18 additions & 18 deletions crates/metadata-store/src/network/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,52 +8,52 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::network::connection_manager::ConnectionError;
use crate::network::grpc_svc::metadata_store_network_svc_server::MetadataStoreNetworkSvc;
use crate::network::grpc_svc::NetworkMessage;
use crate::network::ConnectionManager;
use std::str::FromStr;
use tonic::codegen::BoxStream;
use tonic::{Request, Response, Status, Streaming};
use crate::network::connection_manager::ConnectionError;
use crate::network::ConnectionManager;
use crate::network::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc;
use crate::network::grpc_svc::RaftMessage;

pub const RAFT_PEER_METADATA_KEY: &str = "x-restate-raft-peer";
pub const PEER_METADATA_KEY: &str = "x-restate-metadata-store-peer";

#[derive(Debug)]
pub struct RaftMetadataStoreHandler {
pub struct MetadataStoreNetworkHandler {
connection_manager: ConnectionManager,
}

impl RaftMetadataStoreHandler {
impl MetadataStoreNetworkHandler {
pub fn new(connection_manager: ConnectionManager) -> Self {
Self { connection_manager }
}
}

#[async_trait::async_trait]
impl RaftMetadataStoreSvc for RaftMetadataStoreHandler {
type RaftStream = BoxStream<RaftMessage>;
impl MetadataStoreNetworkSvc for MetadataStoreNetworkHandler {
type ConnectToStream = BoxStream<NetworkMessage>;

async fn raft(
async fn connect_to(
&self,
request: Request<Streaming<RaftMessage>>,
) -> Result<Response<Self::RaftStream>, Status> {
let raft_peer_metadata =
request: Request<Streaming<NetworkMessage>>,
) -> Result<Response<Self::ConnectToStream>, Status> {
let peer_metadata =
request
.metadata()
.get(RAFT_PEER_METADATA_KEY)
.get(PEER_METADATA_KEY)
.ok_or(Status::invalid_argument(format!(
"'{}' is missing",
RAFT_PEER_METADATA_KEY
PEER_METADATA_KEY
)))?;
let raft_peer = u64::from_str(
raft_peer_metadata
let peer = u64::from_str(
peer_metadata
.to_str()
.map_err(|err| Status::invalid_argument(err.to_string()))?,
)
.map_err(|err| Status::invalid_argument(err.to_string()))?;
let outgoing_rx = self
.connection_manager
.accept_connection(raft_peer, request.into_inner())?;
.accept_connection(peer, request.into_inner())?;
Ok(Response::new(outgoing_rx))
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/metadata-store/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod handler;
mod networking;

pub use connection_manager::ConnectionManager;
pub use grpc_svc::FILE_DESCRIPTOR_SET;
pub use grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvcServer;
pub use handler::RaftMetadataStoreHandler;
pub use networking::Networking;
pub use grpc_svc::metadata_store_network_svc_server::MetadataStoreNetworkSvcServer;
pub use grpc_svc::{NetworkMessage, FILE_DESCRIPTOR_SET};
pub use handler::MetadataStoreNetworkHandler;
pub use networking::Networking;
20 changes: 10 additions & 10 deletions crates/metadata-store/src/network/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::network::connection_manager::ConnectionManager;
use crate::network::handler::PEER_METADATA_KEY;
use crate::network::NetworkMessage;
use bytes::{BufMut, BytesMut};
use futures::FutureExt;
use protobuf::Message as ProtobufMessage;
Expand All @@ -23,9 +26,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::MetadataValue;
use tonic::IntoStreamingRequest;
use tracing::{debug, trace};
use crate::network::connection_manager::ConnectionManager;
use crate::network::grpc_svc::RaftMessage;
use crate::network::handler::RAFT_PEER_METADATA_KEY;

#[derive(Debug, thiserror::Error)]
pub enum TrySendError<T> {
Expand Down Expand Up @@ -70,13 +70,13 @@ impl Networking {
.expect("should be able to write message");
self.serde_buffer = writer.into_inner();

// todo: Maybe send message directly w/o indirection through RaftMessage
let raft_message = RaftMessage {
message: self.serde_buffer.split().freeze(),
// todo: Maybe send message directly w/o indirection through NetworkMessage
let network_message = NetworkMessage {
payload: self.serde_buffer.split().freeze(),
};

connection
.try_send(raft_message)
.try_send(network_message)
.map_err(|_err| TrySendError::Send(message))?;
} else if let Some(address) = self.addresses.get(&target) {
if let Some(task_handle) = self.connection_attempts.remove(&target) {
Expand Down Expand Up @@ -131,17 +131,17 @@ impl Networking {
);

async move {
let mut raft_client = crate::network::grpc_svc::raft_metadata_store_svc_client::RaftMetadataStoreSvcClient::new(channel);
let mut network_client = crate::network::grpc_svc::metadata_store_network_svc_client::MetadataStoreNetworkSvcClient::new(channel);
let (outgoing_tx, outgoing_rx) = mpsc::channel(128);

let mut request = ReceiverStream::new(outgoing_rx).into_streaming_request();
// send our identity alongside with the request to the target
request.metadata_mut().insert(
RAFT_PEER_METADATA_KEY,
PEER_METADATA_KEY,
MetadataValue::try_from(connection_manager.identity().to_string())?,
);

let incoming_rx = raft_client.raft(request).await?;
let incoming_rx = network_client.connect_to(request).await?;

connection_manager.run_connection(
target,
Expand Down
8 changes: 5 additions & 3 deletions crates/metadata-store/src/raft/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use crate::grpc::handler::MetadataStoreHandler;
use crate::grpc::server::GrpcServer;
use crate::grpc::service_builder::GrpcServiceBuilder;
use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer;
use crate::network::{
ConnectionManager, MetadataStoreNetworkHandler, MetadataStoreNetworkSvcServer, Networking,
};
use crate::raft::store::RaftMetadataStore;
use crate::{grpc_svc, network, Error, MetadataStoreService};
use assert2::let_assert;
Expand All @@ -22,7 +25,6 @@ use restate_types::health::HealthStatus;
use restate_types::live::BoxedLiveLoad;
use restate_types::protobuf::common::MetadataServerStatus;
use tokio::sync::mpsc;
use crate::network::{ConnectionManager, Networking, RaftMetadataStoreHandler, RaftMetadataStoreSvcServer};

pub struct RaftMetadataStoreService {
health_status: HealthStatus<MetadataServerStatus>,
Expand Down Expand Up @@ -69,8 +71,8 @@ impl MetadataStoreService for RaftMetadataStoreService {
)));

builder.register_file_descriptor_set_for_reflection(network::FILE_DESCRIPTOR_SET);
builder.add_service(RaftMetadataStoreSvcServer::new(
RaftMetadataStoreHandler::new(connection_manager),
builder.add_service(MetadataStoreNetworkSvcServer::new(
MetadataStoreNetworkHandler::new(connection_manager),
));

let grpc_server =
Expand Down
2 changes: 1 addition & 1 deletion crates/metadata-store/src/raft/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::network::Networking;
use crate::raft::storage;
use crate::raft::storage::RocksDbStorage;
use crate::{
Expand All @@ -34,7 +35,6 @@ use tokio::time::MissedTickBehavior;
use tracing::{debug, info, warn};
use tracing_slog::TracingSlogDrain;
use ulid::Ulid;
use crate::network::Networking;

#[derive(Debug, thiserror::Error)]
pub enum BuildError {
Expand Down

0 comments on commit d6cf7d1

Please sign in to comment.