Skip to content

Commit

Permalink
Merge pull request #49 from imandra-ai/wip-perf-2023-12-20
Browse files Browse the repository at this point in the history
perf and features
  • Loading branch information
c-cube authored Dec 21, 2023
2 parents 47f7f1d + 031b7bf commit b95eb21
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 53 deletions.
6 changes: 3 additions & 3 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ module type EMITTER = sig

val push_logs : Logs.resource_logs list -> unit

val set_on_tick_callbacks : (unit -> unit) list ref -> unit
val set_on_tick_callbacks : (unit -> unit) AList.t -> unit

val tick : unit -> unit

Expand Down Expand Up @@ -288,7 +288,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()

let on_tick_cbs_ = Atomic.make (ref [])
let on_tick_cbs_ = Atomic.make (AList.make ())

let set_on_tick_callbacks = Atomic.set on_tick_cbs_

Expand Down Expand Up @@ -384,7 +384,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
with e ->
Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e))
!(Atomic.get on_tick_cbs_);
(AList.get @@ Atomic.get on_tick_cbs_);
()

(* thread that calls [tick()] regularly, to help enforce timeouts *)
Expand Down
32 changes: 27 additions & 5 deletions src/client-ocurl/config.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,42 @@ type t = {
batch_timeout_ms: int;
bg_threads: int;
ticker_thread: bool;
ticker_interval_ms: int;
self_trace: bool;
}

let pp out self =
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } =
let {
debug;
url;
headers;
batch_timeout_ms;
bg_threads;
ticker_thread;
ticker_interval_ms;
self_trace;
} =
self
in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \
ticker_thread=%B @]}"
ticker_thread=%B;@ ticker_interval_ms=%d;@ self_trace=%B @]}"
debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread
ticker_interval_ms self_trace

let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) () : t =
let bg_threads = max 2 (min bg_threads 32) in
{ debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread }
?(batch_timeout_ms = 2_000) ?(bg_threads = 4) ?(ticker_thread = true)
?(ticker_interval_ms = 500) ?(self_trace = false) () : t =
let bg_threads = max 1 (min bg_threads 32) in
{
debug;
url;
headers;
batch_timeout_ms;
bg_threads;
ticker_thread;
ticker_interval_ms;
self_trace;
}
18 changes: 16 additions & 2 deletions src/client-ocurl/config.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,24 @@ type t = private {
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *)
only checked when a new event occurs or when a tick
is emitted. Default 2_000. *)
bg_threads: int;
(** Are there background threads, and how many? Default [4] *)
(** Are there background threads, and how many? Default [4].
This will be adjusted to be at least [1] and at most [32]. *)
ticker_thread: bool;
(** If true, start a thread that regularly checks if signals should
be sent to the collector. Default [true] *)
ticker_interval_ms: int;
(** Interval for ticker thread, in milliseconds. This is
only useful if [ticker_thread] is [true].
This will be clamped between [2 ms] and some longer
interval (maximum [60s] currently).
Default 500.
@since NEXT_RELEASE *)
self_trace: bool;
(** If true, the OTEL library will also emit its own spans. Default [false].
@since NEXT_RELEASE *)
}
(** Configuration.
Expand All @@ -31,6 +43,8 @@ val make :
?batch_timeout_ms:int ->
?bg_threads:int ->
?ticker_thread:bool ->
?ticker_interval_ms:int ->
?self_trace:bool ->
unit ->
t
(** Make a configuration.
Expand Down
127 changes: 92 additions & 35 deletions src/client-ocurl/opentelemetry_client_ocurl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,33 @@ let timeout_gc_metrics = Mtime.Span.(20 * s)
(** side channel for GC, appended to metrics batch data *)
let gc_metrics = AList.make ()

(** Mini tracing module (disabled if [config.self_trace=false]) *)
module Self_trace = struct
let enabled = Atomic.make true

let add_event (scope : Scope.t) ev = scope.events <- ev :: scope.events

let dummy_trace_id_ = Trace_id.create ()

let dummy_span_id = Span_id.create ()

let with_ ?kind ?attrs name f =
if Atomic.get enabled then
Opentelemetry.Trace.with_ ?kind ?attrs name f
else (
(* do nothing *)
let scope =
{
Scope.trace_id = dummy_trace_id_;
span_id = dummy_span_id;
attrs = [];
events = [];
}
in
f scope
)
end

(** capture current GC metrics if {!needs_gc_metrics} is true
or it has been a long time since the last GC metrics collection,
and push them into {!gc_metrics} for later collection *)
Expand Down Expand Up @@ -120,9 +147,19 @@ end = struct

let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit
=
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http"
in

let data =
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto"
in
Pbrt.Encoder.reset encoder;
encode x encoder;
Pbrt.Encoder.to_string encoder
in

let url =
let url = config.Config.url in
if url <> "" && String.get url (String.length url - 1) = '/' then
Expand All @@ -138,11 +175,18 @@ end = struct
("Content-Type", "application/x-protobuf") :: config.headers
in
match
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_internal "curl.post"
~attrs:[ "sz", `Int (String.length data); "url", `String url ]
in
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
with
| Ok { code; _ } when code >= 200 && code < 300 -> ()
| Ok { code; body; headers = _; info = _ } ->
Atomic.incr n_errors;
Self_trace.add_event _sc
@@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ];

if !debug_ || config.debug then (
let dec = Pbrt.Decoder.of_string body in
let body =
Expand Down Expand Up @@ -171,6 +215,11 @@ end = struct
let send_logs_http ~stop ~config (client : Curl.t) encoder
(l : Logs.resource_logs list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-logs"
~attrs:[ "n", `Int (List.length l) ]
in

let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
Expand All @@ -180,6 +229,11 @@ end = struct
let send_metrics_http ~stop ~config curl encoder
(l : Metrics.resource_metrics list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-metrics"
~attrs:[ "n", `Int (List.length l) ]
in

let x =
Metrics_service.default_export_metrics_service_request ~resource_metrics:l
()
Expand All @@ -190,6 +244,11 @@ end = struct
let send_traces_http ~stop ~config curl encoder
(l : Trace.resource_spans list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-traces"
~attrs:[ "n", `Int (List.length l) ]
in

let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
Expand Down Expand Up @@ -248,8 +307,8 @@ end = struct
in

let send_metrics () =
B_queue.push self.send_q
(To_send.Send_metric (Batch.pop_all batches.metrics))
let metrics = AList.pop_all gc_metrics :: Batch.pop_all batches.metrics in
B_queue.push self.send_q (To_send.Send_metric metrics)
in

let send_logs () =
Expand All @@ -266,39 +325,36 @@ end = struct
(* read multiple events at once *)
B_queue.pop_all self.q local_q;

let now = Mtime_clock.now () in
(* are we asked to flush all events? *)
let must_flush_all = ref false in

(* how to process a single event *)
let process_ev (ev : Event.t) : unit =
match ev with
| Event.E_metric m ->
Batch.push batches.metrics m;
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ()
| Event.E_trace tr ->
Batch.push batches.traces tr;
if should_send_batch_ ~config ~now batches.traces then
send_traces ()
| Event.E_logs logs ->
Batch.push batches.logs logs;
if should_send_batch_ ~config ~now batches.logs then send_logs ()
| Event.E_metric m -> Batch.push batches.metrics m
| Event.E_trace tr -> Batch.push batches.traces tr
| Event.E_logs logs -> Batch.push batches.logs logs
| Event.E_tick ->
(* check for batches whose timeout expired *)
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ();
if should_send_batch_ ~config ~now batches.logs then send_logs ();
if should_send_batch_ ~config ~now batches.traces then
send_traces ()
| Event.E_flush_all ->
if Batch.len batches.metrics > 0 then send_metrics ();
if Batch.len batches.logs > 0 then send_logs ();
if Batch.len batches.traces > 0 then send_traces ()
(* the only impact of "tick" is that it wakes us up regularly *)
()
| Event.E_flush_all -> must_flush_all := true
in

while not (Queue.is_empty local_q) do
let ev = Queue.pop local_q in
process_ev ev
done
Queue.iter process_ev local_q;
Queue.clear local_q;

if !must_flush_all then (
if Batch.len batches.metrics > 0 then send_metrics ();
if Batch.len batches.logs > 0 then send_logs ();
if Batch.len batches.traces > 0 then send_traces ()
) else (
let now = Mtime_clock.now () in
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ();

if should_send_batch_ ~config ~now batches.traces then send_traces ();
if should_send_batch_ ~config ~now batches.logs then send_logs ()
)
done
with B_queue.Closed -> ()

Expand Down Expand Up @@ -418,15 +474,14 @@ let create_backend ?(stop = Atomic.make false)
ret ());
}

let on_tick_cbs_ = Atomic.make (ref [])
let on_tick_cbs_ = Atomic.make (AList.make ())

let set_on_tick_callbacks = Atomic.set on_tick_cbs_

let tick () =
sample_gc_metrics_if_needed ();
Backend_impl.send_event backend Event.E_tick;
let l = Atomic.get on_tick_cbs_ in
List.iter (fun f -> f ()) !l
List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_)

let cleanup () = Backend_impl.shutdown backend
end in
Expand All @@ -451,9 +506,11 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
Opentelemetry.Collector.set_backend backend;

if config.url <> get_url () then set_url config.url;
Atomic.set Self_trace.enabled config.self_trace;

if config.ticker_thread then (
let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in
(* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t)
);

Expand Down
2 changes: 2 additions & 0 deletions src/client-ocurl/AList.ml → src/core/AList.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ type 'a t = 'a list Atomic.t

let make () = Atomic.make []

let get = Atomic.get

let add self x =
while
let old = Atomic.get self in
Expand Down
3 changes: 3 additions & 0 deletions src/client-ocurl/AList.mli → src/core/AList.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

type 'a t

val get : 'a t -> 'a list
(** Snapshot *)

val make : unit -> 'a t

val add : 'a t -> 'a -> unit
Expand Down
Loading

0 comments on commit b95eb21

Please sign in to comment.