From 332cf5b8e04d8903fd0032d6a01438516adc31ab Mon Sep 17 00:00:00 2001 From: tottoto Date: Mon, 25 Nov 2024 22:59:17 +0900 Subject: [PATCH] feat(reflection): Expose ReflectionService --- tonic-reflection/Cargo.toml | 1 + tonic-reflection/src/server/v1.rs | 41 ++++++++++++++++++++++---- tonic-reflection/src/server/v1alpha.rs | 41 ++++++++++++++++++++++---- 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/tonic-reflection/Cargo.toml b/tonic-reflection/Cargo.toml index df14e971a..81bced328 100644 --- a/tonic-reflection/Cargo.toml +++ b/tonic-reflection/Cargo.toml @@ -26,6 +26,7 @@ server = ["prost-types", "dep:tokio", "dep:tokio-stream"] default = ["server"] [dependencies] +pin-project = "1" prost = "0.13" prost-types = {version = "0.13", optional = true} tokio = { version = "1.0", features = ["sync", "rt"], optional = true } diff --git a/tonic-reflection/src/server/v1.rs b/tonic-reflection/src/server/v1.rs index 3a2b708a5..0b2b822d0 100644 --- a/tonic-reflection/src/server/v1.rs +++ b/tonic-reflection/src/server/v1.rs @@ -1,7 +1,8 @@ -use std::sync::Arc; +use std::{fmt, sync::Arc}; +use pin_project::pin_project; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; use super::ReflectionServiceState; @@ -13,14 +14,15 @@ use crate::pb::v1::{ ServerReflectionResponse, ServiceResponse, }; +/// An implementation for `ServerReflection`. #[derive(Debug)] -pub(super) struct ReflectionService { +pub struct ReflectionService { state: Arc, } #[tonic::async_trait] impl ServerReflection for ReflectionService { - type ServerReflectionInfoStream = ReceiverStream>; + type ServerReflectionInfoStream = ServerReflectionInfoStream; async fn server_reflection_info( &self, @@ -91,7 +93,9 @@ impl ServerReflection for ReflectionService { } }); - Ok(Response::new(ReceiverStream::new(resp_rx))) + Ok(Response::new(ServerReflectionInfoStream( + ReceiverStream::new(resp_rx), + ))) } } @@ -102,3 +106,30 @@ impl From for ReflectionService { } } } + +/// A response stream. +#[pin_project] +pub struct ServerReflectionInfoStream( + #[pin] ReceiverStream>, +); + +impl Stream for ServerReflectionInfoStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().0.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl fmt::Debug for ServerReflectionInfoStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("ServerReflectionInfoStream").finish() + } +} diff --git a/tonic-reflection/src/server/v1alpha.rs b/tonic-reflection/src/server/v1alpha.rs index 877a6a192..a6ba7c609 100644 --- a/tonic-reflection/src/server/v1alpha.rs +++ b/tonic-reflection/src/server/v1alpha.rs @@ -1,7 +1,8 @@ -use std::sync::Arc; +use std::{fmt, sync::Arc}; +use pin_project::pin_project; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; use super::ReflectionServiceState; @@ -13,14 +14,15 @@ use crate::pb::v1alpha::{ ServerReflectionResponse, ServiceResponse, }; +/// An implementation for `ServerReflection`. #[derive(Debug)] -pub(super) struct ReflectionService { +pub struct ReflectionService { state: Arc, } #[tonic::async_trait] impl ServerReflection for ReflectionService { - type ServerReflectionInfoStream = ReceiverStream>; + type ServerReflectionInfoStream = ServerReflectionInfoStream; async fn server_reflection_info( &self, @@ -91,7 +93,9 @@ impl ServerReflection for ReflectionService { } }); - Ok(Response::new(ReceiverStream::new(resp_rx))) + Ok(Response::new(ServerReflectionInfoStream( + ReceiverStream::new(resp_rx), + ))) } } @@ -102,3 +106,30 @@ impl From for ReflectionService { } } } + +/// A response stream. +#[pin_project] +pub struct ServerReflectionInfoStream( + #[pin] ReceiverStream>, +); + +impl Stream for ServerReflectionInfoStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().0.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl fmt::Debug for ServerReflectionInfoStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("ServerReflectionInfoStream").finish() + } +}