Skip to content

Commit

Permalink
refactor: Switch to telemetry_batteries for telemetry management
Browse files Browse the repository at this point in the history
  • Loading branch information
notheotherben committed Sep 9, 2024
1 parent a66cd0c commit fab24cb
Show file tree
Hide file tree
Showing 16 changed files with 769 additions and 653 deletions.
1,220 changes: 713 additions & 507 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 1 addition & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,12 @@ futures = "0.3"
http = "1.1"
log = "0.4"
mime = "0.3"
opentelemetry = { version = "0.23" }
opentelemetry-otlp = { version = "0.16", features = ["tls-roots"] }
rand = "0.8"
reqwest = { version = "0.12" }
sentry = "0.34"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.39", features = ["sync"] }
tonic = { version = "0.11", features = ["tls-roots"] }
tracing = { version = "0.1.37", features = ["log"] }
tracing-attributes = { git="https://github.com/SierraSoftworks/tracing.git" }
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
tracing-opentelemetry = "0.24"
tracing-subscriber = { version = "0.3.18", features = ["tracing-log"] }
tracing-batteries = { git = "https://github.com/sierrasoftworks/tracing-batteries-rs.git" }
uuid = { version = "0.7", features = ["serde", "v4", "u128" ]}
opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"] }
11 changes: 4 additions & 7 deletions src/api/health/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ mod models;
mod test;

use actix_web::{get, web, HttpRequest};
use opentelemetry::trace::TraceContextExt;
use tracing::Span;
use tracing::instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_batteries::prelude::*;
use crate::telemetry::*;

use crate::models::*;
Expand All @@ -19,19 +16,19 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(tracing_v1);
}

#[instrument(err, skip(state), fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(state), fields(otel.kind = "internal"))]
#[get("/api/v1/health")]
pub async fn health_v1(state: web::Data<GlobalState>) -> Result<models::HealthV1, APIError> {
state.store.send(GetHealth {}.trace()).await?.map(|h| h.into())
}

#[instrument(err, skip(state), fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(state), fields(otel.kind = "internal"))]
#[get("/api/v2/health")]
pub async fn health_v2(state: web::Data<GlobalState>) -> Result<models::HealthV2, APIError> {
state.store.send(GetHealth {}.trace()).await?.map(|h| h.into())
}

#[instrument(err, skip(req), fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(req), fields(otel.kind = "internal"))]
#[get("/api/v1/tracing")]
pub async fn tracing_v1(req: HttpRequest) -> Result<String, APIError> {
let headers = req.headers().iter().map(|(k, v)| {
Expand Down
1 change: 1 addition & 0 deletions src/api/health/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{json_responder, models::*};
use tracing_batteries::prelude::*;

#[derive(Serialize, Deserialize)]
pub struct HealthV1 {
Expand Down
2 changes: 1 addition & 1 deletion src/api/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ macro_rules! json_responder {
impl actix_web::Responder for $t {
type Body = actix_web::body::BoxBody;

#[instrument(target="response.render", fields(http.content_type = "application/json"), skip(self, _req))]
#[tracing::instrument(target="response.render", fields(http.content_type = "application/json"), skip(self, _req))]
fn respond_to(self, _req: &actix_web::HttpRequest) -> actix_web::HttpResponse<Self::Body> {
actix_web::HttpResponse::Ok()
.content_type("application/json")
Expand Down
6 changes: 3 additions & 3 deletions src/api/quote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod models;
mod test;

use actix_web::{get, web, HttpRequest, Result};
use tracing::instrument;
use tracing_batteries::prelude::*;
use crate::telemetry::*;

use crate::models::*;
Expand All @@ -15,15 +15,15 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(quote_by_v1);
}

#[instrument(err, skip(state), fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(state), fields(otel.kind = "internal"))]
#[get("/api/v1/quote")]
pub async fn quote_v1(state: web::Data<GlobalState>) -> Result<models::QuoteV1, APIError> {
state.store.send(GetQuote{
who: "".to_string(),
}.trace()).await?.map(|q| q.into())
}

#[instrument(err, skip(state), fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(state), fields(otel.kind = "internal"))]
#[get("/api/v1/quote/{person}")]
pub async fn quote_by_v1(state: web::Data<GlobalState>, request: HttpRequest) -> Result<models::QuoteV1, APIError> {
state.store.send(GetQuote{
Expand Down
6 changes: 3 additions & 3 deletions src/api/quote/models.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::models::Quote;

use actix_web::{HttpRequest, HttpResponse, Responder, http::header, http::header::Header, body::BoxBody};
use tracing::Span;
use tracing_batteries::prelude::*;

#[derive(Serialize, Deserialize)]
pub struct QuoteV1 {
Expand Down Expand Up @@ -30,7 +30,7 @@ impl From<Quote> for QuoteV1 {
impl Responder for QuoteV1 {
type Body = BoxBody;

#[instrument(target="response.render", fields(http.content_type = tracing::field::Empty), skip(self, req))]
#[tracing::instrument(target="response.render", fields(http.content_type = tracing::field::Empty), skip(self, req))]
fn respond_to(self, req: &HttpRequest) -> HttpResponse<Self::Body> {
let content_type = header::Accept::parse(req).map(|header| {
for a in header.iter() {
Expand All @@ -46,7 +46,7 @@ impl Responder for QuoteV1 {
"application/json"
}).unwrap_or("application/json");

Span::current().record("http.content_type", &tracing::field::display(content_type));
Span::current().record("http.content_type", display(content_type));

info!({ http.content_type = %content_type}, "Rendering quote");

Expand Down
23 changes: 14 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ extern crate uuid;
extern crate mime;
extern crate tokio;
extern crate rand;
#[macro_use] extern crate tracing;
extern crate sentry;

use actix_web::{App, HttpServer, web::Data};
use telemetry::Session;
use tracing::{Instrument, info_span};
use tracing_batteries::prelude::*;

mod api;
#[macro_use] mod macros;
Expand All @@ -25,7 +22,7 @@ fn get_listening_port() -> u16 {

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
let _session = Session::new();
let session = telemetry::setup();

let state = api::GlobalState::new();

Expand All @@ -39,13 +36,21 @@ async fn main() -> std::io::Result<()> {
let listen_on = get_listening_port();

info!("Starting server on :{}", listen_on);
HttpServer::new(move || {
let result = HttpServer::new(move || {
App::new()
.app_data(Data::new(state.clone()))
.wrap(telemetry::TracingLogger)
.configure(api::configure)
})
.bind(format!("0.0.0.0:{}", listen_on))?
.run()
.await
.bind(format!("0.0.0.0:{}", listen_on))?
.run()
.await;

if let Err(error) = &result {
eprintln!("Error starting server: {}", error);
session.record_error(error);
}

session.shutdown();
result
}
7 changes: 3 additions & 4 deletions src/store/blob.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use actix::prelude::*;
use azure_storage::{ConnectionString, StorageCredentials};
use azure_storage_blobs::prelude::*;
use tracing::*;
use tracing_futures::Instrument;
use tracing_batteries::prelude::*;
use super::{Loader, Store};
use crate::{api::APIError, models::*};
use crate::telemetry::*;
Expand All @@ -16,7 +15,7 @@ pub struct BlobLoader {

#[async_trait::async_trait]
impl Loader for BlobLoader {
#[instrument(err, skip(self, state), fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(self, state), fields(otel.kind = "internal"))]
async fn load_quotes(&self, state: Addr<Store>) -> Result<(), APIError> {
debug!("Initializing Azure Blob storage client");

Expand Down Expand Up @@ -62,7 +61,7 @@ impl Loader for BlobLoader {
quotes: fc.iter().map(|q| q.clone().into()).collect()
}.trace()).await? {
Ok(_) => {
event!(Level::INFO, "Loaded {} quotes into the state store.", quote_count);
info!("Loaded {} quotes into the state store.", quote_count);
Ok(())
},
Err(err) => {
Expand Down
6 changes: 3 additions & 3 deletions src/store/file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix::prelude::*;
use tracing::*;
use tracing_batteries::prelude::*;
use super::{Loader, Store};
use crate::{api::APIError, models::*};
use crate::telemetry::*;
Expand All @@ -11,7 +11,7 @@ pub struct FileLoader {

#[async_trait::async_trait]
impl Loader for FileLoader {
#[instrument(err, skip(self, state), fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(self, state), fields(otel.kind = "internal"))]
async fn load_quotes(&self, state: Addr<Store>) -> Result<(), APIError> {
info!("Loading quotes from {}", self.path.display());

Expand All @@ -36,7 +36,7 @@ impl Loader for FileLoader {
error!("Failed to load quotes from {}: {}", self.path.display(), err);
Err(err)
} else {
event!(Level::INFO, "Loaded {} quotes into the state store.", quote_count);
info!("Loaded {} quotes into the state store.", quote_count);
Ok(())
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::{RwLock, Arc};
use crate::{models::*, trace_handler};
use crate::telemetry::*;
use actix::prelude::*;
use tracing::*;
use tracing_batteries::prelude::*;
use crate::api::APIError;
use rand::seq::{SliceRandom, IteratorRandom};

Expand All @@ -29,7 +29,7 @@ trace_handler!(MemoryStore, AddQuote, Result<(), APIError>);
impl Handler<AddQuote> for MemoryStore {
type Result = Result<(), APIError>;

#[instrument(err, skip(self), name="add_quote", fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(self), name="add_quote", fields(otel.kind = "internal"))]
fn handle(&mut self, msg: AddQuote, _: &mut Self::Context) -> Self::Result {
let mut qs = info_span!("lock.acquire", "otel.kind" = "internal", db.instance="quotes", db.statement="WRITE").in_scope(|| {
self.quotes.write().map_err(|exception| {
Expand All @@ -52,7 +52,7 @@ trace_handler!(MemoryStore, AddQuotes, Result<(), APIError>);
impl Handler<AddQuotes> for MemoryStore {
type Result = Result<(), APIError>;

#[instrument(err, skip(self), name="add_quotes", fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(self), name="add_quotes", fields(otel.kind = "internal"))]
fn handle(&mut self, msg: AddQuotes, _: &mut Self::Context) -> Self::Result {
let mut qs = info_span!("lock.acquire", "otel.kind" = "internal", db.instance="quotes", db.statement="WRITE").in_scope(|| {
self.quotes.write().map_err(|exception| {
Expand All @@ -74,7 +74,7 @@ trace_handler!(MemoryStore, GetQuote, Result<Quote, APIError>);
impl Handler<GetQuote> for MemoryStore {
type Result = Result<Quote, APIError>;

#[instrument(err, skip(self), name="get_quote", fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(self), name="get_quote", fields(otel.kind = "internal"))]
fn handle(&mut self, msg: GetQuote, _: &mut Self::Context) -> Self::Result {

let qs = info_span!("lock.acquire", "otel.kind" = "internal", db.instance="quotes", db.statement="WRITE").in_scope(|| {
Expand Down Expand Up @@ -104,7 +104,7 @@ trace_handler!(MemoryStore, GetHealth, Result<Health, APIError>);
impl Handler<GetHealth> for MemoryStore {
type Result = Result<Health, APIError>;

#[instrument(err, skip(self), name="get_health", fields(otel.kind = "internal"))]
#[tracing::instrument(err, skip(self), name="get_health", fields(otel.kind = "internal"))]
fn handle(&mut self, _: GetHealth, _: &mut Self::Context) -> Self::Result {
Ok(Health {
ok: true,
Expand Down
6 changes: 3 additions & 3 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ pub mod file;
pub mod memory;

use actix::prelude::*;
use tracing::instrument;
use tracing_batteries::prelude::*;
use crate::api::APIError;

use super::api::{GlobalState};
use super::api::GlobalState;

pub type Store = memory::MemoryStore;

Expand All @@ -15,7 +15,7 @@ pub trait Loader {
async fn load_quotes(&self, state: Addr<Store>) -> Result<(), APIError>;
}

#[instrument(err, skip(loader, state))]
#[tracing::instrument(err, skip(loader, state))]
pub async fn load_global_state(loader: &dyn Loader, state: &GlobalState) -> Result<(), APIError> {
loader.load_quotes(state.store.clone()).await
}
2 changes: 1 addition & 1 deletion src/telemetry/actix_message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix::Message;
use tracing::Span;
use tracing_batteries::prelude::*;

pub struct TraceMessage<M: Message> {
pub message: M,
Expand Down
11 changes: 5 additions & 6 deletions src/telemetry/actix_web_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use actix_web::{Error, http::header::HeaderMap};
use actix_service::*;
use actix_web::dev::*;
use futures::{Future, future::{ok, Ready}, FutureExt};
use opentelemetry::{propagation::Extractor, global};
use tracing::{Instrument, Span, field::display};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use opentelemetry::propagation::Extractor;
use tracing_batteries::prelude::*;

pub struct TracingLogger;

Expand Down Expand Up @@ -51,22 +50,22 @@ where
.and_then(|h| h.to_str().ok())
.unwrap_or("");

let span = tracing::info_span!(
let span = info_span!(
"request",
"otel.kind" = "server",
"otel.name" = req.match_pattern().unwrap_or_else(|| req.uri().path().to_string()),
"net.transport" = "IP.TCP",
"net.peer.ip" = %req.connection_info().realip_remote_addr().unwrap_or(""),
"http.target" = %req.uri(),
"http.user_agent" = %user_agent,
"http.status_code" = tracing::field::Empty,
"http.status_code" = EmptyField,
"http.method" = %req.method(),
"http.url" = %req.match_pattern().unwrap_or_else(|| req.path().into()),
"http.headers" = %req.headers().iter().map(|(k, v)| format!("{k}: {v:?}")).collect::<Vec<_>>().join("\n"),
);

// Propagate OpenTelemetry parent span context information
let context = global::get_text_map_propagator(|propagator| {
let context = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderMapExtractor::from(req.headers()))
});

Expand Down
11 changes: 9 additions & 2 deletions src/telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
mod actix_message;
mod actix_web_tracing;
mod session;

pub use actix_web_tracing::TracingLogger;
pub use actix_message::*;
pub use session::Session;

use tracing_batteries::{Session, Sentry, OpenTelemetry};

pub fn setup() -> Session {
Session::new("bender", version!())
.with_battery(Sentry::new("https://[email protected]/1362607"))
.with_battery(OpenTelemetry::new("https://api.honeycomb.io:443")
.with_header("x-honeycomb-team", std::env::var("HONEYCOMB_KEY").unwrap_or_default()))
}
Loading

0 comments on commit fab24cb

Please sign in to comment.