From 96733179b7ff613af4afb4f984e2c61d58d5d79e Mon Sep 17 00:00:00 2001 From: Mathieu Barbin Date: Sat, 6 Jan 2024 15:41:45 +0100 Subject: [PATCH] simplify connection (unused untyped api) --- lib/grpc-eio/client.ml | 6 +-- lib/grpc-eio/connection.ml | 100 ++++++++++++++++-------------------- lib/grpc-eio/connection.mli | 33 +++++------- lib/grpc-eio/server.ml | 5 +- 4 files changed, 61 insertions(+), 83 deletions(-) diff --git a/lib/grpc-eio/client.ml b/lib/grpc-eio/client.ml index 95d20b2..7d2aa47 100644 --- a/lib/grpc-eio/client.ml +++ b/lib/grpc-eio/client.ml @@ -69,15 +69,15 @@ let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request) let make_handler ~encode_request ~decode_response ~f write_body read_body = let response_reader, response_writer = Seq.create_reader_writer () in let request_reader, request_writer = Seq.create_reader_writer () in - Connection.Typed.grpc_recv_streaming ~decode:decode_response read_body + Connection.grpc_recv_streaming ~decode:decode_response read_body response_writer; let res, res_notify = Eio.Promise.create () in Eio.Fiber.both (fun () -> Eio.Promise.resolve res_notify (f request_writer response_reader)) (fun () -> - Connection.Typed.grpc_send_streaming_client ~encode:encode_request - write_body request_reader); + Connection.grpc_send_streaming_client ~encode:encode_request write_body + request_reader); Eio.Promise.await res module Typed_rpc = struct diff --git a/lib/grpc-eio/connection.ml b/lib/grpc-eio/connection.ml index 9fa8292..31f6930 100644 --- a/lib/grpc-eio/connection.ml +++ b/lib/grpc-eio/connection.ml @@ -1,59 +1,47 @@ -module Typed = struct - let grpc_recv_streaming ~decode body message_buffer_writer = - let request_buffer = Grpc.Buffer.v () in - let on_eof () = Seq.close_writer message_buffer_writer in - let rec on_read buffer ~off ~len = - Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer - ~dst:request_buffer ~length:len; - Grpc.Message.extract_all - (fun message -> Seq.write message_buffer_writer (decode message)) - request_buffer; - H2.Body.Reader.schedule_read body ~on_read ~on_eof - in +let grpc_recv_streaming ~decode body message_buffer_writer = + let request_buffer = Grpc.Buffer.v () in + let on_eof () = Seq.close_writer message_buffer_writer in + let rec on_read buffer ~off ~len = + Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer + ~dst:request_buffer ~length:len; + Grpc.Message.extract_all + (fun message -> Seq.write message_buffer_writer (decode message)) + request_buffer; H2.Body.Reader.schedule_read body ~on_read ~on_eof + in + H2.Body.Reader.schedule_read body ~on_read ~on_eof - let grpc_send_streaming_client ~encode body encoder_stream = - Seq.iter - (fun encoder -> - let payload = Grpc.Message.make (encode encoder) in - H2.Body.Writer.write_string body payload) - encoder_stream; - H2.Body.Writer.close body +let grpc_send_streaming_client ~encode body encoder_stream = + Seq.iter + (fun encoder -> + let payload = Grpc.Message.make (encode encoder) in + H2.Body.Writer.write_string body payload) + encoder_stream; + H2.Body.Writer.close body - let grpc_send_streaming ~encode request encoder_stream status_promise = - let body = - H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request - (H2.Response.create - ~headers: - (H2.Headers.of_list [ ("content-type", "application/grpc+proto") ]) - `OK) - in - Seq.iter - (fun input -> - let payload = Grpc.Message.make (encode input) in - H2.Body.Writer.write_string body payload; - H2.Body.Writer.flush body (fun () -> ())) - encoder_stream; - let status = Eio.Promise.await status_promise in - H2.Reqd.schedule_trailers request - (H2.Headers.of_list - ([ - ( "grpc-status", - string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) - ); - ] - @ - match Grpc.Status.message status with - | None -> [] - | Some message -> [ ("grpc-message", message) ])); - H2.Body.Writer.close body -end - -module Untyped = struct - let grpc_recv_streaming body message_buffer_writer = - Typed.grpc_recv_streaming ~decode:Fun.id body message_buffer_writer - - let grpc_send_streaming request encoder_stream status_promise = - Typed.grpc_send_streaming ~encode:Fun.id request encoder_stream - status_promise -end +let grpc_send_streaming ~encode request encoder_stream status_promise = + let body = + H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request + (H2.Response.create + ~headers: + (H2.Headers.of_list [ ("content-type", "application/grpc+proto") ]) + `OK) + in + Seq.iter + (fun input -> + let payload = Grpc.Message.make (encode input) in + H2.Body.Writer.write_string body payload; + H2.Body.Writer.flush body (fun () -> ())) + encoder_stream; + let status = Eio.Promise.await status_promise in + H2.Reqd.schedule_trailers request + (H2.Headers.of_list + ([ + ( "grpc-status", + string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) ); + ] + @ + match Grpc.Status.message status with + | None -> [] + | Some message -> [ ("grpc-message", message) ])); + H2.Body.Writer.close body diff --git a/lib/grpc-eio/connection.mli b/lib/grpc-eio/connection.mli index a5d35c8..fee84be 100644 --- a/lib/grpc-eio/connection.mli +++ b/lib/grpc-eio/connection.mli @@ -1,21 +1,12 @@ -module Typed : sig - val grpc_recv_streaming : - decode:(string -> 'a) -> H2.Body.Reader.t -> 'a Seq.writer -> unit - - val grpc_send_streaming_client : - encode:('a -> string) -> H2.Body.Writer.t -> 'a Seq.reader -> unit - - val grpc_send_streaming : - encode:('a -> string) -> - H2.Reqd.t -> - 'a Seq.reader -> - Grpc.Status.t Eio.Promise.t -> - unit -end - -module Untyped : sig - val grpc_recv_streaming : H2.Body.Reader.t -> string Seq.writer -> unit - - val grpc_send_streaming : - H2.Reqd.t -> string Seq.reader -> Grpc.Status.t Eio.Promise.t -> unit -end +val grpc_recv_streaming : + decode:(string -> 'a) -> H2.Body.Reader.t -> 'a Seq.writer -> unit + +val grpc_send_streaming_client : + encode:('a -> string) -> H2.Body.Writer.t -> 'a Seq.reader -> unit + +val grpc_send_streaming : + encode:('a -> string) -> + H2.Reqd.t -> + 'a Seq.reader -> + Grpc.Status.t Eio.Promise.t -> + unit diff --git a/lib/grpc-eio/server.ml b/lib/grpc-eio/server.ml index b0fdfb9..fff6503 100644 --- a/lib/grpc-eio/server.ml +++ b/lib/grpc-eio/server.ml @@ -49,8 +49,7 @@ let implement_rpc ~decode_request ~encode_response ~f reqd = let body = H2.Reqd.request_body reqd in let request_reader, request_writer = Seq.create_reader_writer () in let response_reader, response_writer = Seq.create_reader_writer () in - Connection.Typed.grpc_recv_streaming ~decode:decode_request body - request_writer; + Connection.grpc_recv_streaming ~decode:decode_request body request_writer; let status_promise, status_notify = Eio.Promise.create () in Eio.Fiber.both (fun () -> @@ -60,7 +59,7 @@ let implement_rpc ~decode_request ~encode_response ~f reqd = Eio.Promise.resolve status_notify status) (fun () -> try - Connection.Typed.grpc_send_streaming ~encode:encode_response reqd + Connection.grpc_send_streaming ~encode:encode_response reqd response_reader status_promise with exn -> (* https://github.com/anmonteiro/ocaml-h2/issues/175 *)