From 5568ea233883236d8d673f06f4cb60e08bfa7397 Mon Sep 17 00:00:00 2001 From: tottoto Date: Fri, 6 Sep 2024 15:45:33 +0900 Subject: [PATCH] feat(channel): Add Change type to make tower internal dependency --- examples/src/dynamic_load_balance/client.rs | 3 +-- tonic/Cargo.toml | 1 - tonic/src/transport/channel/mod.rs | 3 ++- .../src/transport/channel/service/discover.rs | 20 ++++++++++++------- tonic/src/transport/channel/service/mod.rs | 1 + 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/examples/src/dynamic_load_balance/client.rs b/examples/src/dynamic_load_balance/client.rs index 45a524b8c..1f7ad0949 100644 --- a/examples/src/dynamic_load_balance/client.rs +++ b/examples/src/dynamic_load_balance/client.rs @@ -3,15 +3,14 @@ pub mod pb { } use pb::{echo_client::EchoClient, EchoRequest}; +use tonic::transport::channel::Change; use tonic::transport::Channel; - use tonic::transport::Endpoint; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; use tokio::time::timeout; -use tower::discover::Change; #[tokio::main] async fn main() -> Result<(), Box> { diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index bf32e23c1..02a0a3a0c 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -133,7 +133,6 @@ allowed_external_types = [ "futures_core::stream::Stream", "h2::error::Error", "http_body_util::combinators::box_body::UnsyncBoxBody", - "tower::discover::Change", "tower_service::Service", "tower_layer::Layer", "tower_layer::stack::Stack", diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index ed8d22275..74fc0227c 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -5,6 +5,7 @@ pub(crate) mod service; #[cfg(feature = "tls")] mod tls; +pub use self::service::Change; pub use endpoint::Endpoint; #[cfg(feature = "tls")] pub use tls::ClientTlsConfig; @@ -30,7 +31,7 @@ use hyper::rt; use tower::balance::p2c::Balance; use tower::{ buffer::{self, Buffer}, - discover::{Change, Discover}, + discover::Discover, util::{BoxService, Either}, Service, }; diff --git a/tonic/src/transport/channel/service/discover.rs b/tonic/src/transport/channel/service/discover.rs index b1d3c3331..564640973 100644 --- a/tonic/src/transport/channel/service/discover.rs +++ b/tonic/src/transport/channel/service/discover.rs @@ -7,11 +7,17 @@ use std::{ task::{Context, Poll}, }; use tokio::sync::mpsc::Receiver; - use tokio_stream::Stream; -use tower::discover::Change; - -type DiscoverResult = Result, E>; +use tower::discover::Change as TowerChange; + +/// A change in the service set. +#[derive(Debug, Clone)] +pub enum Change { + /// A new service identified by key `K` was identified. + Insert(K, V), + /// The service identified by key `K` disappeared. + Remove(K), +} pub(crate) struct DynamicServiceStream { changes: Receiver>, @@ -24,7 +30,7 @@ impl DynamicServiceStream { } impl Stream for DynamicServiceStream { - type Item = DiscoverResult; + type Item = Result, crate::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let c = &mut self.changes; @@ -39,10 +45,10 @@ impl Stream for DynamicServiceStream { http.enforce_http(false); let connection = Connection::lazy(endpoint.connector(http), endpoint); - let change = Ok(Change::Insert(k, connection)); + let change = Ok(TowerChange::Insert(k, connection)); Poll::Ready(Some(change)) } - Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))), + Change::Remove(k) => Poll::Ready(Some(Ok(TowerChange::Remove(k)))), }, } } diff --git a/tonic/src/transport/channel/service/mod.rs b/tonic/src/transport/channel/service/mod.rs index cd481e9a4..ba795f566 100644 --- a/tonic/src/transport/channel/service/mod.rs +++ b/tonic/src/transport/channel/service/mod.rs @@ -11,6 +11,7 @@ mod connection; pub(super) use self::connection::Connection; mod discover; +pub use self::discover::Change; pub(super) use self::discover::DynamicServiceStream; mod io;