Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for per-signal urls #56

Merged
merged 7 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/client-cohttp-lwt/common_.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,30 @@ let debug_ =

let default_url = "http://localhost:4318"

let url =
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)

let get_url () = !url

let set_url s = url := s
let make_get_from_env env_name =
let value = ref None in
fun () ->
match !value with
| None ->
value := Sys.getenv_opt env_name;
!value
| Some value -> Some value

let get_url_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_ENDPOINT"

let get_url_traces_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"

let get_url_metrics_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"

let get_url_logs_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"

let remove_trailing_slash url =
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url

let parse_headers s =
let parse_header s =
Expand Down
59 changes: 49 additions & 10 deletions src/client-cohttp-lwt/config.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ open Common_

type t = {
debug: bool;
url: string;
url_traces: string;
url_metrics: string;
url_logs: string;
headers: (string * string) list;
batch_traces: int option;
batch_metrics: int option;
Expand All @@ -16,7 +18,9 @@ let pp out self : unit =
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
url;
url_traces;
url_metrics;
url_logs;
headers;
batch_traces;
batch_metrics;
Expand All @@ -26,17 +30,52 @@ let pp out self : unit =
self
in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d; @]}"
debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt
batch_logs batch_timeout_ms
"{@[ debug=%B;@ url_traces=%S;@ url_metrics=%S;@ url_logs=%S;@ \
headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \
batch_timeout_ms=%d; @]}"
debug url_traces url_metrics url_logs ppheaders headers ppiopt batch_traces
ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms

let make ?(debug = !debug_) ?url ?url_traces ?url_metrics ?url_logs
?(headers = get_headers ()) ?(batch_traces = Some 400)
?(batch_metrics = Some 20) ?(batch_logs = Some 400)
?(batch_timeout_ms = 500) () : t =
let url_traces, url_metrics, url_logs =
let base_url =
match url with
| None -> Option.value (get_url_from_env ()) ~default:default_url
| Some url -> remove_trailing_slash url
in
let url_traces =
match url_traces with
| None ->
Option.value
(get_url_traces_from_env ())
~default:(base_url ^ "/v1/traces")
| Some url -> url
in
let url_metrics =
match url_metrics with
| None ->
Option.value
(get_url_metrics_from_env ())
~default:(base_url ^ "/v1/metrics")
| Some url -> url
in
let url_logs =
match url_logs with
| None ->
Option.value (get_url_logs_from_env ()) ~default:(base_url ^ "/v1/logs")
| Some url -> url
in
url_traces, url_metrics, url_logs
in

let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_traces = Some 400) ?(batch_metrics = Some 20)
?(batch_logs = Some 400) ?(batch_timeout_ms = 500) () : t =
{
debug;
url;
url_traces;
url_metrics;
url_logs;
headers;
batch_traces;
batch_metrics;
Expand Down
28 changes: 25 additions & 3 deletions src/client-cohttp-lwt/config.mli
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
type t = private {
debug: bool;
url: string;
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
url_traces: string; (** Url to send traces *)
url_metrics: string; (** Url to send metrics*)
url_logs: string; (** Url to send logs *)
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
Expand Down Expand Up @@ -37,6 +37,9 @@ type t = private {
val make :
?debug:bool ->
?url:string ->
?url_traces:string ->
?url_metrics:string ->
?url_logs:string ->
?headers:(string * string) list ->
?batch_traces:int option ->
?batch_metrics:int option ->
Expand All @@ -49,6 +52,25 @@ val make :
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.

@param url base url used to construct per-signal urls. Per-signal url options take precedence over this base url.
Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set.

Example of constructed per-signal urls with the base url http://localhost:4318
- Traces: http://localhost:4318/v1/traces
- Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs

Use per-signal url options if different urls are needed for each signal type.

@param url_traces url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set.
The url is used as-is without any modification.

@param url_metrics url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set.
The url is used as-is without any modification.

@param url_logs url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set.
The url is used as-is without any modification.

*)

val pp : Format.formatter -> t -> unit
68 changes: 31 additions & 37 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ module Httpc : sig
val send :
t ->
config:Config.t ->
path:string ->
url:string ->
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
string ->
('a, error) result Lwt.t
Expand All @@ -91,17 +91,9 @@ end = struct
let cleanup _self = ()

(* send the content to the remote endpoint/path *)
let send (_self : t) ~(config : Config.t) ~path ~decode (bod : string) :
let send (_self : t) ~(config : Config.t) ~url ~decode (bod : string) :
('a, error) result Lwt.t =
let url =
let url = config.url in
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url
in
let full_url = url ^ path in
let uri = Uri.of_string full_url in
let uri = Uri.of_string url in

let open Cohttp in
let headers = Header.(add_list (init ()) !headers) in
Expand All @@ -121,7 +113,7 @@ end = struct
| Error e ->
let err =
`Failure
(spf "sending signals via http POST to %S\nfailed with:\n%s" full_url
(spf "sending signals via http POST to %S\nfailed with:\n%s" url
(Printexc.to_string e))
in
Lwt.return @@ Error err
Expand Down Expand Up @@ -158,7 +150,7 @@ end = struct
%s\n\
status: %S\n\
%s"
full_url code (Printexc.to_string e) body bt))
url code (Printexc.to_string e) body bt))
in
Lwt.return r
)
Expand Down Expand Up @@ -292,11 +284,11 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =

let set_on_tick_callbacks = Atomic.set on_tick_cbs_

let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit Lwt.t =
let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit Lwt.t =
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
let* r = Httpc.send httpc ~config ~path ~decode:(`Ret ()) data in
let* r = Httpc.send httpc ~config ~url ~decode:(`Ret ()) data in
match r with
| Ok () -> Lwt.return ()
| Error `Sysbreak ->
Expand All @@ -317,23 +309,26 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Metrics_service.default_export_metrics_service_request
~resource_metrics:l ()
in
send_http_ curl encoder ~path:"/v1/metrics"
let url = config.Config.url_metrics in
send_http_ curl encoder ~url
~encode:Metrics_service.encode_pb_export_metrics_service_request x

let send_traces_http curl encoder (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
send_http_ curl encoder ~path:"/v1/traces"
let url = config.Config.url_traces in
send_http_ curl encoder ~url
~encode:Trace_service.encode_pb_export_trace_service_request x

let send_logs_http curl encoder (l : Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
send_http_ curl encoder ~path:"/v1/logs"
let url = config.Config.url_logs in
send_http_ curl encoder ~url
~encode:Logs_service.encode_pb_export_logs_service_request x

(* emit metrics, if the batch is full or timeout lapsed *)
Expand Down Expand Up @@ -459,12 +454,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
end in
(module M)

module Backend (Arg : sig
val stop : bool Atomic.t
module Backend
(Arg : sig
val stop : bool Atomic.t

val config : Config.t
end)
() : Opentelemetry.Collector.BACKEND = struct
val config : Config.t
end)
() : Opentelemetry.Collector.BACKEND = struct
include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())

open Opentelemetry.Proto
Expand All @@ -475,10 +471,10 @@ end)
send =
(fun l ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l);
let@ () = Lock.with_lock in
Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l);
push_trace l;
ret ());
}
Expand Down Expand Up @@ -532,10 +528,10 @@ end)
send =
(fun m ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m);
let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m);

let m = List.rev_append (additional_metrics ()) m in
push_metrics m;
Expand All @@ -547,10 +543,10 @@ end)
send =
(fun m ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m);
let@ () = Lock.with_lock in
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m);

push_logs m;
ret ());
Expand All @@ -560,8 +556,6 @@ end
let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
debug_ := config.debug;

if config.url <> get_url () then set_url config.url;

let module B =
Backend
(struct
Expand Down
6 changes: 0 additions & 6 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@

open Common_

val get_url : unit -> string

val set_url : string -> unit
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)

val get_headers : unit -> (string * string) list

val set_headers : (string * string) list -> unit
Expand Down
30 changes: 24 additions & 6 deletions src/client-ocurl/common_.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,30 @@ let debug_ =

let default_url = "http://localhost:4318"

let url =
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)

let get_url () = !url

let set_url s = url := s
let make_get_from_env env_name =
let value = ref None in
fun () ->
match !value with
| None ->
value := Sys.getenv_opt env_name;
!value
| Some value -> Some value
c-cube marked this conversation as resolved.
Show resolved Hide resolved

let get_url_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_ENDPOINT"

let get_url_traces_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"

let get_url_metrics_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"

let get_url_logs_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"

let remove_trailing_slash url =
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url

let parse_headers s =
let parse_header s =
Expand Down
Loading
Loading