diff --git a/Cargo.lock b/Cargo.lock index 67f141648c..c088125107 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6110,6 +6110,7 @@ dependencies = [ "si-std", "strum 0.26.3", "telemetry", + "telemetry-utils", "tempfile", "thiserror", "tokio", diff --git a/component/deploy/userdata b/component/deploy/userdata index 2c522fa832..86418e4900 100644 --- a/component/deploy/userdata +++ b/component/deploy/userdata @@ -8,7 +8,7 @@ export INIT_VERSION=$(aws ssm get-parameter --query "Parameter.Value" --output t # prep attached storage mkfs -t xfs /dev/nvme1n1 mkdir -p /layer_cache -mount /dev/nvme1n1 /layer_cache +mount -o logbsize=256k /dev/nvme1n1 /layer_cache # get build metadata METADATA=$(curl -Ls https://artifacts.systeminit.com/${SI_SERVICE}/${SI_VERSION}/omnibus/linux/x86_64/${SI_SERVICE}-${SI_VERSION}-omnibus-linux-x86_64.tar.gz.metadata.json) diff --git a/lib/si-layer-cache/BUCK b/lib/si-layer-cache/BUCK index 7ddfcad449..6e298f4326 100644 --- a/lib/si-layer-cache/BUCK +++ b/lib/si-layer-cache/BUCK @@ -13,6 +13,7 @@ rust_library( "//lib/si-runtime-rs:si-runtime", "//lib/si-std:si-std", "//lib/telemetry-rs:telemetry", + "//lib/telemetry-utils-rs:telemetry-utils", "//third-party/rust:async-trait", "//third-party/rust:blake3", "//third-party/rust:bytes", diff --git a/lib/si-layer-cache/Cargo.toml b/lib/si-layer-cache/Cargo.toml index fdc4e8a3ad..05db895474 100644 --- a/lib/si-layer-cache/Cargo.toml +++ b/lib/si-layer-cache/Cargo.toml @@ -29,6 +29,7 @@ si-runtime = { path = "../../lib/si-runtime-rs" } si-std = { path = "../../lib/si-std" } strum = { workspace = true } telemetry = { path = "../../lib/telemetry-rs" } +telemetry-utils = { path = "../../lib/telemetry-utils-rs" } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/lib/si-layer-cache/src/disk_cache.rs b/lib/si-layer-cache/src/disk_cache.rs index 666e1c9401..a8a90f2806 100644 --- a/lib/si-layer-cache/src/disk_cache.rs +++ b/lib/si-layer-cache/src/disk_cache.rs @@ -6,6 +6,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; use telemetry::prelude::*; +use telemetry_utils::metric; use tokio::select; use tokio_util::sync::CancellationToken; @@ -39,7 +40,9 @@ impl DiskCache { } pub async fn get(&self, key: Arc) -> LayerDbResult> { - Ok(cacache::read(self.write_path.as_ref(), key.clone()).await?) + let data = cacache::read(self.write_path.as_ref(), key.clone()).await?; + metric!(counter.layer_cache.hit.disk = 1); + Ok(data) } pub async fn contains_key(&self, key: Arc) -> LayerDbResult { diff --git a/lib/si-layer-cache/src/layer_cache.rs b/lib/si-layer-cache/src/layer_cache.rs index 48aa89669e..1a0184290a 100644 --- a/lib/si-layer-cache/src/layer_cache.rs +++ b/lib/si-layer-cache/src/layer_cache.rs @@ -53,30 +53,16 @@ where Ok(()) } - #[instrument( - name = "layer_cache.get", - level = "debug", - skip_all, - fields( - si.layer_cache.key = key.as_ref(), - si.layer_cache.layer.hit = Empty, - ), - )] pub async fn get(&self, key: Arc) -> LayerDbResult> { - let span = current_span_for_instrument_at!("debug"); - Ok(match self.memory_cache.get(&key).await { - Some(memory_value) => { - span.record("si.layer_cache.layer.hit", "memory"); - Some(memory_value) - } + Some(memory_value) => Some(memory_value), + None => match self.disk_cache.get(key.clone()).await { Ok(value) => { let deserialized: V = serialize::from_bytes(&value[..])?; self.memory_cache.insert(key, deserialized.clone()).await; - span.record("si.layer_cache.layer.hit", "disk"); Some(deserialized) } Err(_) => match self.pg.get(&key).await? { @@ -88,7 +74,6 @@ where .await; self.spawn_disk_cache_write_vec(key.clone(), value).await?; - span.record("si.layer_cache.layer.hit", "disk"); Some(deserialized) } None => None, diff --git a/lib/si-layer-cache/src/memory_cache.rs b/lib/si-layer-cache/src/memory_cache.rs index 9d83133ace..b84f3986b4 100644 --- a/lib/si-layer-cache/src/memory_cache.rs +++ b/lib/si-layer-cache/src/memory_cache.rs @@ -6,6 +6,8 @@ use moka::future::Cache; use serde::{de::DeserializeOwned, Serialize}; use crate::db::serialize; +use telemetry::tracing::info; +use telemetry_utils::metric; #[derive(Clone, Debug)] enum MaybeDeserialized @@ -47,13 +49,17 @@ where pub async fn get(&self, key: &str) -> Option { match self.cache.get(key).await { - Some(MaybeDeserialized::DeserializedValue(value)) => Some(value), + Some(MaybeDeserialized::DeserializedValue(value)) => { + metric!(counter.layer_cache.hit.memory = 1); + Some(value) + } Some(MaybeDeserialized::RawBytes(bytes)) => { // If we fail to deserialize the raw bytes for some reason, pretend that we never // had the key in the first place, and also remove it from the cache. match serialize::from_bytes_async::(&bytes).await { Ok(deserialized) => { self.insert(key.into(), deserialized.clone()).await; + metric!(counter.layer_cache.hit.memory = 1); Some(deserialized) } Err(e) => { diff --git a/lib/si-layer-cache/src/pg.rs b/lib/si-layer-cache/src/pg.rs index fb58c20f47..b232ef841e 100644 --- a/lib/si-layer-cache/src/pg.rs +++ b/lib/si-layer-cache/src/pg.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; use si_data_pg::{postgres_types::ToSql, PgPool, PgPoolConfig, PgRow}; +use telemetry::tracing::info; +use telemetry_utils::metric; use crate::error::LayerDbResult; @@ -64,7 +66,10 @@ impl PgLayer { let maybe_row = client.query_opt(&self.get_value_query, &[&key]).await?; match maybe_row { - Some(row) => Ok(Some(row.get("value"))), + Some(row) => { + metric!(counter.layer_cache.hit.pg = 1); + Ok(Some(row.get("value"))) + } None => Ok(None), } } @@ -94,6 +99,7 @@ impl PgLayer { .query(&self.get_value_many_query, &[&key_refs]) .await? { + metric!(counter.layer_cache.hit.pg = 1); result.insert( row.get::<&str, String>("key").to_owned(), row.get::<&str, Vec>("value"),