diff --git a/Cargo.lock b/Cargo.lock index 4476843f07..650b0f5a5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6669,6 +6669,7 @@ dependencies = [ "bytestring", "codederror", "derive_builder", + "derive_more", "flexbuffers", "futures", "googletest", @@ -6690,6 +6691,7 @@ dependencies = [ "test-log", "thiserror 2.0.6", "tokio", + "tokio-stream", "tonic", "tonic-build", "tracing", diff --git a/crates/metadata-store/Cargo.toml b/crates/metadata-store/Cargo.toml index ac86b58254..c2b70654fe 100644 --- a/crates/metadata-store/Cargo.toml +++ b/crates/metadata-store/Cargo.toml @@ -25,6 +25,7 @@ async-trait = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } derive_builder = { workspace = true } +derive_more = { workspace = true } futures = { workspace = true } http = { workspace = true } humantime = { workspace = true } @@ -39,6 +40,7 @@ slog = { version = "2.7.0" } static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true } tonic = { workspace = true, features = ["transport", "codegen", "prost"] } tracing = { workspace = true } tracing-slog = { version = "0.3.0" } diff --git a/crates/metadata-store/build.rs b/crates/metadata-store/build.rs index 6c2386c907..8529006572 100644 --- a/crates/metadata-store/build.rs +++ b/crates/metadata-store/build.rs @@ -21,5 +21,11 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .compile_protos(&["./proto/metadata_store_svc.proto"], &["proto"])?; + tonic_build::configure() + .bytes(["."]) + .file_descriptor_set_path(out_dir.join("raft_metadata_store_svc.bin")) + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["./proto/raft_metadata_store_svc.proto"], &["proto"])?; + Ok(()) } diff --git a/crates/metadata-store/proto/raft_metadata_store_svc.proto b/crates/metadata-store/proto/raft_metadata_store_svc.proto new file mode 100644 index 0000000000..84a4f4f074 --- /dev/null +++ b/crates/metadata-store/proto/raft_metadata_store_svc.proto @@ -0,0 +1,24 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package dev.restate.raft_metadata_store_svc; + +// Grpc service definition for the RaftMetadataStore implementation. +service RaftMetadataStoreSvc { + rpc Raft(stream RaftMessage) returns (stream RaftMessage); +} + +message RaftMessage { + bytes message = 1; +} + diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index ec3b9e986c..5bee385656 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -16,8 +16,6 @@ mod util; use crate::grpc::handler::MetadataStoreHandler; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; -use crate::local::LocalMetadataStore; -use crate::raft::RaftMetadataStore; use bytestring::ByteString; use restate_core::metadata_store::VersionedValue; pub use restate_core::metadata_store::{ @@ -192,13 +190,20 @@ pub async fn create_metadata_store( server_builder: &mut NetworkServerBuilder, ) -> anyhow::Result { match metadata_store_options.kind { - Kind::Local => { - let store = LocalMetadataStore::create(metadata_store_options, rocksdb_options).await?; - Ok(MetadataStoreRunner::new(store, health_status, server_builder).boxed()) - } - Kind::Raft => { - let store = RaftMetadataStore::create().await?; - Ok(MetadataStoreRunner::new(store, health_status, server_builder).boxed()) + Kind::Local => local::create_store( + metadata_store_options, + rocksdb_options, + health_status, + server_builder, + ) + .await + .map_err(anyhow::Error::from) + .map(|store| store.boxed()), + Kind::Raft(ref raft_options) => { + raft::create_store(raft_options, rocksdb_options, health_status, server_builder) + .await + .map_err(anyhow::Error::from) + .map(|store| store.boxed()) } } } diff --git a/crates/metadata-store/src/local/mod.rs b/crates/metadata-store/src/local/mod.rs index 8e906d0220..f00d16969c 100644 --- a/crates/metadata-store/src/local/mod.rs +++ b/crates/metadata-store/src/local/mod.rs @@ -8,17 +8,23 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::grpc::client::GrpcMetadataStoreClient; use restate_core::metadata_store::providers::create_object_store_based_meta_store; use restate_core::metadata_store::{providers::EtcdMetadataStore, MetadataStoreClient}; +use restate_core::network::NetworkServerBuilder; +use restate_rocksdb::RocksError; +use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; +use restate_types::health::HealthStatus; +use restate_types::live::BoxedLiveLoad; +use restate_types::protobuf::common::MetadataServerStatus; use restate_types::{ config::{MetadataStoreClient as MetadataStoreClientConfig, MetadataStoreClientOptions}, errors::GenericError, }; -use crate::grpc::client::GrpcMetadataStoreClient; - mod store; +use crate::MetadataStoreRunner; pub use store::LocalMetadataStore; /// Creates a [`MetadataStoreClient`] for the [`GrpcMetadataStoreClient`]. @@ -50,5 +56,19 @@ pub async fn create_client( Ok(client) } +pub async fn create_store( + metadata_store_options: &MetadataStoreOptions, + rocksdb_options: BoxedLiveLoad, + health_status: HealthStatus, + server_builder: &mut NetworkServerBuilder, +) -> Result, RocksError> { + let store = LocalMetadataStore::create(metadata_store_options, rocksdb_options).await?; + Ok(MetadataStoreRunner::new( + store, + health_status, + server_builder, + )) +} + #[cfg(test)] mod tests; diff --git a/crates/metadata-store/src/raft/connection_manager.rs b/crates/metadata-store/src/raft/connection_manager.rs new file mode 100644 index 0000000000..cac261b5ef --- /dev/null +++ b/crates/metadata-store/src/raft/connection_manager.rs @@ -0,0 +1,188 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::grpc_svc::RaftMessage; +use futures::StreamExt; +use protobuf::Message as ProtobufMessage; +use raft::prelude::Message; +use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::BoxStream; +use tracing::{debug, instrument}; + +#[derive(Debug, thiserror::Error)] +pub enum ConnectionError { + #[error("internal error: {0}")] + Internal(String), + #[error(transparent)] + Shutdown(#[from] ShutdownError), +} + +#[derive(Clone, derive_more::Debug)] +pub struct ConnectionManager { + inner: Arc, +} + +impl ConnectionManager { + pub fn new(identity: u64, router: mpsc::Sender) -> Self { + ConnectionManager { + inner: Arc::new(ConnectionManagerInner::new(identity, router)), + } + } + + pub fn identity(&self) -> u64 { + self.inner.identity + } + + pub fn accept_connection( + &self, + raft_peer: u64, + incoming_rx: tonic::Streaming, + ) -> Result, ConnectionError> { + let (outgoing_tx, outgoing_rx) = mpsc::channel(128); + self.run_connection(raft_peer, outgoing_tx, incoming_rx)?; + + let outgoing_stream = ReceiverStream::new(outgoing_rx) + .map(Result::<_, tonic::Status>::Ok) + .boxed(); + Ok(outgoing_stream) + } + + pub fn run_connection( + &self, + remote_peer: u64, + outgoing_tx: mpsc::Sender, + incoming_rx: tonic::Streaming, + ) -> Result<(), ConnectionError> { + let mut guard = self.inner.connections.lock().unwrap(); + + if guard.contains_key(&remote_peer) { + // we already have a connection established to remote peer + return Ok(()); + } + + let connection = Connection::new(outgoing_tx); + guard.insert(remote_peer, connection); + + let reactor = ConnectionReactor { + remote_peer, + connection_manager: Arc::clone(&self.inner), + }; + + let _task_id = TaskCenter::spawn_child( + TaskKind::ConnectionReactor, + "raft-connection-reactor", + reactor.run(incoming_rx), + )?; + + Ok(()) + } + + pub fn get_connection(&self, target: u64) -> Option { + self.inner.connections.lock().unwrap().get(&target).cloned() + } +} + +struct ConnectionReactor { + remote_peer: u64, + connection_manager: Arc, +} + +impl ConnectionReactor { + #[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))] + async fn run(self, mut incoming_rx: tonic::Streaming) -> anyhow::Result<()> { + let mut shutdown = std::pin::pin!(cancellation_watcher()); + debug!("Run connection reactor"); + + loop { + tokio::select! { + _ = &mut shutdown => { + break; + }, + message = incoming_rx.next() => { + match message { + Some(message) => { + match message { + Ok(message) => { + let message = Message::parse_from_carllerche_bytes(&message.message)?; + + assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity); + + if self.connection_manager.router.send(message).await.is_err() { + // system is shutting down + debug!("System is shutting down; closing connection"); + break; + } + } + Err(err) => { + debug!("Closing connection because received error: {err}"); + break; + } + } + } + None => { + debug!("Remote peer closed connection"); + break + }, + } + } + } + } + + Ok(()) + } +} + +impl Drop for ConnectionReactor { + fn drop(&mut self) { + debug!(remote_peer = %self.remote_peer, "Close connection"); + self.connection_manager + .connections + .lock() + .expect("shouldn't be poisoned") + .remove(&self.remote_peer); + } +} + +#[derive(Debug)] +struct ConnectionManagerInner { + identity: u64, + connections: Mutex>, + router: mpsc::Sender, +} + +impl ConnectionManagerInner { + pub fn new(identity: u64, router: mpsc::Sender) -> Self { + ConnectionManagerInner { + identity, + router, + connections: Mutex::default(), + } + } +} + +#[derive(Debug, Clone)] +pub struct Connection { + tx: mpsc::Sender, +} + +impl Connection { + pub fn new(tx: mpsc::Sender) -> Self { + Connection { tx } + } + + pub fn try_send(&self, message: RaftMessage) -> Result<(), TrySendError> { + self.tx.try_send(message) + } +} diff --git a/crates/metadata-store/src/raft/grpc_svc.rs b/crates/metadata-store/src/raft/grpc_svc.rs new file mode 100644 index 0000000000..a9064c2611 --- /dev/null +++ b/crates/metadata-store/src/raft/grpc_svc.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +tonic::include_proto!("dev.restate.raft_metadata_store_svc"); + +pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("raft_metadata_store_svc"); diff --git a/crates/metadata-store/src/raft/handler.rs b/crates/metadata-store/src/raft/handler.rs new file mode 100644 index 0000000000..da9995ccba --- /dev/null +++ b/crates/metadata-store/src/raft/handler.rs @@ -0,0 +1,67 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::connection_manager::{ConnectionError, ConnectionManager}; +use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc; +use crate::raft::grpc_svc::RaftMessage; +use std::str::FromStr; +use tonic::codegen::BoxStream; +use tonic::{Request, Response, Status, Streaming}; + +pub const RAFT_PEER_METADATA_KEY: &str = "x-restate-raft-peer"; + +#[derive(Debug)] +pub struct RaftMetadataStoreHandler { + connection_manager: ConnectionManager, +} + +impl RaftMetadataStoreHandler { + pub fn new(connection_manager: ConnectionManager) -> Self { + Self { connection_manager } + } +} + +#[async_trait::async_trait] +impl RaftMetadataStoreSvc for RaftMetadataStoreHandler { + type RaftStream = BoxStream; + + async fn raft( + &self, + request: Request>, + ) -> Result, Status> { + let raft_peer_metadata = + request + .metadata() + .get(RAFT_PEER_METADATA_KEY) + .ok_or(Status::invalid_argument(format!( + "'{}' is missing", + RAFT_PEER_METADATA_KEY + )))?; + let raft_peer = u64::from_str( + raft_peer_metadata + .to_str() + .map_err(|err| Status::invalid_argument(err.to_string()))?, + ) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + let outgoing_rx = self + .connection_manager + .accept_connection(raft_peer, request.into_inner())?; + Ok(Response::new(outgoing_rx)) + } +} + +impl From for Status { + fn from(value: ConnectionError) -> Self { + match value { + ConnectionError::Internal(err) => Status::internal(err), + ConnectionError::Shutdown(err) => Status::aborted(err.to_string()), + } + } +} diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs index ea608049a2..06335c6bd8 100644 --- a/crates/metadata-store/src/raft/mod.rs +++ b/crates/metadata-store/src/raft/mod.rs @@ -8,7 +8,51 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod connection_manager; +pub mod grpc_svc; +mod handler; +mod networking; mod storage; mod store; +use crate::raft::connection_manager::ConnectionManager; +use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvcServer; +use crate::raft::handler::RaftMetadataStoreHandler; +use crate::raft::networking::Networking; +use crate::raft::store::BuildError; +use crate::MetadataStoreRunner; +use restate_core::network::NetworkServerBuilder; +use restate_types::config::{RaftOptions, RocksDbOptions}; +use restate_types::health::HealthStatus; +use restate_types::live::BoxedLiveLoad; +use restate_types::protobuf::common::MetadataServerStatus; pub use store::RaftMetadataStore; +use tokio::sync::mpsc; + +pub async fn create_store( + raft_options: &RaftOptions, + rocksdb_options: BoxedLiveLoad, + health_status: HealthStatus, + server_builder: &mut NetworkServerBuilder, +) -> Result, BuildError> { + let (router_tx, router_rx) = mpsc::channel(128); + let connection_manager = ConnectionManager::new(raft_options.id, router_tx); + let store = RaftMetadataStore::create( + raft_options, + rocksdb_options, + Networking::new(connection_manager.clone()), + router_rx, + ) + .await?; + + server_builder.register_grpc_service( + RaftMetadataStoreSvcServer::new(RaftMetadataStoreHandler::new(connection_manager)), + grpc_svc::FILE_DESCRIPTOR_SET, + ); + + Ok(MetadataStoreRunner::new( + store, + health_status, + server_builder, + )) +} diff --git a/crates/metadata-store/src/raft/networking.rs b/crates/metadata-store/src/raft/networking.rs new file mode 100644 index 0000000000..a8ae2dc39f --- /dev/null +++ b/crates/metadata-store/src/raft/networking.rs @@ -0,0 +1,154 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::connection_manager::ConnectionManager; +use crate::raft::grpc_svc::RaftMessage; +use crate::raft::handler::RAFT_PEER_METADATA_KEY; +use bytes::{BufMut, BytesMut}; +use futures::FutureExt; +use protobuf::Message as ProtobufMessage; +use raft::prelude::Message; +use restate_core::network::net_util; +use restate_core::{ShutdownError, TaskCenter, TaskHandle, TaskKind}; +use restate_types::config::{Configuration, NetworkingOptions}; +use restate_types::net::AdvertisedAddress; +use std::collections::HashMap; +use std::mem; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::metadata::MetadataValue; +use tonic::IntoStreamingRequest; +use tracing::{debug, trace}; + +#[derive(Debug, thiserror::Error)] +pub enum TrySendError { + #[error("failed sending message")] + Send(T), + #[error("unknown peer: {0}")] + UnknownPeer(u64), + #[error(transparent)] + Shutdown(#[from] ShutdownError), +} + +#[derive(derive_more::Debug)] +pub struct Networking { + connection_manager: ConnectionManager, + addresses: HashMap, + #[debug(skip)] + connection_attempts: HashMap>>, + serde_buffer: BytesMut, +} + +impl Networking { + pub fn new(connection_manager: ConnectionManager) -> Self { + Networking { + connection_manager, + addresses: HashMap::default(), + connection_attempts: HashMap::default(), + serde_buffer: BytesMut::with_capacity(1024), + } + } + + pub fn register_address(&mut self, peer: u64, address: AdvertisedAddress) { + self.addresses.insert(peer, address); + } + + pub fn try_send(&mut self, message: Message) -> Result<(), TrySendError> { + let target = message.to; + + if let Some(connection) = self.connection_manager.get_connection(target) { + let mut writer = mem::take(&mut self.serde_buffer).writer(); + message + .write_to_writer(&mut writer) + .expect("should be able to write message"); + self.serde_buffer = writer.into_inner(); + + // todo: Maybe send message directly w/o indirection through RaftMessage + let raft_message = RaftMessage { + message: self.serde_buffer.split().freeze(), + }; + + connection + .try_send(raft_message) + .map_err(|_err| TrySendError::Send(message))?; + } else if let Some(address) = self.addresses.get(&target) { + if let Some(task_handle) = self.connection_attempts.remove(&target) { + if !task_handle.is_finished() { + return Ok(()); + } else { + match task_handle.now_or_never().expect("should be finished") { + Ok(result) => { + match result { + Ok(_) => trace!("Previous connection attempt to '{target}' succeeded but connection was closed in meantime."), + Err(err) => trace!("Previous connection attempt to '{target}' failed: {}", err) + } + + } + Err(err) => { + trace!("Previous connection attempt to '{target}' panicked: {}", err) + } + } + } + } + + self.connection_attempts.insert( + target, + Self::try_connecting_to( + self.connection_manager.clone(), + target, + address.clone(), + &Configuration::pinned().networking, + )?, + ); + } else { + return Err(TrySendError::UnknownPeer(target)); + } + + Ok(()) + } + + fn try_connecting_to( + connection_manager: ConnectionManager, + target: u64, + address: AdvertisedAddress, + networking_options: &NetworkingOptions, + ) -> Result>, ShutdownError> { + TaskCenter::spawn_unmanaged( + TaskKind::SocketHandler, + "metadata-store-network-connection-attempt", + { + trace!(%target, "Try connecting to metadata store peer"); + let channel = net_util::create_tonic_channel(address.clone(), networking_options); + + async move { + let mut raft_client = crate::raft::grpc_svc::raft_metadata_store_svc_client::RaftMetadataStoreSvcClient::new(channel); + let (outgoing_tx, outgoing_rx) = mpsc::channel(128); + + let mut request = ReceiverStream::new(outgoing_rx).into_streaming_request(); + // send our identity alongside with the request to the target + request.metadata_mut().insert( + RAFT_PEER_METADATA_KEY, + MetadataValue::try_from(connection_manager.identity().to_string())?, + ); + + let incoming_rx = raft_client.raft(request).await?; + + connection_manager.run_connection( + target, + outgoing_tx, + incoming_rx.into_inner(), + )?; + + Ok(()) + } + }, + ) + } +} diff --git a/crates/metadata-store/src/raft/storage.rs b/crates/metadata-store/src/raft/storage.rs index 01594da3ee..b016888bb0 100644 --- a/crates/metadata-store/src/raft/storage.rs +++ b/crates/metadata-store/src/raft/storage.rs @@ -125,9 +125,7 @@ impl RocksDbStorage { fn find_last_index(db: &DB) -> u64 { let cf = db.cf_handle(RAFT_CF).expect("RAFT_CF exists"); let start = Self::raft_entry_key(0); - // end is exclusive so switch to the next discriminator - let mut end = [0; 9]; - end[0] = RAFT_ENTRY_DISCRIMINATOR + 1; + let end = Self::raft_entry_key(u64::MAX); let mut options = ReadOptions::default(); options.set_async_io(true); diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs index 977d90af1a..8ab0032836 100644 --- a/crates/metadata-store/src/raft/store.rs +++ b/crates/metadata-store/src/raft/store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::raft::networking::Networking; use crate::raft::storage; use crate::raft::storage::RocksDbStorage; use crate::{ @@ -23,7 +24,8 @@ use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Messa use raft::{Config, RawNode}; use restate_core::cancellation_watcher; use restate_core::metadata_store::{Precondition, VersionedValue}; -use restate_types::config::Configuration; +use restate_types::config::{Configuration, RaftOptions, RocksDbOptions}; +use restate_types::live::BoxedLiveLoad; use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version}; use slog::o; @@ -64,6 +66,8 @@ pub enum Error { pub struct RaftMetadataStore { _logger: slog::Logger, raw_node: RawNode, + networking: Networking, + raft_rx: mpsc::Receiver, tick_interval: time::Interval, callbacks: HashMap, @@ -74,28 +78,38 @@ pub struct RaftMetadataStore { } impl RaftMetadataStore { - pub async fn create() -> Result { + pub async fn create( + raft_options: &RaftOptions, + rocksdb_options: BoxedLiveLoad, + mut networking: Networking, + raft_rx: mpsc::Receiver, + ) -> Result { let (request_tx, request_rx) = mpsc::channel(2); let config = Config { - id: 1, + id: raft_options.id, ..Default::default() }; - let rocksdb_options = Configuration::updateable() - .map(|configuration| &configuration.common.rocksdb) - .boxed(); let mut metadata_store_options = Configuration::updateable().map(|configuration| &configuration.metadata_store); - let mut store = + let mut storage = RocksDbStorage::create(metadata_store_options.live_load(), rocksdb_options).await?; - let conf_state = ConfState::from((vec![1], vec![])); - store.store_conf_state(conf_state).await?; + + // todo: Only write configuration on initialization + let voters: Vec<_> = raft_options.peers.keys().cloned().collect(); + let conf_state = ConfState::from((voters, vec![])); + storage.store_conf_state(conf_state).await?; + + // todo: Persist address information with configuration + for (peer, address) in &raft_options.peers { + networking.register_address(*peer, address.clone()); + } let drain = TracingSlogDrain; let logger = slog::Logger::root(drain, o!()); - let raw_node = RawNode::new(&config, store, &logger)?; + let raw_node = RawNode::new(&config, storage, &logger)?; let mut tick_interval = time::interval(Duration::from_millis(100)); tick_interval.set_missed_tick_behavior(MissedTickBehavior::Burst); @@ -104,6 +118,8 @@ impl RaftMetadataStore { // we only need to keep it alive _logger: logger, raw_node, + raft_rx, + networking, tick_interval, callbacks: HashMap::default(), kv_entries: HashMap::default(), @@ -123,6 +139,13 @@ impl RaftMetadataStore { tokio::select! { _ = &mut cancellation => { break; + }, + raft = self.raft_rx.recv() => { + if let Some(raft) = raft { + self.raw_node.step(raft)?; + } else { + break; + } } Some(request) = self.request_rx.recv() => { // todo: Unclear whether every replica should be allowed to propose. Maybe @@ -226,8 +249,12 @@ impl RaftMetadataStore { self.callbacks.insert(callback.request_id, callback); } - fn send_messages(&self, _messages: Vec) { - // todo: Send messages to other peers + fn send_messages(&mut self, messages: Vec) { + for message in messages { + if let Err(err) = self.networking.try_send(message) { + debug!("failed sending message: {err}"); + } + } } async fn handle_committed_entries( diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index b5f2c7dceb..8f939f64ed 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -246,9 +246,9 @@ impl Node { && config.has_role(Role::HttpIngress) // todo remove once the safe fallback version supports the HttpIngress role || !config - .ingress - .experimental_feature_enable_separate_ingress_role - && config.has_role(Role::Worker) + .ingress + .experimental_feature_enable_separate_ingress_role + && config.has_role(Role::Worker) { Some(IngressRole::create( updateable_config diff --git a/crates/types/src/config/metadata_store.rs b/crates/types/src/config/metadata_store.rs index b5326c1e90..071d0034bf 100644 --- a/crates/types/src/config/metadata_store.rs +++ b/crates/types/src/config/metadata_store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::HashMap; use std::num::NonZeroUsize; use std::path::PathBuf; @@ -18,6 +19,7 @@ use restate_serde_util::NonZeroByteCount; use tracing::warn; use super::{data_dir, CommonOptions, RocksDbOptions, RocksDbOptionsBuilder}; +use crate::net::AdvertisedAddress; /// # Metadata store options #[serde_as] @@ -60,13 +62,13 @@ pub struct MetadataStoreOptions { pub kind: Kind, } -#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[serde(rename_all = "kebab-case")] pub enum Kind { #[default] Local, - Raft, + Raft(RaftOptions), } impl MetadataStoreOptions { @@ -122,3 +124,14 @@ impl Default for MetadataStoreOptions { } } } + +#[serde_as] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub struct RaftOptions { + pub id: u64, + #[cfg_attr(feature = "schemars", schemars(with = "Vec<(u64, String)>"))] + #[serde_as(as = "serde_with::Seq<(_, _)>")] + pub peers: HashMap, +}