diff --git a/README.md b/README.md index 8da08ad..8771992 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ # Event Tracker -A Rust port of [vinted/event-tracker](https://github.com/vinted/event-tracker) +An abstraction for event tracking ## Installation -Add this line to your application's `Cargo.toml`: +Add event tracker as a dependency to your `Cargo.toml`: ```toml vinted_event_tracker = { git = "https://github.com/vinted/event-tracker-rs" } @@ -12,19 +12,18 @@ vinted_event_tracker = { git = "https://github.com/vinted/event-tracker-rs" } ## Usage -Configure the crate in your application executable, e.g. `src/main.rs` or `src/bin/executable_name.rs`. +Configure relay once during the startup. ```rust use serde::Serialize; use vinted_event_tracker::*; -#[tokio::main] -async fn main() -> Result<(), Box> { - let udp_relay = Udp::new("0.0.0.0:5005").await?; - set_relay(udp_relay)?; +#[tokio::main] +async fn main() { + let udp_relay = UdpRelay::new("0.0.0.0:5005").await.unwrap(); - Ok(()) + set_relay(udp_relay).unwrap(); } ``` @@ -51,8 +50,6 @@ fn track_search_event() { query: "shoes", }; - let event = Event::new(event, portal, debug_pin, search_event); - - let _ = track(event); + track(event, portal, debug_pin, search_event); } ``` diff --git a/examples/http.rs b/examples/http.rs index 8a9febb..b9070be 100644 --- a/examples/http.rs +++ b/examples/http.rs @@ -5,15 +5,11 @@ use vinted_event_tracker::*; async fn main() { tracing_subscriber::fmt::init(); - let url = "https://0.0.0.0:9999".parse().expect("valid url"); + let relay = HttpRelay::new("https://0.0.0.0:9999".parse().unwrap()); - let http_relay = Http::new(url); + set_relay(relay).unwrap(); - if let Err(ref error) = set_relay(http_relay) { - tracing::error!(%error, "Couldn't set HTTP relay"); - } - - track_events(5); + track_events(1_000); } fn track_events(iterations: i32) { @@ -23,10 +19,6 @@ fn track_events(iterations: i32) { } for iteration in 1..iterations { - let event = Event::new("system.test", "fr", Some(1234), SearchEvent { iteration }); - - if let Err(ref error) = track(event) { - tracing::error!(%error, "Couldn't track an event"); - } + vinted_event_tracker::track("event", "portal", Some(1234), SearchEvent { iteration }); } } diff --git a/examples/udp.rs b/examples/udp.rs index 6e42fe0..8899300 100644 --- a/examples/udp.rs +++ b/examples/udp.rs @@ -5,11 +5,9 @@ use vinted_event_tracker::*; async fn main() { tracing_subscriber::fmt::init(); - let udp_relay = Udp::new("0.0.0.0:5005").await.expect("valid udp relay"); + let relay = UdpRelay::new("0.0.0.0:5005").await.unwrap(); - if let Err(ref error) = set_relay(udp_relay) { - tracing::error!(%error, "Couldn't set UDP relay"); - } + set_relay(relay).unwrap(); track_events(1_000) } @@ -21,10 +19,6 @@ fn track_events(iterations: i32) { } for iteration in 1..iterations { - let event = Event::new("event", "portal", Some(1234), SearchEvent { iteration }); - - if let Err(ref error) = track(event) { - tracing::error!(%error, "Couldn't track an event"); - } + vinted_event_tracker::track("event", "portal", Some(1234), SearchEvent { iteration }); } } diff --git a/src/error.rs b/src/error.rs deleted file mode 100644 index 7ff29d2..0000000 --- a/src/error.rs +++ /dev/null @@ -1,22 +0,0 @@ -/// Crate level error enum -#[derive(Debug)] -pub enum Error { - /// Occurs when an event cannot be serialized - SerdeJson(serde_json::Error), -} - -impl std::error::Error for Error {} - -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::SerdeJson(e) => e.fmt(f), - } - } -} - -impl From for Error { - fn from(error: serde_json::Error) -> Self { - Self::SerdeJson(error) - } -} diff --git a/src/event.rs b/src/event.rs deleted file mode 100644 index 11a67aa..0000000 --- a/src/event.rs +++ /dev/null @@ -1,59 +0,0 @@ -use serde::Serialize; -use std::{fmt::Debug, time::SystemTime}; - -/// Event base -#[derive(Debug, Serialize)] -pub struct EventBase { - /// Event name - pub event: &'static str, - - /// Portal it's happening in - pub portal: &'static str, - - /// Current time in milliseconds since unix epoch - pub time: u128, - - /// Debug pin - pub debug_pin: Option, -} - -/// Event to track -#[derive(Debug, Serialize)] -pub struct Event -where - T: Debug + Serialize, -{ - /// Event base - #[serde(flatten)] - pub base: EventBase, - - /// Additional tracking data - #[serde(flatten)] - pub tracking_data: T, -} - -impl Event -where - T: Debug + Serialize, -{ - /// Creates an instance of [`Event`] - pub fn new( - event: &'static str, - portal: &'static str, - debug_pin: Option, - tracking_data: T, - ) -> Self { - Self { - base: EventBase { - event, - portal, - time: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("SystemTime before UNIX_EPOCH") - .as_millis(), - debug_pin, - }, - tracking_data, - } - } -} diff --git a/src/http.rs b/src/http_relay.rs similarity index 62% rename from src/http.rs rename to src/http_relay.rs index f9b4676..763036c 100644 --- a/src/http.rs +++ b/src/http_relay.rs @@ -1,15 +1,15 @@ -use crate::{EventBase, Relay}; +use crate::{Metadata, Relay}; use reqwest::{header, Client, Url}; /// A [`Relay`] that will print events to HTTP listener #[derive(Debug, Clone)] -pub struct Http { +pub struct HttpRelay { client: Client, url: Url, } -impl Http { - /// Creates an instance of [`Http`] [`Relay`] +impl HttpRelay { + /// Creates an instance of [HttpRelay] pub fn new(url: Url) -> Self { Self { client: Client::new(), @@ -18,24 +18,22 @@ impl Http { } } -impl Relay for Http { - fn transport(&self, event_base: EventBase, bytes: Vec) { - let url = self.url.clone(); - let client = self.client.clone(); +impl Relay for HttpRelay { + fn transport(&self, metadata: Metadata, serialized_event: Vec) { + let mut request = self + .client + .post(self.url.clone()) + .body(serialized_event) + .header(header::CONTENT_TYPE, "application/json") + .header("X-Local-Time", metadata.time.to_string()) + .header("X-Platform", "web") + .header("X-Portal", metadata.portal); + + if let Some(debug_pin) = metadata.debug_pin { + request = request.header("X-Debug-Pin", debug_pin); + } let _ = tokio::spawn(async move { - let mut request = client - .post(url) - .body(bytes) - .header(header::CONTENT_TYPE, "application/json") - .header("X-Local-Time", event_base.time.to_string()) - .header("X-Platform", "web") - .header("X-Portal", event_base.portal); - - if let Some(debug_pin) = event_base.debug_pin { - request = request.header("X-Debug-Pin", debug_pin); - } - let response = match request.send().await { Ok(response) => response, Err(error) => { diff --git a/src/lib.rs b/src/lib.rs index fcae4af..8067662 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,17 +39,14 @@ clippy::wildcard_imports )] -mod error; -mod event; +mod http_relay; +mod udp_relay; -mod http; -mod udp; - -pub use self::error::*; -pub use self::event::*; -pub use self::http::*; -pub use self::udp::*; +pub use self::http_relay::*; +pub use self::udp_relay::*; +use serde::Serialize; +use std::time::SystemTime; use std::{ hint::spin_loop, sync::atomic::{AtomicUsize, Ordering}, @@ -123,19 +120,84 @@ impl std::fmt::Display for SetRelayError { impl std::error::Error for SetRelayError {} -/// Tracks the actual event -pub fn track(event: Event) -> Result<(), Error> +/// Tracks event +#[tracing::instrument(skip(debug_pin, tracking_data))] +pub fn track(event: &'static str, portal: &'static str, debug_pin: Option, tracking_data: T) where - T: std::fmt::Debug + serde::Serialize, + T: Serialize, { - let event_vec = serde_json::to_vec(&event)?; + #[derive(Serialize)] + struct EventWithTrackingData<'a, T> + where + T: Serialize, + { + #[serde(flatten)] + metadata: &'a Metadata, + + #[serde(flatten)] + tracking_data: T, + } + + let metadata = Metadata { + event, + portal, + time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX_EPOCH") + .as_millis(), + debug_pin, + }; + + let event_with_tracking_data = { + EventWithTrackingData { + metadata: &metadata, + tracking_data, + } + }; + + match serde_json::to_vec(&event_with_tracking_data) { + Ok(bytes) => relay().transport(metadata, bytes), + Err(error) => { + tracing::error!(%error, "Could not serialize event") + } + } +} - relay().transport(event.base, event_vec); +/// Event metadata +#[derive(Debug, Serialize)] +pub struct Metadata { + event: &'static str, - Ok(()) + portal: &'static str, + + time: u128, + + debug_pin: Option, +} + +impl Metadata { + /// Event name + pub fn event(&self) -> &'static str { + self.event + } + + /// Portal it's happening in + pub fn portal(&self) -> &'static str { + self.portal + } + + /// Current time in milliseconds since unix epoch + pub fn time(&self) -> u128 { + self.time + } + + /// Debug pin + pub fn debug_pin(&self) -> Option { + self.debug_pin + } } -/// Trait for event transportation +/// An abstraction for event transmission over the wire pub trait Relay { /// Accepts event, serialized in JSON, in a form of bytes. /// @@ -143,11 +205,11 @@ pub trait Relay { /// - HTTP /// - TCP /// - UDP - fn transport(&self, event_base: EventBase, event: Vec); + fn transport(&self, metadata: Metadata, serialized_event: Vec); } struct NopRelay; impl Relay for NopRelay { - fn transport(&self, _: EventBase, _: Vec) {} + fn transport(&self, _: Metadata, _: Vec) {} } diff --git a/src/udp.rs b/src/udp_relay.rs similarity index 75% rename from src/udp.rs rename to src/udp_relay.rs index f8ff74b..d710c7b 100644 --- a/src/udp.rs +++ b/src/udp_relay.rs @@ -1,4 +1,4 @@ -use crate::{EventBase, Relay}; +use crate::{Metadata, Relay}; use std::{ io, net::{SocketAddr, ToSocketAddrs}, @@ -8,12 +8,12 @@ use tokio::net::UdpSocket; /// A [`Relay`] that will print events to UDP listener #[derive(Debug)] -pub struct Udp { +pub struct UdpRelay { udp_socket: Arc, } -impl Udp { - /// [Udp] relay will bind to the given remote_addr +impl UdpRelay { + /// [UdpRelay] will bind to the given remote_addr pub async fn new(remote_addrs: S) -> Result where S: ToSocketAddrs, @@ -46,11 +46,11 @@ impl Udp { } } -impl Relay for Udp { - fn transport(&self, _: EventBase, event: Vec) { +impl Relay for UdpRelay { + fn transport(&self, _: Metadata, serialized_event: Vec) { let udp_socket = self.udp_socket.clone(); - let _ = tokio::spawn(async move { udp_socket.send(&event).await }); + let _ = tokio::spawn(async move { udp_socket.send(&serialized_event).await }); } } @@ -60,7 +60,7 @@ mod tests { #[test] fn parses_local_addr_correctly() { - let _ = Udp::local_addr_v4(); - let _ = Udp::local_addr_v6(); + let _ = UdpRelay::local_addr_v4(); + let _ = UdpRelay::local_addr_v6(); } }