Skip to content

Commit

Permalink
Implement simple networking layer for RaftMetadataStore
Browse files Browse the repository at this point in the history
This fixes restatedev#1803.
  • Loading branch information
tillrohrmann committed Jan 14, 2025
1 parent aa6482a commit a911ba6
Show file tree
Hide file tree
Showing 15 changed files with 595 additions and 31 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/metadata-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
humantime = { workspace = true }
Expand All @@ -39,6 +40,7 @@ slog = { version = "2.7.0" }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true, features = ["transport", "codegen", "prost"] }
tracing = { workspace = true }
tracing-slog = { version = "0.3.0" }
Expand Down
6 changes: 6 additions & 0 deletions crates/metadata-store/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["./proto/metadata_store_svc.proto"], &["proto"])?;

tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("raft_metadata_store_svc.bin"))
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["./proto/raft_metadata_store_svc.proto"], &["proto"])?;

Ok(())
}
24 changes: 24 additions & 0 deletions crates/metadata-store/proto/raft_metadata_store_svc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/proto/blob/main/LICENSE

syntax = "proto3";

import "google/protobuf/empty.proto";

package dev.restate.raft_metadata_store_svc;

// Grpc service definition for the RaftMetadataStore implementation.
service RaftMetadataStoreSvc {
rpc Raft(stream RaftMessage) returns (stream RaftMessage);
}

message RaftMessage {
bytes message = 1;
}

23 changes: 14 additions & 9 deletions crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ mod util;

use crate::grpc::handler::MetadataStoreHandler;
use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer;
use crate::local::LocalMetadataStore;
use crate::raft::RaftMetadataStore;
use bytestring::ByteString;
use restate_core::metadata_store::VersionedValue;
pub use restate_core::metadata_store::{
Expand Down Expand Up @@ -192,13 +190,20 @@ pub async fn create_metadata_store(
server_builder: &mut NetworkServerBuilder,
) -> anyhow::Result<BoxedMetadataStoreService> {
match metadata_store_options.kind {
Kind::Local => {
let store = LocalMetadataStore::create(metadata_store_options, rocksdb_options).await?;
Ok(MetadataStoreRunner::new(store, health_status, server_builder).boxed())
}
Kind::Raft => {
let store = RaftMetadataStore::create().await?;
Ok(MetadataStoreRunner::new(store, health_status, server_builder).boxed())
Kind::Local => local::create_store(
metadata_store_options,
rocksdb_options,
health_status,
server_builder,
)
.await
.map_err(anyhow::Error::from)
.map(|store| store.boxed()),
Kind::Raft(ref raft_options) => {
raft::create_store(raft_options, rocksdb_options, health_status, server_builder)
.await
.map_err(anyhow::Error::from)
.map(|store| store.boxed())
}
}
}
24 changes: 22 additions & 2 deletions crates/metadata-store/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::grpc::client::GrpcMetadataStoreClient;
use restate_core::metadata_store::providers::create_object_store_based_meta_store;
use restate_core::metadata_store::{providers::EtcdMetadataStore, MetadataStoreClient};
use restate_core::network::NetworkServerBuilder;
use restate_rocksdb::RocksError;
use restate_types::config::{MetadataStoreOptions, RocksDbOptions};
use restate_types::health::HealthStatus;
use restate_types::live::BoxedLiveLoad;
use restate_types::protobuf::common::MetadataServerStatus;
use restate_types::{
config::{MetadataStoreClient as MetadataStoreClientConfig, MetadataStoreClientOptions},
errors::GenericError,
};

use crate::grpc::client::GrpcMetadataStoreClient;

mod store;

use crate::MetadataStoreRunner;
pub use store::LocalMetadataStore;

/// Creates a [`MetadataStoreClient`] for the [`GrpcMetadataStoreClient`].
Expand Down Expand Up @@ -50,5 +56,19 @@ pub async fn create_client(
Ok(client)
}

pub async fn create_store(
metadata_store_options: &MetadataStoreOptions,
rocksdb_options: BoxedLiveLoad<RocksDbOptions>,
health_status: HealthStatus<MetadataServerStatus>,
server_builder: &mut NetworkServerBuilder,
) -> Result<MetadataStoreRunner<LocalMetadataStore>, RocksError> {
let store = LocalMetadataStore::create(metadata_store_options, rocksdb_options).await?;
Ok(MetadataStoreRunner::new(
store,
health_status,
server_builder,
))
}

#[cfg(test)]
mod tests;
188 changes: 188 additions & 0 deletions crates/metadata-store/src/raft/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::raft::grpc_svc::RaftMessage;
use futures::StreamExt;
use protobuf::Message as ProtobufMessage;
use raft::prelude::Message;
use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::BoxStream;
use tracing::{debug, instrument};

#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
#[error("internal error: {0}")]
Internal(String),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
}

#[derive(Clone, derive_more::Debug)]
pub struct ConnectionManager {
inner: Arc<ConnectionManagerInner>,
}

impl ConnectionManager {
pub fn new(identity: u64, router: mpsc::Sender<Message>) -> Self {
ConnectionManager {
inner: Arc::new(ConnectionManagerInner::new(identity, router)),
}
}

pub fn identity(&self) -> u64 {
self.inner.identity
}

pub fn accept_connection(
&self,
raft_peer: u64,
incoming_rx: tonic::Streaming<RaftMessage>,
) -> Result<BoxStream<RaftMessage>, ConnectionError> {
let (outgoing_tx, outgoing_rx) = mpsc::channel(128);
self.run_connection(raft_peer, outgoing_tx, incoming_rx)?;

let outgoing_stream = ReceiverStream::new(outgoing_rx)
.map(Result::<_, tonic::Status>::Ok)
.boxed();
Ok(outgoing_stream)
}

pub fn run_connection(
&self,
remote_peer: u64,
outgoing_tx: mpsc::Sender<RaftMessage>,
incoming_rx: tonic::Streaming<RaftMessage>,
) -> Result<(), ConnectionError> {
let mut guard = self.inner.connections.lock().unwrap();

if guard.contains_key(&remote_peer) {
// we already have a connection established to remote peer
return Ok(());
}

let connection = Connection::new(outgoing_tx);
guard.insert(remote_peer, connection);

let reactor = ConnectionReactor {
remote_peer,
connection_manager: Arc::clone(&self.inner),
};

let _task_id = TaskCenter::spawn_child(
TaskKind::ConnectionReactor,
"raft-connection-reactor",
reactor.run(incoming_rx),
)?;

Ok(())
}

pub fn get_connection(&self, target: u64) -> Option<Connection> {
self.inner.connections.lock().unwrap().get(&target).cloned()
}
}

struct ConnectionReactor {
remote_peer: u64,
connection_manager: Arc<ConnectionManagerInner>,
}

impl ConnectionReactor {
#[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))]
async fn run(self, mut incoming_rx: tonic::Streaming<RaftMessage>) -> anyhow::Result<()> {
let mut shutdown = std::pin::pin!(cancellation_watcher());
debug!("Run connection reactor");

loop {
tokio::select! {
_ = &mut shutdown => {
break;
},
message = incoming_rx.next() => {
match message {
Some(message) => {
match message {
Ok(message) => {
let message = Message::parse_from_carllerche_bytes(&message.message)?;

assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity);

if self.connection_manager.router.send(message).await.is_err() {
// system is shutting down
debug!("System is shutting down; closing connection");
break;
}
}
Err(err) => {
debug!("Closing connection because received error: {err}");
break;
}
}
}
None => {
debug!("Remote peer closed connection");
break
},
}
}
}
}

Ok(())
}
}

impl Drop for ConnectionReactor {
fn drop(&mut self) {
debug!(remote_peer = %self.remote_peer, "Close connection");
self.connection_manager
.connections
.lock()
.expect("shouldn't be poisoned")
.remove(&self.remote_peer);
}
}

#[derive(Debug)]
struct ConnectionManagerInner {
identity: u64,
connections: Mutex<HashMap<u64, Connection>>,
router: mpsc::Sender<Message>,
}

impl ConnectionManagerInner {
pub fn new(identity: u64, router: mpsc::Sender<Message>) -> Self {
ConnectionManagerInner {
identity,
router,
connections: Mutex::default(),
}
}
}

#[derive(Debug, Clone)]
pub struct Connection {
tx: mpsc::Sender<RaftMessage>,
}

impl Connection {
pub fn new(tx: mpsc::Sender<RaftMessage>) -> Self {
Connection { tx }
}

pub fn try_send(&self, message: RaftMessage) -> Result<(), TrySendError<RaftMessage>> {
self.tx.try_send(message)
}
}
14 changes: 14 additions & 0 deletions crates/metadata-store/src/raft/grpc_svc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

tonic::include_proto!("dev.restate.raft_metadata_store_svc");

pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("raft_metadata_store_svc");
Loading

0 comments on commit a911ba6

Please sign in to comment.