Skip to content

Commit

Permalink
Merge pull request #76 from imandra-ai/simon/fix-41
Browse files Browse the repository at this point in the history
fix: wait for cleanup in cohttp client
  • Loading branch information
c-cube authored Oct 22, 2024
2 parents 3a22a93 + 5e925d6 commit 9813ec6
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 18 deletions.
50 changes: 38 additions & 12 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ end
exceptions inside should be caught, see
https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t)
() : (module EMITTER) =
let open Proto in
let open Lwt.Syntax in
(* local helpers *)
Expand Down Expand Up @@ -448,6 +449,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Lwt.async (fun () ->
let* () = emit_all_force httpc encoder in
Httpc.cleanup httpc;
(* resolve [after_cleanup], if provided *)
Option.iter (fun prom -> Lwt.wakeup_later prom ()) after_cleanup;
Lwt.return ())
end in
(module M)
Expand All @@ -457,9 +460,13 @@ module Backend
val stop : bool Atomic.t

val config : Config.t

val after_cleanup : unit Lwt.u option
end)
() : Opentelemetry.Collector.BACKEND = struct
include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())
include
(val mk_emitter ~after_cleanup:Arg.after_cleanup ~stop:Arg.stop
~config:Arg.config ())

open Opentelemetry.Proto
open Opentelemetry.Collector
Expand Down Expand Up @@ -551,7 +558,8 @@ module Backend
}
end

let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
let create_backend ?after_cleanup ?(stop = Atomic.make false)
?(config = Config.make ()) () =
debug_ := config.debug;

let module B =
Expand All @@ -560,25 +568,43 @@ let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
let stop = stop

let config = config

let after_cleanup = after_cleanup
end)
()
in
(module B : OT.Collector.BACKEND)

let setup_ ?stop ?config () =
let backend = create_backend ?stop ?config () in
let setup_ ?stop ?config () : (unit -> unit) * unit Lwt.t =
let cleanup_done, cleanup_done_prom = Lwt.wait () in
let backend =
create_backend ~after_cleanup:cleanup_done_prom ?stop ?config ()
in
OT.Collector.set_backend backend;
OT.Collector.remove_backend

OT.Collector.remove_backend, cleanup_done

let setup ?stop ?config ?(enable = true) () =
if enable then (
let cleanup = setup_ ?stop ?config () in
let cleanup, _lwt = setup_ ?stop ?config () in
at_exit cleanup
)

let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f =
if enable then (
let cleanup = setup_ ?stop ~config () in
Fun.protect ~finally:cleanup f
) else
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t
=
if enable then
let open Lwt.Syntax in
let cleanup, cleanup_done = setup_ ?stop ~config () in

Lwt.catch
(fun () ->
let* res = f () in
cleanup ();
let+ () = cleanup_done in
res)
(fun exn ->
cleanup ();
let* () = cleanup_done in
Lwt.reraise exn)
else
f ()
7 changes: 5 additions & 2 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ val set_headers : (string * string) list -> unit
module Config = Config

val create_backend :
?after_cleanup:unit Lwt.u ->
?stop:bool Atomic.t ->
?config:Config.t ->
unit ->
(module Opentelemetry.Collector.BACKEND)
(** Create a new backend using lwt and cohttp
@param after_cleanup if provided, this is resolved into [()] after cleanup is done (since NEXT_RELEASE) *)

val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
Expand All @@ -34,8 +37,8 @@ val with_setup :
?config:Config.t ->
?enable:bool ->
unit ->
(unit -> 'a) ->
'a
(unit -> 'a Lwt.t) ->
'a Lwt.t
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up
after [f()] returns
See {!setup} for more details. *)
3 changes: 1 addition & 2 deletions tests/bin/cohttp_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,4 @@ let () =
"Check HTTP requests at \
https://requestbin.com/r/enec1hql02hz/26qShWryt5vJc1JfrOwalhr5vQt@.";

Opentelemetry_client_cohttp_lwt.with_setup ~config () (fun () ->
Lwt_main.run (run ()))
Opentelemetry_client_cohttp_lwt.with_setup ~config () run |> Lwt_main.run
4 changes: 2 additions & 2 deletions tests/bin/emit1_cohttp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,5 @@ let () =
Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!"
(Atomic.get num_tr) elapsed n_per_sec)
in
Opentelemetry_client_cohttp_lwt.with_setup ~stop ~config () @@ fun () ->
Lwt_main.run @@ run ()
Opentelemetry_client_cohttp_lwt.with_setup ~stop ~config () run
|> Lwt_main.run

0 comments on commit 9813ec6

Please sign in to comment.