diff --git a/crates/Cargo.toml b/crates/Cargo.toml index 6085357ca5..ad1f3a32f0 100644 --- a/crates/Cargo.toml +++ b/crates/Cargo.toml @@ -18,3 +18,4 @@ panic = "abort" tonic = { git = "https://github.com/hyperium/tonic" } tonic-web = { git = "https://github.com/hyperium/tonic", branch = "lucio/grpc-web-client" } tonic-build = { git = "https://github.com/hyperium/tonic" } +tower-http = { git = "https://github.com/tower-rs/tower-http", branch = "lucio/grpc-defaults" } diff --git a/crates/replication/Cargo.toml b/crates/replication/Cargo.toml index d886b1cc40..2c093d2358 100644 --- a/crates/replication/Cargo.toml +++ b/crates/replication/Cargo.toml @@ -28,6 +28,7 @@ serde_json = "1.0.103" serde = { version = "1.0.173", features = ["serde_derive"] } tonic = { version = "0.9", features = ["tls", "tls-roots", "tls-webpki-roots"] } tonic-web = "0.9" +tower-http = { version = "0.4", features = ["trace", "util"] } prost = "0.11" hyper = "0.14" hyper-rustls = { version = "0.24", features = ["webpki-roots"] } diff --git a/crates/replication/src/client.rs b/crates/replication/src/client.rs index b1a269c0e4..60dc28414a 100644 --- a/crates/replication/src/client.rs +++ b/crates/replication/src/client.rs @@ -3,19 +3,27 @@ use std::{ task::{Context, Poll}, }; -use hyper::{client::HttpConnector, Client}; -use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +use hyper::Client; +use hyper_rustls::HttpsConnectorBuilder; use tonic::body::BoxBody; use tonic_web::{GrpcWebCall, GrpcWebClientService}; -use tower::Service; +use tower::{util::BoxCloneService, Service, ServiceBuilder}; +use tower_http::{classify, trace, ServiceBuilderExt}; + +type ResponseBody = trace::ResponseBody< + GrpcWebCall, + classify::GrpcEosErrorsAsFailures, + trace::DefaultOnBodyChunk, + trace::DefaultOnEos, + trace::DefaultOnFailure, +>; #[derive(Debug, Clone)] -pub struct H2cChannel { - client: GrpcWebClientService, GrpcWebCall>>, +pub struct GrpcChannel { + client: BoxCloneService, http::Response, hyper::Error>, } -impl H2cChannel { - #[allow(unused)] +impl GrpcChannel { pub fn new() -> Self { let https = HttpsConnectorBuilder::new() .with_webpki_roots() @@ -26,12 +34,16 @@ impl H2cChannel { let client = Client::builder().build(https); let client = GrpcWebClientService::new(client); + let svc = ServiceBuilder::new().trace_for_grpc().service(client); + + let client = BoxCloneService::new(svc); + Self { client } } } -impl Service> for H2cChannel { - type Response = http::Response>; +impl Service> for GrpcChannel { + type Response = http::Response; type Error = hyper::Error; type Future = Pin> + Send>>; diff --git a/crates/replication/src/lib.rs b/crates/replication/src/lib.rs index a12fa9b564..7df5b0920e 100644 --- a/crates/replication/src/lib.rs +++ b/crates/replication/src/lib.rs @@ -31,10 +31,10 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::mpsc::Sender; -use crate::client::H2cChannel; +use crate::client::GrpcChannel; use crate::pb::HelloRequest; -type RpcClient = pb::ReplicationLogClient>; +type RpcClient = pb::ReplicationLogClient>; pub struct Replicator { pub frames_sender: Sender, @@ -155,7 +155,7 @@ impl Replicator { //.tls_config(tonic::transport::ClientTlsConfig::new())? //.connect_lazy(); - let channel = H2cChannel::new(); + let channel = GrpcChannel::new(); let mut client = pb::ReplicationLogClient::with_origin( InterceptedService::new(channel, AuthInterceptor(auth_token)),