Skip to content

Commit

Permalink
Move networking and connection manager into network module
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 27, 2024
1 parent e3d1331 commit 9ebbcf6
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 21 deletions.
1 change: 1 addition & 0 deletions crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod grpc_svc;
pub mod local;
pub mod raft;
mod util;
mod network;

use bytestring::ByteString;
use restate_core::metadata_store::VersionedValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
Expand All @@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::raft::grpc_svc::RaftMessage;
use futures::StreamExt;
use protobuf::Message as ProtobufMessage;
use raft::prelude::Message;
Expand All @@ -20,6 +19,7 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::BoxStream;
use tracing::{debug, instrument};
use crate::network::grpc_svc::RaftMessage;

#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
Expand All @@ -8,12 +8,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::raft::connection_manager::{ConnectionError, ConnectionManager};
use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc;
use crate::raft::grpc_svc::RaftMessage;
use std::str::FromStr;
use tonic::codegen::BoxStream;
use tonic::{Request, Response, Status, Streaming};
use crate::network::connection_manager::ConnectionError;
use crate::network::ConnectionManager;
use crate::network::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc;
use crate::network::grpc_svc::RaftMessage;

pub const RAFT_PEER_METADATA_KEY: &str = "x-restate-raft-peer";

Expand Down
20 changes: 20 additions & 0 deletions crates/metadata-store/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod connection_manager;
mod grpc_svc;
mod handler;
mod networking;

pub use connection_manager::ConnectionManager;
pub use grpc_svc::FILE_DESCRIPTOR_SET;
pub use grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvcServer;
pub use handler::RaftMetadataStoreHandler;
pub use networking::Networking;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
Expand All @@ -8,9 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::raft::connection_manager::ConnectionManager;
use crate::raft::grpc_svc::RaftMessage;
use crate::raft::handler::RAFT_PEER_METADATA_KEY;
use bytes::{BufMut, BytesMut};
use futures::FutureExt;
use protobuf::Message as ProtobufMessage;
Expand All @@ -26,6 +23,9 @@ use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::MetadataValue;
use tonic::IntoStreamingRequest;
use tracing::{debug, trace};
use crate::network::connection_manager::ConnectionManager;
use crate::network::grpc_svc::RaftMessage;
use crate::network::handler::RAFT_PEER_METADATA_KEY;

#[derive(Debug, thiserror::Error)]
pub enum TrySendError<T> {
Expand Down Expand Up @@ -131,7 +131,7 @@ impl Networking {
);

async move {
let mut raft_client = crate::raft::grpc_svc::raft_metadata_store_svc_client::RaftMetadataStoreSvcClient::new(channel);
let mut raft_client = crate::network::grpc_svc::raft_metadata_store_svc_client::RaftMetadataStoreSvcClient::new(channel);
let (outgoing_tx, outgoing_rx) = mpsc::channel(128);

let mut request = ReceiverStream::new(outgoing_rx).into_streaming_request();
Expand Down
4 changes: 0 additions & 4 deletions crates/metadata-store/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod connection_manager;
pub mod grpc_svc;
mod handler;
mod networking;
pub mod service;
mod storage;
mod store;
9 changes: 4 additions & 5 deletions crates/metadata-store/src/raft/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@ use crate::grpc::handler::MetadataStoreHandler;
use crate::grpc::server::GrpcServer;
use crate::grpc::service_builder::GrpcServiceBuilder;
use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer;
use crate::raft::connection_manager::ConnectionManager;
use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvcServer;
use crate::raft::handler::RaftMetadataStoreHandler;
use crate::raft::networking::Networking;
use crate::raft::store::RaftMetadataStore;
use crate::{grpc_svc, Error, MetadataStoreService};
use crate::{grpc_svc, network, Error, MetadataStoreService};
use assert2::let_assert;
use futures::TryFutureExt;
use restate_core::{TaskCenter, TaskKind};
Expand All @@ -26,6 +22,7 @@ use restate_types::health::HealthStatus;
use restate_types::live::BoxedLiveLoad;
use restate_types::protobuf::common::MetadataServerStatus;
use tokio::sync::mpsc;
use crate::network::{ConnectionManager, Networking, RaftMetadataStoreHandler, RaftMetadataStoreSvcServer};

pub struct RaftMetadataStoreService {
health_status: HealthStatus<MetadataServerStatus>,
Expand Down Expand Up @@ -70,6 +67,8 @@ impl MetadataStoreService for RaftMetadataStoreService {
builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new(
store.request_sender(),
)));

builder.register_file_descriptor_set_for_reflection(network::FILE_DESCRIPTOR_SET);
builder.add_service(RaftMetadataStoreSvcServer::new(
RaftMetadataStoreHandler::new(connection_manager),
));
Expand Down
2 changes: 1 addition & 1 deletion crates/metadata-store/src/raft/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::raft::networking::Networking;
use crate::raft::storage;
use crate::raft::storage::RocksDbStorage;
use crate::{
Expand All @@ -35,6 +34,7 @@ use tokio::time::MissedTickBehavior;
use tracing::{debug, info, warn};
use tracing_slog::TracingSlogDrain;
use ulid::Ulid;
use crate::network::Networking;

#[derive(Debug, thiserror::Error)]
pub enum BuildError {
Expand Down

0 comments on commit 9ebbcf6

Please sign in to comment.