Skip to content

Commit

Permalink
refactor(controlplane): remove prometheus caching, parallelize httpsd…
Browse files Browse the repository at this point in the history
… api
  • Loading branch information
Meshiest committed Nov 26, 2024
1 parent 96ddc5d commit 67e7739
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 109 deletions.
5 changes: 0 additions & 5 deletions crates/controlplane/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ impl Environment {
documents: Vec<ItemDocument>,
state: Arc<GlobalState>,
) -> Result<EnvId, EnvError> {
state.prom_httpsd.lock().await.set_dirty();

let prev_env = state.get_env(env_id);

let mut storage_doc = None;
Expand Down Expand Up @@ -453,9 +451,6 @@ impl Environment {
info!("[env {id}] unloaded storage {}", storage.id);
}

trace!("[env {id}] marking prom as dirty");
state.prom_httpsd.lock().await.set_dirty();

trace!("[env {id}] inventorying agents...");

if let Err(e) = state
Expand Down
142 changes: 44 additions & 98 deletions crates/controlplane/src/server/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{collections::HashMap, fmt::Write};
use std::collections::HashMap;

use axum::{extract::State, response::IntoResponse, routing::get, Json, Router};
use rayon::iter::{ParallelBridge, ParallelIterator};
use serde::Serialize;
use snops_common::state::AgentState;
use tracing::debug;

use super::AppState;
use crate::{cli::PrometheusLocation, env::EnvPeer};
use crate::cli::PrometheusLocation;
pub(super) fn routes() -> Router<AppState> {
Router::new().route("/httpsd", get(get_httpsd))
}
Expand All @@ -17,102 +17,48 @@ pub struct StaticConfig {
pub labels: HashMap<String, String>,
}

/// Caching container for the Prometheus HTTP service discovery response. Marked
/// 'dirty' when environment agents are reallocated.
#[derive(Debug, Clone, Default)]
pub enum HttpsdResponse {
#[default]
Dirty,
Clean(Vec<StaticConfig>),
}

impl HttpsdResponse {
pub fn set_dirty(&mut self) {
*self = Self::Dirty;
}
}

async fn get_httpsd(State(state): State<AppState>) -> impl IntoResponse {
let mut prom_httpsd = state.prom_httpsd.lock().await;

let static_configs = match &*prom_httpsd {
// use the cached response
HttpsdResponse::Clean(static_configs) => static_configs.to_owned(),

// recompute the response and save it
HttpsdResponse::Dirty => {
debug!("httpsd response is dirty, regenerating...");
let mut static_configs = vec![];

for agent in state.pool.iter() {
let Some(mut agent_addr) =
(match (state.cli.prometheus_location, agent.has_label_str("local")) {
// agent is external: serve its external IP
(_, false) => agent
.addrs()
.and_then(|addrs| addrs.external.as_ref())
.map(ToString::to_string),

// prometheus and agent are local: use internal IP
(PrometheusLocation::Internal, true) => agent
.addrs()
.and_then(|addrs| addrs.internal.first())
.map(ToString::to_string),

// prometheus in docker but agent is local: use host.docker.internal
(PrometheusLocation::Docker, true) => {
Some(String::from("host.docker.internal"))
}

// prometheus is external but agent is local: agent might not be forwarded;
// TODO
(PrometheusLocation::External, true) => continue,
})
else {
continue;
};

match agent.state() {
AgentState::Node(env_id, _) => {
// get the environment this agent belongs to
let Some(env) = state.get_env(*env_id) else {
continue;
};

// get the node key that corresponds to this agent
let Some(node_key) =
env.node_peers.get_by_right(&EnvPeer::Internal(agent.id()))
else {
continue;
};

agent_addr
.write_fmt(format_args!(":{}", agent.metrics_port()))
.unwrap();

static_configs.push(StaticConfig {
targets: [agent_addr],
labels: [
("env_id".into(), env_id.to_string()),
("node_key".into(), node_key.to_string()),
]
.into_iter()
.collect(),
});
}

_ => {
// future-proofing; this comment also disables the
// clippy lint
}
}
}

*prom_httpsd = HttpsdResponse::Clean(static_configs.to_owned());

static_configs
}
};
let static_configs = state
.pool
.iter()
.par_bridge()
.filter_map(|agent| {
let agent_addr = (match (state.cli.prometheus_location, agent.has_label_str("local")) {
// agent is external: serve its external IP
(_, false) => agent
.addrs()
.and_then(|addrs| addrs.external.as_ref())
.map(ToString::to_string),

// prometheus and agent are local: use internal IP
(PrometheusLocation::Internal, true) => agent
.addrs()
.and_then(|addrs| addrs.internal.first())
.map(ToString::to_string),

// prometheus in docker but agent is local: use host.docker.internal
(PrometheusLocation::Docker, true) => Some(String::from("host.docker.internal")),

// prometheus is external but agent is local: agent might not be forwarded;
// TODO
(PrometheusLocation::External, true) => return None,
})?;

let AgentState::Node(env_id, node) = agent.state() else {
return None;
};

Some(StaticConfig {
targets: [format!("{agent_addr}:{}", agent.metrics_port())],
labels: [
("env_id".into(), env_id.to_string()),
("node_key".into(), node.node_key.to_string()),
]
.into_iter()
.collect(),
})
})
.collect::<Vec<_>>();

Json(static_configs)
}
6 changes: 2 additions & 4 deletions crates/controlplane/src/state/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use snops_common::{
},
util::OpaqueDebug,
};
use tokio::sync::{Mutex, Semaphore};
use tokio::sync::Semaphore;
use tracing::info;

use super::{
Expand All @@ -26,7 +26,7 @@ use crate::{
env::{cache::NetworkCache, error::EnvRequestError, Environment, PortType},
error::StateError,
schema::storage::{LoadedStorage, STORAGE_DIR},
server::{error::StartError, prometheus::HttpsdResponse},
server::error::StartError,
ReloadHandler,
};

Expand All @@ -45,7 +45,6 @@ pub struct GlobalState {
pub envs: EnvMap,
pub env_network_cache: OpaqueDebug<DashMap<EnvId, NetworkCache>>,

pub prom_httpsd: Mutex<HttpsdResponse>,
pub prometheus: OpaqueDebug<Option<PrometheusClient>>,

pub log_level_handler: ReloadHandler,
Expand Down Expand Up @@ -95,7 +94,6 @@ impl GlobalState {
pool,
storage,
envs: EnvMap::default(),
prom_httpsd: Default::default(),
prometheus: OpaqueDebug(prometheus),
db: OpaqueDebug(db),
env_network_cache: Default::default(),
Expand Down
2 changes: 0 additions & 2 deletions crates/controlplane/src/state/reconcile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ impl GlobalState {
num_reconciliations
);

self.prom_httpsd.lock().await.set_dirty();

if success == num_reconciliations {
Ok(())
} else {
Expand Down

0 comments on commit 67e7739

Please sign in to comment.