Skip to content

Commit

Permalink
Fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
ikrivosheev committed Jan 13, 2024
1 parent 60eef43 commit 03a428d
Show file tree
Hide file tree
Showing 16 changed files with 130 additions and 140 deletions.
2 changes: 1 addition & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ h2 = { version = "0.4", optional = true }
hyper = { version = "1.0", features = ["full"], optional = true }
hyper-util = { version = "0.1", features = ["full"] }
hyper-timeout = { version = "0.5", optional = true }
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["net"] }
tower = { version = "0.4.7", default-features = false, features = [
"balance",
"buffer",
Expand Down
29 changes: 16 additions & 13 deletions tonic/benches/decode.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use bencher::{benchmark_group, benchmark_main, Bencher};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http_body::Body;
use std::{
fmt::{Error, Formatter},
pin::Pin,
task::{Context, Poll},
};

use bencher::{benchmark_group, benchmark_main, Bencher};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http_body::{Body, Frame, SizeHint};

use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming};

macro_rules! bench {
Expand Down Expand Up @@ -58,23 +60,24 @@ impl Body for MockBody {
type Data = Bytes;
type Error = Status;

fn poll_data(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.data.has_remaining() {
let split = std::cmp::min(self.chunk_size, self.data.remaining());
Poll::Ready(Some(Ok(self.data.split_to(split))))
Poll::Ready(Some(Ok(Frame::data(self.data.split_to(split)))))
} else {
Poll::Ready(None)
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
fn is_end_stream(&self) -> bool {
!self.data.is_empty()
}

fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(self.data.len() as u64)
}
}

Expand Down
7 changes: 5 additions & 2 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{body::BoxBody, metadata::MetadataMap, Code, Status};
use bytes::{Buf, BufMut, BytesMut};
use http::StatusCode;
use http_body::Body;
use http_body_util::BodyExt;
use std::{
fmt, future,
pin::Pin,
Expand Down Expand Up @@ -122,7 +123,9 @@ impl<T> Streaming<T> {
decoder: Box::new(decoder),
inner: StreamingInner {
body: body
.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
.map_frame(|mut frame| {
frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
})
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
Expand Down Expand Up @@ -231,7 +234,7 @@ impl StreamingInner {

// Returns Some(()) if data was found or None if the loop in `poll_next` should break
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(status)) => {
if self.direction == Direction::Request && status.code() == Code::Cancelled {
Expand Down
1 change: 0 additions & 1 deletion tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,3 @@ where
self.state.is_end_stream
}
}

1 change: 0 additions & 1 deletion tonic/src/codec/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,3 @@ mod tests {
}
}
}

1 change: 0 additions & 1 deletion tonic/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,3 @@ impl GrpcMethod {
self.method
}
}

1 change: 0 additions & 1 deletion tonic/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,4 +1003,3 @@ mod tests {
assert_eq!(status.details(), DETAILS);
}
}

12 changes: 6 additions & 6 deletions tonic/src/transport/channel/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::transport::service::TlsConnector;
use crate::transport::{service::SharedExec, Error, Executor};
use bytes::Bytes;
use http::{uri::Uri, HeaderValue};
use hyper::rt;
use std::{fmt, future::Future, pin::Pin, str::FromStr, time::Duration};
use tower::make::MakeConnection;
// use crate::transport::E
use tower_service::Service;

/// Channel builder.
///
Expand Down Expand Up @@ -359,8 +359,8 @@ impl Endpoint {
/// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied.
pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
where
C: MakeConnection<Uri> + Send + 'static,
C::Connection: Unpin + Send + 'static,
C: Service<Uri> + Send + 'static,
C::Response: rt::Read + rt::Write + Send + Unpin + 'static,
C::Future: Send + 'static,
crate::Error: From<C::Error> + Send + 'static,
{
Expand All @@ -384,8 +384,8 @@ impl Endpoint {
/// uses a Unix socket transport.
pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel
where
C: MakeConnection<Uri> + Send + 'static,
C::Connection: Unpin + Send + 'static,
C: Service<Uri> + Send + 'static,
C::Response: rt::Read + rt::Write + Send + Unpin + 'static,
C::Future: Send + 'static,
crate::Error: From<C::Error> + Send + 'static,
{
Expand Down
12 changes: 5 additions & 7 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::{channel, Sender},
};
use tokio::sync::mpsc::{channel, Sender};

use axum::{extract::Request, response::Response, body::Body};
use axum::{body::Body, extract::Request, response::Response};
use hyper::rt;
use tower::balance::p2c::Balance;
use tower::{
buffer::{self, Buffer},
Expand Down Expand Up @@ -149,7 +147,7 @@ impl Channel {
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
{
let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
let executor = endpoint.executor.clone();
Expand All @@ -166,7 +164,7 @@ impl Channel {
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
{
let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
let executor = endpoint.executor.clone();
Expand Down
5 changes: 3 additions & 2 deletions tonic/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ pub use self::service::grpc_timeout::TimeoutExpired;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::tls::Certificate;
pub use axum::{body::BoxBody as AxumBoxBody, Router as AxumRouter};
pub use hyper::{Body, Uri};
pub use axum::{body::Body as AxumBoxBody, Router as AxumRouter};
pub use hyper::body::Body;
pub use hyper::Uri;

pub(crate) use self::service::executor::Executor;

Expand Down
9 changes: 5 additions & 4 deletions tonic/src/transport/server/conn.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::net::SocketAddr;
use tokio::net::TcpStream;

#[cfg(feature = "tls")]
use crate::transport::Certificate;
#[cfg(feature = "tls")]
use std::sync::Arc;

use tokio::net::TcpStream;
#[cfg(feature = "tls")]
use tokio_rustls::server::TlsStream;

#[cfg(feature = "tls")]
use crate::transport::Certificate;

/// Trait that connected IO resources implement and use to produce info about the connection.
///
/// The goal for this trait is to allow users to implement
Expand Down
Loading

0 comments on commit 03a428d

Please sign in to comment.