Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Refactor use of tokio-stream #1560

Merged
merged 1 commit into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/src/mock/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn(async move {
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
});

Expand Down
6 changes: 3 additions & 3 deletions interop/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAsserti
}

pub async fn empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
let stream = tokio_stream::iter(Vec::new());
let stream = tokio_stream::empty();
let result = client.full_duplex_call(Request::new(stream)).await;

assertions.push(test_assert!(
Expand Down Expand Up @@ -270,7 +270,7 @@ pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut V
let result = client.unary_call(Request::new(simple_req)).await;
validate_response(result, assertions);

let stream = tokio_stream::iter(vec![duplex_req]);
let stream = tokio_stream::once(duplex_req);
let result = match client.full_duplex_call(Request::new(stream)).await {
Ok(response) => {
let stream = response.into_inner();
Expand Down Expand Up @@ -356,7 +356,7 @@ pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestA
req_unary.metadata_mut().insert(key1, value1.clone());
req_unary.metadata_mut().insert_bin(key2, value2.clone());

let stream = tokio_stream::iter(vec![make_ping_pong_request(0)]);
let stream = tokio_stream::once(make_ping_pong_request(0));
let mut req_stream = Request::new(stream);
req_stream.metadata_mut().insert(key1, value1.clone());
req_stream.metadata_mut().insert_bin(key2, value2.clone());
Expand Down
2 changes: 1 addition & 1 deletion interop/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl pb::test_service_server::TestService for TestService {
return Err(status);
}

let single_message = tokio_stream::iter(vec![Ok(first_msg)]);
let single_message = tokio_stream::once(Ok(first_msg));
let mut stream = single_message.chain(stream);

let stream = try_stream! {
Expand Down
2 changes: 1 addition & 1 deletion tests/compression/src/bidirectional_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down
11 changes: 5 additions & 6 deletions tests/compression/src/client_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -75,7 +75,7 @@ async fn client_disabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand All @@ -102,7 +102,7 @@ async fn client_enabled_server_disabled() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn compressing_response_from_client_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand All @@ -156,8 +156,7 @@ async fn compressing_response_from_client_stream() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);

let stream = tokio_stream::iter(vec![]);
let req = Request::new(Box::pin(stream));
let req = Request::new(Box::pin(tokio_stream::empty()));

let res = client.compress_output_client_stream(req).await.unwrap();
assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/compressing_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn client_enabled_server_disabled() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down Expand Up @@ -99,7 +99,7 @@ async fn client_mark_compressed_without_header_server_enabled() {
async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down
17 changes: 8 additions & 9 deletions tests/compression/src/compressing_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn client_enabled_server_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -160,7 +160,7 @@ async fn client_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn server_replying_with_unsupported_encoding() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down Expand Up @@ -240,7 +240,7 @@ async fn disabling_compression_on_single_response() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -337,7 +337,7 @@ async fn disabling_compression_on_response_from_client_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand All @@ -346,8 +346,7 @@ async fn disabling_compression_on_response_from_client_stream() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);

let stream = tokio_stream::iter(vec![]);
let req = Request::new(Box::pin(stream));
let req = Request::new(Box::pin(tokio_stream::empty()));

let res = client.compress_output_client_stream(req).await.unwrap();
assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/server_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -80,7 +80,7 @@ async fn client_disabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -125,7 +125,7 @@ async fn client_enabled_server_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/tests/max_message_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async fn response_stream_limit() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down Expand Up @@ -317,7 +317,7 @@ async fn max_message_run(case: &TestCase) -> Result<(), Status> {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/tests/routes_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn multiple_service_using_routes_builder() {
let output = Output1 {
buf: request.into_inner().buf,
};
let stream = tokio_stream::iter(vec![Ok(output)]);
let stream = tokio_stream::once(Ok(output));

Ok(Response::new(Box::pin(stream)))
}
Expand Down
2 changes: 1 addition & 1 deletion tonic-reflection/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn make_test_reflection_request(request: ServerReflectionRequest) -> Messa
.unwrap();
let mut client = ServerReflectionClient::new(conn);

let request = Request::new(tokio_stream::iter(vec![request]));
let request = Request::new(tokio_stream::once(request));
let mut inbound = client
.server_reflection_info(request)
.await
Expand Down
Loading