Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf and features #49

Merged
merged 11 commits into from
Dec 21, 2023
6 changes: 3 additions & 3 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ module type EMITTER = sig

val push_logs : Logs.resource_logs list -> unit

val set_on_tick_callbacks : (unit -> unit) list ref -> unit
val set_on_tick_callbacks : (unit -> unit) AList.t -> unit

val tick : unit -> unit

Expand Down Expand Up @@ -288,7 +288,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()

let on_tick_cbs_ = Atomic.make (ref [])
let on_tick_cbs_ = Atomic.make (AList.make ())

let set_on_tick_callbacks = Atomic.set on_tick_cbs_

Expand Down Expand Up @@ -384,7 +384,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
with e ->
Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e))
!(Atomic.get on_tick_cbs_);
(AList.get @@ Atomic.get on_tick_cbs_);
()

(* thread that calls [tick()] regularly, to help enforce timeouts *)
Expand Down
30 changes: 26 additions & 4 deletions src/client-ocurl/config.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,42 @@ type t = {
batch_timeout_ms: int;
bg_threads: int;
ticker_thread: bool;
ticker_interval_ms: int;
self_trace: bool;
}

let pp out self =
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } =
let {
debug;
url;
headers;
batch_timeout_ms;
bg_threads;
ticker_thread;
ticker_interval_ms;
self_trace;
} =
self
in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \
ticker_thread=%B @]}"
ticker_thread=%B;@ ticker_interval_ms=%d;@ self_trace=%B @]}"
debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread
ticker_interval_ms self_trace

let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) () : t =
?(batch_timeout_ms = 2_000) ?(bg_threads = 4) ?(ticker_thread = true)
?(ticker_interval_ms = 500) ?(self_trace = false) () : t =
let bg_threads = max 2 (min bg_threads 32) in
{ debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread }
{
debug;
url;
headers;
batch_timeout_ms;
bg_threads;
ticker_thread;
ticker_interval_ms;
self_trace;
}
13 changes: 12 additions & 1 deletion src/client-ocurl/config.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ type t = private {
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *)
only checked when a new event occurs or when a tick
is emitted. Default 2_000. *)
bg_threads: int;
(** Are there background threads, and how many? Default [4] *)
ticker_thread: bool;
(** If true, start a thread that regularly checks if signals should
be sent to the collector. Default [true] *)
ticker_interval_ms: int;
(** Interval for ticker thread, in milliseconds. This is
only useful if [ticker_thread] is [true].
c-cube marked this conversation as resolved.
Show resolved Hide resolved
Default 500.
@since NEXT_RELEASE *)
self_trace: bool;
(** If true, the OTEL library will also emit its own spans. Default [false].
@since NEXT_RELEASE *)
}
(** Configuration.

Expand All @@ -31,6 +40,8 @@ val make :
?batch_timeout_ms:int ->
?bg_threads:int ->
?ticker_thread:bool ->
?ticker_interval_ms:int ->
?self_trace:bool ->
unit ->
t
(** Make a configuration.
Expand Down
127 changes: 92 additions & 35 deletions src/client-ocurl/opentelemetry_client_ocurl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,33 @@ let timeout_gc_metrics = Mtime.Span.(20 * s)
(** side channel for GC, appended to metrics batch data *)
let gc_metrics = AList.make ()

(** Mini tracing module (disabled if [config.self_trace=false]) *)
module Self_trace = struct
let enabled = Atomic.make true

let add_event (scope : Scope.t) ev = scope.events <- ev :: scope.events

let dummy_trace_id_ = Trace_id.create ()

let dummy_span_id = Span_id.create ()

let with_ ?kind ?attrs name f =
if Atomic.get enabled then
Opentelemetry.Trace.with_ ?kind ?attrs name f
else (
(* do nothing *)
let scope =
{
Scope.trace_id = dummy_trace_id_;
span_id = dummy_span_id;
attrs = [];
events = [];
}
in
f scope
)
end

(** capture current GC metrics if {!needs_gc_metrics} is true
or it has been a long time since the last GC metrics collection,
and push them into {!gc_metrics} for later collection *)
Expand Down Expand Up @@ -120,9 +147,19 @@ end = struct

let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit
=
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http"
in

let data =
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto"
in
Pbrt.Encoder.reset encoder;
encode x encoder;
Pbrt.Encoder.to_string encoder
in

let url =
let url = config.Config.url in
if url <> "" && String.get url (String.length url - 1) = '/' then
Expand All @@ -138,11 +175,18 @@ end = struct
("Content-Type", "application/x-protobuf") :: config.headers
in
match
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_internal "curl.post"
~attrs:[ "sz", `Int (String.length data); "url", `String url ]
in
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
with
| Ok { code; _ } when code >= 200 && code < 300 -> ()
| Ok { code; body; headers = _; info = _ } ->
Atomic.incr n_errors;
Self_trace.add_event _sc
@@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ];

if !debug_ || config.debug then (
let dec = Pbrt.Decoder.of_string body in
let body =
Expand Down Expand Up @@ -171,6 +215,11 @@ end = struct
let send_logs_http ~stop ~config (client : Curl.t) encoder
(l : Logs.resource_logs list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-logs"
~attrs:[ "n", `Int (List.length l) ]
in

let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
Expand All @@ -180,6 +229,11 @@ end = struct
let send_metrics_http ~stop ~config curl encoder
(l : Metrics.resource_metrics list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-metrics"
~attrs:[ "n", `Int (List.length l) ]
in

let x =
Metrics_service.default_export_metrics_service_request ~resource_metrics:l
()
Expand All @@ -190,6 +244,11 @@ end = struct
let send_traces_http ~stop ~config curl encoder
(l : Trace.resource_spans list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-traces"
~attrs:[ "n", `Int (List.length l) ]
in

let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
Expand Down Expand Up @@ -248,8 +307,8 @@ end = struct
in

let send_metrics () =
B_queue.push self.send_q
(To_send.Send_metric (Batch.pop_all batches.metrics))
let metrics = AList.pop_all gc_metrics :: Batch.pop_all batches.metrics in
B_queue.push self.send_q (To_send.Send_metric metrics)
in

let send_logs () =
Expand All @@ -266,39 +325,36 @@ end = struct
(* read multiple events at once *)
B_queue.pop_all self.q local_q;

let now = Mtime_clock.now () in
(* are we asked to flush all events? *)
let must_flush_all = ref false in

(* how to process a single event *)
let process_ev (ev : Event.t) : unit =
match ev with
| Event.E_metric m ->
Batch.push batches.metrics m;
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ()
| Event.E_trace tr ->
Batch.push batches.traces tr;
if should_send_batch_ ~config ~now batches.traces then
send_traces ()
| Event.E_logs logs ->
Batch.push batches.logs logs;
if should_send_batch_ ~config ~now batches.logs then send_logs ()
| Event.E_metric m -> Batch.push batches.metrics m
| Event.E_trace tr -> Batch.push batches.traces tr
| Event.E_logs logs -> Batch.push batches.logs logs
| Event.E_tick ->
(* check for batches whose timeout expired *)
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ();
if should_send_batch_ ~config ~now batches.logs then send_logs ();
if should_send_batch_ ~config ~now batches.traces then
send_traces ()
| Event.E_flush_all ->
if Batch.len batches.metrics > 0 then send_metrics ();
if Batch.len batches.logs > 0 then send_logs ();
if Batch.len batches.traces > 0 then send_traces ()
(* the only impact of "tick" is that it wakes us up regularly *)
()
| Event.E_flush_all -> must_flush_all := true
in

while not (Queue.is_empty local_q) do
let ev = Queue.pop local_q in
process_ev ev
done
Queue.iter process_ev local_q;
Queue.clear local_q;

if !must_flush_all then (
if Batch.len batches.metrics > 0 then send_metrics ();
if Batch.len batches.logs > 0 then send_logs ();
if Batch.len batches.traces > 0 then send_traces ()
) else (
let now = Mtime_clock.now () in
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ();

if should_send_batch_ ~config ~now batches.traces then send_traces ();
if should_send_batch_ ~config ~now batches.logs then send_logs ()
)
done
with B_queue.Closed -> ()

Expand Down Expand Up @@ -418,15 +474,14 @@ let create_backend ?(stop = Atomic.make false)
ret ());
}

let on_tick_cbs_ = Atomic.make (ref [])
let on_tick_cbs_ = Atomic.make (AList.make ())

let set_on_tick_callbacks = Atomic.set on_tick_cbs_

let tick () =
sample_gc_metrics_if_needed ();
Backend_impl.send_event backend Event.E_tick;
let l = Atomic.get on_tick_cbs_ in
List.iter (fun f -> f ()) !l
List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_)

let cleanup () = Backend_impl.shutdown backend
end in
Expand All @@ -451,9 +506,11 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
Opentelemetry.Collector.set_backend backend;

if config.url <> get_url () then set_url config.url;
Atomic.set Self_trace.enabled config.self_trace;

if config.ticker_thread then (
let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in
(* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t)
);

Expand Down
2 changes: 2 additions & 0 deletions src/client-ocurl/AList.ml → src/core/AList.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ type 'a t = 'a list Atomic.t

let make () = Atomic.make []

let get = Atomic.get

let add self x =
while
let old = Atomic.get self in
Expand Down
3 changes: 3 additions & 0 deletions src/client-ocurl/AList.mli → src/core/AList.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

type 'a t

val get : 'a t -> 'a list
(** Snapshot *)

val make : unit -> 'a t

val add : 'a t -> 'a -> unit
Expand Down
Loading
Loading