diff --git a/examples/rest-grpc-multiplex/Cargo.toml b/examples/rest-grpc-multiplex/Cargo.toml index 11a6a3a2b4..1e28fd0b25 100644 --- a/examples/rest-grpc-multiplex/Cargo.toml +++ b/examples/rest-grpc-multiplex/Cargo.toml @@ -5,16 +5,17 @@ edition = "2021" publish = false [dependencies] -axum = { path = "../../axum" } +# needs to match tonic +axum = { version = "0.7.5" } futures = "0.3" hyper = { version = "1.0.0", features = ["full"] } -prost = "0.11" +prost = "0.13.1" tokio = { version = "1", features = ["full"] } -tonic = { version = "0.9" } -tonic-reflection = "0.9" +tonic = "0.12" +tonic-reflection = "0.12" tower = { version = "0.4", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } [build-dependencies] -tonic-build = { version = "0.9", features = ["prost"] } +tonic-build = { version = "0.12", features = ["prost"] } diff --git a/examples/rest-grpc-multiplex/src/main.rs b/examples/rest-grpc-multiplex/src/main.rs index 3ba0c0a3ee..c837f0fbd8 100644 --- a/examples/rest-grpc-multiplex/src/main.rs +++ b/examples/rest-grpc-multiplex/src/main.rs @@ -4,84 +4,90 @@ //! cargo run -p example-rest-grpc-multiplex //! ``` -// TODO -fn main() { - eprint!("this example has not yet been updated to hyper 1.0"); -} +use axum::{extract::Request, http::header::CONTENT_TYPE, routing::get, Router}; +use proto::{ + greeter_server::{Greeter, GreeterServer}, + HelloReply, HelloRequest, +}; +use std::net::SocketAddr; +use tonic::{Request as TonicRequest, Response as TonicResponse, Status}; +use tower::{steer::Steer, make::Shared}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -// use self::multiplex_service::MultiplexService; -// use axum::{routing::get, Router}; -// use proto::{ -// greeter_server::{Greeter, GreeterServer}, -// HelloReply, HelloRequest, -// }; -// use std::net::SocketAddr; -// use tonic::{Response as TonicResponse, Status}; -// use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +mod proto { + tonic::include_proto!("helloworld"); -// mod multiplex_service; + pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("helloworld_descriptor"); +} -// mod proto { -// tonic::include_proto!("helloworld"); +#[derive(Default)] +struct GrpcServiceImpl {} -// pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = -// tonic::include_file_descriptor_set!("helloworld_descriptor"); -// } +#[tonic::async_trait] +impl Greeter for GrpcServiceImpl { + async fn say_hello( + &self, + request: TonicRequest, + ) -> Result, Status> { + tracing::info!("Got a gRPC request from {:?}", request.remote_addr()); -// #[derive(Default)] -// struct GrpcServiceImpl {} + let reply = HelloReply { + message: format!("Hello {}!", request.into_inner().name), + }; -// #[tonic::async_trait] -// impl Greeter for GrpcServiceImpl { -// async fn say_hello( -// &self, -// request: tonic::Request, -// ) -> Result, Status> { -// tracing::info!("Got a request from {:?}", request.remote_addr()); + Ok(TonicResponse::new(reply)) + } +} -// let reply = HelloReply { -// message: format!("Hello {}!", request.into_inner().name), -// }; +async fn web_root() -> &'static str { + tracing::info!("Got a REST request"); -// Ok(TonicResponse::new(reply)) -// } -// } + "Hello, World!" +} -// async fn web_root() -> &'static str { -// "Hello, World!" -// } +#[tokio::main] +async fn main() { + // initialize tracing + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "example_rest_grpc_multiplex=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); -// #[tokio::main] -// async fn main() { -// // initialize tracing -// tracing_subscriber::registry() -// .with( -// tracing_subscriber::EnvFilter::try_from_default_env() -// .unwrap_or_else(|_| "example_rest_grpc_multiplex=debug".into()), -// ) -// .with(tracing_subscriber::fmt::layer()) -// .init(); + // build the rest service + let rest = Router::new().route("/", get(web_root)); -// // build the rest service -// let rest = Router::new().route("/", get(web_root)); + // build the grpc service + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET) + .build() + .unwrap(); -// // build the grpc service -// let reflection_service = tonic_reflection::server::Builder::configure() -// .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET) -// .build() -// .unwrap(); -// let grpc = tonic::transport::Server::builder() -// .add_service(reflection_service) -// .add_service(GreeterServer::new(GrpcServiceImpl::default())) -// .into_service(); + let grpc = tonic::transport::Server::builder() + .add_service(reflection_service) + .add_service(GreeterServer::new(GrpcServiceImpl::default())) + .into_router(); -// // combine them into one service -// let service = MultiplexService::new(rest, grpc); + // combine them into one service + let service = Steer::new(vec![rest, grpc], |req: &Request, _services: &[_]| { + if req + .headers() + .get(CONTENT_TYPE) + .map(|content_type| content_type.as_bytes()) + .filter(|content_type| content_type.starts_with(b"application/grpc")) + .is_some() + { + 1 + } else { + 0 + } + }); -// let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); -// tracing::debug!("listening on {}", addr); -// hyper::Server::bind(&addr) -// .serve(tower::make::Shared::new(service)) -// .await -// .unwrap(); -// } + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + tracing::debug!("listening on {}", addr); + axum::serve(listener, Shared::new(service)).await.unwrap(); +} diff --git a/examples/rest-grpc-multiplex/src/multiplex_service.rs b/examples/rest-grpc-multiplex/src/multiplex_service.rs deleted file mode 100644 index 80b612e12e..0000000000 --- a/examples/rest-grpc-multiplex/src/multiplex_service.rs +++ /dev/null @@ -1,118 +0,0 @@ -use axum::{ - body::Body, - extract::Request, - http::header::CONTENT_TYPE, - response::{IntoResponse, Response}, -}; -use futures::{future::BoxFuture, ready}; -use std::{ - convert::Infallible, - task::{Context, Poll}, -}; -use tower::Service; - -pub struct MultiplexService { - rest: A, - rest_ready: bool, - grpc: B, - grpc_ready: bool, -} - -impl MultiplexService { - pub fn new(rest: A, grpc: B) -> Self { - Self { - rest, - rest_ready: false, - grpc, - grpc_ready: false, - } - } -} - -impl Clone for MultiplexService -where - A: Clone, - B: Clone, -{ - fn clone(&self) -> Self { - Self { - rest: self.rest.clone(), - grpc: self.grpc.clone(), - // the cloned services probably wont be ready - rest_ready: false, - grpc_ready: false, - } - } -} - -impl Service> for MultiplexService -where - A: Service, Error = Infallible>, - A::Response: IntoResponse, - A::Future: Send + 'static, - B: Service>, - B::Response: IntoResponse, - B::Future: Send + 'static, -{ - type Response = Response; - type Error = B::Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // drive readiness for each inner service and record which is ready - loop { - match (self.rest_ready, self.grpc_ready) { - (true, true) => { - return Ok(()).into(); - } - (false, _) => { - ready!(self.rest.poll_ready(cx)).map_err(|err| match err {})?; - self.rest_ready = true; - } - (_, false) => { - ready!(self.grpc.poll_ready(cx))?; - self.grpc_ready = true; - } - } - } - } - - fn call(&mut self, req: Request) -> Self::Future { - // require users to call `poll_ready` first, if they don't we're allowed to panic - // as per the `tower::Service` contract - assert!( - self.grpc_ready, - "grpc service not ready. Did you forget to call `poll_ready`?" - ); - assert!( - self.rest_ready, - "rest service not ready. Did you forget to call `poll_ready`?" - ); - - // if we get a grpc request call the grpc service, otherwise call the rest service - // when calling a service it becomes not-ready so we have drive readiness again - if is_grpc_request(&req) { - self.grpc_ready = false; - let future = self.grpc.call(req); - Box::pin(async move { - let res = future.await?; - Ok(res.into_response()) - }) - } else { - self.rest_ready = false; - let future = self.rest.call(req); - Box::pin(async move { - let res = future.await.map_err(|err| match err {})?; - Ok(res.into_response()) - }) - } - } -} - -fn is_grpc_request(req: &Request) -> bool { - req.headers() - .get(CONTENT_TYPE) - .map(|content_type| content_type.as_bytes()) - .filter(|content_type| content_type.starts_with(b"application/grpc")) - .is_some() -}