Skip to content

Commit

Permalink
feat(channel): Add Change type to make tower internal dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Sep 30, 2024
1 parent 97ae25d commit c5af4b2
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 11 deletions.
3 changes: 1 addition & 2 deletions examples/src/dynamic_load_balance/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ pub mod pb {
}

use pb::{echo_client::EchoClient, EchoRequest};
use tonic::transport::channel::Change;
use tonic::transport::Channel;

use tonic::transport::Endpoint;

use std::sync::Arc;

use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use tokio::time::timeout;
use tower::discover::Change;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down
1 change: 0 additions & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ allowed_external_types = [
"futures_core::stream::Stream",
"h2::error::Error",
"http_body_util::combinators::box_body::UnsyncBoxBody",
"tower::discover::Change",
"tower_service::Service",
"tower_layer::Layer",
"tower_layer::stack::Stack",
Expand Down
3 changes: 2 additions & 1 deletion tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod service;
#[cfg(feature = "tls")]
mod tls;

pub use self::service::Change;
pub use endpoint::Endpoint;
#[cfg(feature = "tls")]
pub use tls::ClientTlsConfig;
Expand All @@ -30,7 +31,7 @@ use hyper::rt;
use tower::balance::p2c::Balance;
use tower::{
buffer::{self, Buffer},
discover::{Change, Discover},
discover::Discover,
util::{BoxService, Either},
Service,
};
Expand Down
20 changes: 13 additions & 7 deletions tonic/src/transport/channel/service/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ use std::{
task::{Context, Poll},
};
use tokio::sync::mpsc::Receiver;

use tokio_stream::Stream;
use tower::discover::Change;

type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;
use tower::discover::Change as TowerChange;

/// A change in the service set.
#[derive(Debug, Clone)]
pub enum Change<K, V> {
/// A new service identified by key `K` was identified.
Insert(K, V),
/// The service identified by key `K` disappeared.
Remove(K),
}

pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
changes: Receiver<Change<K, Endpoint>>,
Expand All @@ -24,7 +30,7 @@ impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
}

impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
type Item = DiscoverResult<K, Connection, crate::Error>;
type Item = Result<TowerChange<K, Connection>, crate::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let c = &mut self.changes;
Expand All @@ -39,10 +45,10 @@ impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
http.enforce_http(false);

let connection = Connection::lazy(endpoint.connector(http), endpoint);
let change = Ok(Change::Insert(k, connection));
let change = Ok(TowerChange::Insert(k, connection));
Poll::Ready(Some(change))
}
Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))),
Change::Remove(k) => Poll::Ready(Some(Ok(TowerChange::Remove(k)))),
},
}
}
Expand Down
1 change: 1 addition & 0 deletions tonic/src/transport/channel/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod connection;
pub(super) use self::connection::Connection;

mod discover;
pub use self::discover::Change;
pub(super) use self::discover::DynamicServiceStream;

mod io;
Expand Down

0 comments on commit c5af4b2

Please sign in to comment.