diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 5894f8b..1cf7eeb 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -258,7 +258,8 @@ end exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = +let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t) + () : (module EMITTER) = let open Proto in let open Lwt.Syntax in (* local helpers *) @@ -448,6 +449,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Lwt.async (fun () -> let* () = emit_all_force httpc encoder in Httpc.cleanup httpc; + (* resolve [after_cleanup], if provided *) + Option.iter (fun prom -> Lwt.wakeup_later prom ()) after_cleanup; Lwt.return ()) end in (module M) @@ -457,9 +460,13 @@ module Backend val stop : bool Atomic.t val config : Config.t + + val after_cleanup : unit Lwt.u option end) () : Opentelemetry.Collector.BACKEND = struct - include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) + include + (val mk_emitter ~after_cleanup:Arg.after_cleanup ~stop:Arg.stop + ~config:Arg.config ()) open Opentelemetry.Proto open Opentelemetry.Collector @@ -551,7 +558,8 @@ module Backend } end -let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = +let create_backend ?after_cleanup ?(stop = Atomic.make false) + ?(config = Config.make ()) () = debug_ := config.debug; let module B = @@ -560,25 +568,43 @@ let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = let stop = stop let config = config + + let after_cleanup = after_cleanup end) () in (module B : OT.Collector.BACKEND) -let setup_ ?stop ?config () = - let backend = create_backend ?stop ?config () in +let setup_ ?stop ?config () : (unit -> unit) * unit Lwt.t = + let cleanup_done, cleanup_done_prom = Lwt.wait () in + let backend = + create_backend ~after_cleanup:cleanup_done_prom ?stop ?config () + in OT.Collector.set_backend backend; - OT.Collector.remove_backend + + OT.Collector.remove_backend, cleanup_done let setup ?stop ?config ?(enable = true) () = if enable then ( - let cleanup = setup_ ?stop ?config () in + let cleanup, _lwt = setup_ ?stop ?config () in at_exit cleanup ) -let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f = - if enable then ( - let cleanup = setup_ ?stop ~config () in - Fun.protect ~finally:cleanup f - ) else +let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t + = + if enable then + let open Lwt.Syntax in + let cleanup, cleanup_done = setup_ ?stop ~config () in + + Lwt.catch + (fun () -> + let* res = f () in + cleanup (); + let+ () = cleanup_done in + res) + (fun exn -> + cleanup (); + let* () = cleanup_done in + Lwt.reraise exn) + else f () diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli index 09f64e7..c57332a 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli @@ -13,10 +13,13 @@ val set_headers : (string * string) list -> unit module Config = Config val create_backend : + ?after_cleanup:unit Lwt.u -> ?stop:bool Atomic.t -> ?config:Config.t -> unit -> (module Opentelemetry.Collector.BACKEND) +(** Create a new backend using lwt and cohttp + @param after_cleanup if provided, this is resolved into [()] after cleanup is done (since NEXT_RELEASE) *) val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit @@ -34,8 +37,8 @@ val with_setup : ?config:Config.t -> ?enable:bool -> unit -> - (unit -> 'a) -> - 'a + (unit -> 'a Lwt.t) -> + 'a Lwt.t (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after [f()] returns See {!setup} for more details. *) diff --git a/tests/bin/cohttp_client.ml b/tests/bin/cohttp_client.ml index dcd395e..8cd4dbb 100644 --- a/tests/bin/cohttp_client.ml +++ b/tests/bin/cohttp_client.ml @@ -70,5 +70,4 @@ let () = "Check HTTP requests at \ https://requestbin.com/r/enec1hql02hz/26qShWryt5vJc1JfrOwalhr5vQt@."; - Opentelemetry_client_cohttp_lwt.with_setup ~config () (fun () -> - Lwt_main.run (run ())) + Opentelemetry_client_cohttp_lwt.with_setup ~config () run |> Lwt_main.run diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index 5d0caf2..1558b6b 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -140,5 +140,5 @@ let () = Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!" (Atomic.get num_tr) elapsed n_per_sec) in - Opentelemetry_client_cohttp_lwt.with_setup ~stop ~config () @@ fun () -> - Lwt_main.run @@ run () + Opentelemetry_client_cohttp_lwt.with_setup ~stop ~config () run + |> Lwt_main.run