Skip to content

Commit

Permalink
simplify connection (unused untyped api)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbarbin committed Jan 6, 2024
1 parent 9b80a3a commit 9673317
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 83 deletions.
6 changes: 3 additions & 3 deletions lib/grpc-eio/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request)
let make_handler ~encode_request ~decode_response ~f write_body read_body =
let response_reader, response_writer = Seq.create_reader_writer () in
let request_reader, request_writer = Seq.create_reader_writer () in
Connection.Typed.grpc_recv_streaming ~decode:decode_response read_body
Connection.grpc_recv_streaming ~decode:decode_response read_body
response_writer;
let res, res_notify = Eio.Promise.create () in
Eio.Fiber.both
(fun () ->
Eio.Promise.resolve res_notify (f request_writer response_reader))
(fun () ->
Connection.Typed.grpc_send_streaming_client ~encode:encode_request
write_body request_reader);
Connection.grpc_send_streaming_client ~encode:encode_request write_body
request_reader);
Eio.Promise.await res

module Typed_rpc = struct
Expand Down
100 changes: 44 additions & 56 deletions lib/grpc-eio/connection.ml
Original file line number Diff line number Diff line change
@@ -1,59 +1,47 @@
module Typed = struct
let grpc_recv_streaming ~decode body message_buffer_writer =
let request_buffer = Grpc.Buffer.v () in
let on_eof () = Seq.close_writer message_buffer_writer in
let rec on_read buffer ~off ~len =
Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer
~dst:request_buffer ~length:len;
Grpc.Message.extract_all
(fun message -> Seq.write message_buffer_writer (decode message))
request_buffer;
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
let grpc_recv_streaming ~decode body message_buffer_writer =
let request_buffer = Grpc.Buffer.v () in
let on_eof () = Seq.close_writer message_buffer_writer in
let rec on_read buffer ~off ~len =
Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer
~dst:request_buffer ~length:len;
Grpc.Message.extract_all
(fun message -> Seq.write message_buffer_writer (decode message))
request_buffer;
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
H2.Body.Reader.schedule_read body ~on_read ~on_eof

let grpc_send_streaming_client ~encode body encoder_stream =
Seq.iter
(fun encoder ->
let payload = Grpc.Message.make (encode encoder) in
H2.Body.Writer.write_string body payload)
encoder_stream;
H2.Body.Writer.close body
let grpc_send_streaming_client ~encode body encoder_stream =
Seq.iter
(fun encoder ->
let payload = Grpc.Message.make (encode encoder) in
H2.Body.Writer.write_string body payload)
encoder_stream;
H2.Body.Writer.close body

let grpc_send_streaming ~encode request encoder_stream status_promise =
let body =
H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request
(H2.Response.create
~headers:
(H2.Headers.of_list [ ("content-type", "application/grpc+proto") ])
`OK)
in
Seq.iter
(fun input ->
let payload = Grpc.Message.make (encode input) in
H2.Body.Writer.write_string body payload;
H2.Body.Writer.flush body (fun () -> ()))
encoder_stream;
let status = Eio.Promise.await status_promise in
H2.Reqd.schedule_trailers request
(H2.Headers.of_list
([
( "grpc-status",
string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status))
);
]
@
match Grpc.Status.message status with
| None -> []
| Some message -> [ ("grpc-message", message) ]));
H2.Body.Writer.close body
end

module Untyped = struct
let grpc_recv_streaming body message_buffer_writer =
Typed.grpc_recv_streaming ~decode:Fun.id body message_buffer_writer

let grpc_send_streaming request encoder_stream status_promise =
Typed.grpc_send_streaming ~encode:Fun.id request encoder_stream
status_promise
end
let grpc_send_streaming ~encode request encoder_stream status_promise =
let body =
H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request
(H2.Response.create
~headers:
(H2.Headers.of_list [ ("content-type", "application/grpc+proto") ])
`OK)
in
Seq.iter
(fun input ->
let payload = Grpc.Message.make (encode input) in
H2.Body.Writer.write_string body payload;
H2.Body.Writer.flush body (fun () -> ()))
encoder_stream;
let status = Eio.Promise.await status_promise in
H2.Reqd.schedule_trailers request
(H2.Headers.of_list
([
( "grpc-status",
string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) );
]
@
match Grpc.Status.message status with
| None -> []
| Some message -> [ ("grpc-message", message) ]));
H2.Body.Writer.close body
33 changes: 12 additions & 21 deletions lib/grpc-eio/connection.mli
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
module Typed : sig
val grpc_recv_streaming :
decode:(string -> 'a) -> H2.Body.Reader.t -> 'a Seq.writer -> unit

val grpc_send_streaming_client :
encode:('a -> string) -> H2.Body.Writer.t -> 'a Seq.reader -> unit

val grpc_send_streaming :
encode:('a -> string) ->
H2.Reqd.t ->
'a Seq.reader ->
Grpc.Status.t Eio.Promise.t ->
unit
end

module Untyped : sig
val grpc_recv_streaming : H2.Body.Reader.t -> string Seq.writer -> unit

val grpc_send_streaming :
H2.Reqd.t -> string Seq.reader -> Grpc.Status.t Eio.Promise.t -> unit
end
val grpc_recv_streaming :
decode:(string -> 'a) -> H2.Body.Reader.t -> 'a Seq.writer -> unit

val grpc_send_streaming_client :
encode:('a -> string) -> H2.Body.Writer.t -> 'a Seq.reader -> unit

val grpc_send_streaming :
encode:('a -> string) ->
H2.Reqd.t ->
'a Seq.reader ->
Grpc.Status.t Eio.Promise.t ->
unit
5 changes: 2 additions & 3 deletions lib/grpc-eio/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ let implement_rpc ~decode_request ~encode_response ~f reqd =
let body = H2.Reqd.request_body reqd in
let request_reader, request_writer = Seq.create_reader_writer () in
let response_reader, response_writer = Seq.create_reader_writer () in
Connection.Typed.grpc_recv_streaming ~decode:decode_request body
request_writer;
Connection.grpc_recv_streaming ~decode:decode_request body request_writer;
let status_promise, status_notify = Eio.Promise.create () in
Eio.Fiber.both
(fun () ->
Expand All @@ -60,7 +59,7 @@ let implement_rpc ~decode_request ~encode_response ~f reqd =
Eio.Promise.resolve status_notify status)
(fun () ->
try
Connection.Typed.grpc_send_streaming ~encode:encode_response reqd
Connection.grpc_send_streaming ~encode:encode_response reqd
response_reader status_promise
with exn ->
(* https://github.com/anmonteiro/ocaml-h2/issues/175 *)
Expand Down

0 comments on commit 9673317

Please sign in to comment.