Skip to content

Commit

Permalink
Merge pull request #4861 from systeminit/layer_metrics
Browse files Browse the repository at this point in the history
feat: add metrics to layer cache hits
  • Loading branch information
sprutton1 authored Oct 23, 2024
2 parents 08b258e + 6a2ea81 commit dd229c7
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion component/deploy/userdata
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export INIT_VERSION=$(aws ssm get-parameter --query "Parameter.Value" --output t
# prep attached storage
mkfs -t xfs /dev/nvme1n1
mkdir -p /layer_cache
mount /dev/nvme1n1 /layer_cache
mount -o logbsize=256k /dev/nvme1n1 /layer_cache

# get build metadata
METADATA=$(curl -Ls https://artifacts.systeminit.com/${SI_SERVICE}/${SI_VERSION}/omnibus/linux/x86_64/${SI_SERVICE}-${SI_VERSION}-omnibus-linux-x86_64.tar.gz.metadata.json)
Expand Down
1 change: 1 addition & 0 deletions lib/si-layer-cache/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rust_library(
"//lib/si-runtime-rs:si-runtime",
"//lib/si-std:si-std",
"//lib/telemetry-rs:telemetry",
"//lib/telemetry-utils-rs:telemetry-utils",
"//third-party/rust:async-trait",
"//third-party/rust:blake3",
"//third-party/rust:bytes",
Expand Down
1 change: 1 addition & 0 deletions lib/si-layer-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ si-runtime = { path = "../../lib/si-runtime-rs" }
si-std = { path = "../../lib/si-std" }
strum = { workspace = true }
telemetry = { path = "../../lib/telemetry-rs" }
telemetry-utils = { path = "../../lib/telemetry-utils-rs" }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
5 changes: 4 additions & 1 deletion lib/si-layer-cache/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use telemetry::prelude::*;
use telemetry_utils::metric;
use tokio::select;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -39,7 +40,9 @@ impl DiskCache {
}

pub async fn get(&self, key: Arc<str>) -> LayerDbResult<Vec<u8>> {
Ok(cacache::read(self.write_path.as_ref(), key.clone()).await?)
let data = cacache::read(self.write_path.as_ref(), key.clone()).await?;
metric!(counter.layer_cache.hit.disk = 1);
Ok(data)
}

pub async fn contains_key(&self, key: Arc<str>) -> LayerDbResult<bool> {
Expand Down
19 changes: 2 additions & 17 deletions lib/si-layer-cache/src/layer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,16 @@ where
Ok(())
}

#[instrument(
name = "layer_cache.get",
level = "debug",
skip_all,
fields(
si.layer_cache.key = key.as_ref(),
si.layer_cache.layer.hit = Empty,
),
)]
pub async fn get(&self, key: Arc<str>) -> LayerDbResult<Option<V>> {
let span = current_span_for_instrument_at!("debug");

Ok(match self.memory_cache.get(&key).await {
Some(memory_value) => {
span.record("si.layer_cache.layer.hit", "memory");
Some(memory_value)
}
Some(memory_value) => Some(memory_value),

None => match self.disk_cache.get(key.clone()).await {
Ok(value) => {
let deserialized: V = serialize::from_bytes(&value[..])?;

self.memory_cache.insert(key, deserialized.clone()).await;

span.record("si.layer_cache.layer.hit", "disk");
Some(deserialized)
}
Err(_) => match self.pg.get(&key).await? {
Expand All @@ -88,7 +74,6 @@ where
.await;
self.spawn_disk_cache_write_vec(key.clone(), value).await?;

span.record("si.layer_cache.layer.hit", "disk");
Some(deserialized)
}
None => None,
Expand Down
8 changes: 7 additions & 1 deletion lib/si-layer-cache/src/memory_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use moka::future::Cache;
use serde::{de::DeserializeOwned, Serialize};

use crate::db::serialize;
use telemetry::tracing::info;
use telemetry_utils::metric;

#[derive(Clone, Debug)]
enum MaybeDeserialized<V>
Expand Down Expand Up @@ -47,13 +49,17 @@ where

pub async fn get(&self, key: &str) -> Option<V> {
match self.cache.get(key).await {
Some(MaybeDeserialized::DeserializedValue(value)) => Some(value),
Some(MaybeDeserialized::DeserializedValue(value)) => {
metric!(counter.layer_cache.hit.memory = 1);
Some(value)
}
Some(MaybeDeserialized::RawBytes(bytes)) => {
// If we fail to deserialize the raw bytes for some reason, pretend that we never
// had the key in the first place, and also remove it from the cache.
match serialize::from_bytes_async::<V>(&bytes).await {
Ok(deserialized) => {
self.insert(key.into(), deserialized.clone()).await;
metric!(counter.layer_cache.hit.memory = 1);
Some(deserialized)
}
Err(e) => {
Expand Down
8 changes: 7 additions & 1 deletion lib/si-layer-cache/src/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::collections::HashMap;
use std::sync::Arc;

use si_data_pg::{postgres_types::ToSql, PgPool, PgPoolConfig, PgRow};
use telemetry::tracing::info;
use telemetry_utils::metric;

use crate::error::LayerDbResult;

Expand Down Expand Up @@ -64,7 +66,10 @@ impl PgLayer {
let maybe_row = client.query_opt(&self.get_value_query, &[&key]).await?;

match maybe_row {
Some(row) => Ok(Some(row.get("value"))),
Some(row) => {
metric!(counter.layer_cache.hit.pg = 1);
Ok(Some(row.get("value")))
}
None => Ok(None),
}
}
Expand Down Expand Up @@ -94,6 +99,7 @@ impl PgLayer {
.query(&self.get_value_many_query, &[&key_refs])
.await?
{
metric!(counter.layer_cache.hit.pg = 1);
result.insert(
row.get::<&str, String>("key").to_owned(),
row.get::<&str, Vec<u8>>("value"),
Expand Down

0 comments on commit dd229c7

Please sign in to comment.