From dc14b9e59f6586bf1bb35f71436950ea07d85424 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Mon, 16 Sep 2024 21:10:34 +0000 Subject: [PATCH 1/2] Attempt to handle "kstat-based metrics produce samples from the 1980's" Closes https://github.com/oxidecomputer/omicron/issues/5899. Instead of checking for dates or anything based on the data sampled, this update starts collecting kstat samples (i.e. being interested in them) once the sled agent is synchronized with NTP. We leverage the metrics manager associated with the agent to now look for a new message and update the tracked links we've added/tracked thus far. --- oximeter/instruments/src/kstat/link.rs | 135 ++++++++++++++++---- oximeter/instruments/src/kstat/mod.rs | 1 + oximeter/instruments/src/kstat/sampler.rs | 118 ++++++++++++++++-- sled-agent/src/metrics.rs | 142 ++++++++++++++++++---- sled-agent/src/services.rs | 138 ++++++++++++++++----- 5 files changed, 446 insertions(+), 88 deletions(-) diff --git a/oximeter/instruments/src/kstat/link.rs b/oximeter/instruments/src/kstat/link.rs index 4d045131da..3894665062 100644 --- a/oximeter/instruments/src/kstat/link.rs +++ b/oximeter/instruments/src/kstat/link.rs @@ -15,20 +15,22 @@ use kstat_rs::Data; use kstat_rs::Kstat; use kstat_rs::Named; use oximeter::types::Cumulative; +use oximeter::FieldType; +use oximeter::FieldValue; use oximeter::Sample; +use oximeter::Target; +use uuid::Uuid; oximeter::use_timeseries!("sled-data-link.toml"); +pub use self::sled_data_link::SledDataLink as SledDataLinkTarget; /// Helper function to extract the same kstat metrics from all link targets. 
-fn extract_link_kstats( - target: &T, +fn extract_link_kstats( + target: &SledDataLink, named_data: &Named, creation_time: DateTime, snapshot_time: DateTime, -) -> Option> -where - T: KstatTarget, -{ +) -> Option> { let Named { name, value } = named_data; if *name == "rbytes64" { Some(value.as_u64().and_then(|x| { @@ -83,23 +85,52 @@ where } } -// Helper trait for defining `KstatTarget` for all the link-based stats. -trait LinkKstatTarget: KstatTarget { - fn link_name(&self) -> &str; +#[derive(Clone, Debug)] +pub struct SledDataLink { + /// The target for this link. + pub target: SledDataLinkTarget, + /// Flag indicating whether the sled associated with this link is synced. + pub synced: bool, } -impl LinkKstatTarget for sled_data_link::SledDataLink { - fn link_name(&self) -> &str { - &self.link_name +impl SledDataLink { + /// Create a new `SledDataLink` with the given target and synchronization + /// flag. + pub fn new(target: SledDataLinkTarget, synced: bool) -> Self { + Self { target, synced } + } + + /// Create a new `SledDataLink` with the given target and no synchronization + /// flag set to `false` by default. + pub fn fresh(target: SledDataLinkTarget) -> Self { + Self { target, synced: false } + } + + /// Return the name of the link. + pub fn link_name(&self) -> &str { + &self.target.link_name + } + + /// Return the zone name of the link. + pub fn zone_name(&self) -> &str { + &self.target.zone_name + } + + /// Return the kind of link. + pub fn kind(&self) -> &str { + &self.target.kind + } + + /// Return the identity of the sled. 
+ pub fn sled_id(&self) -> Uuid { + self.target.sled_id } } -impl KstatTarget for T -where - T: LinkKstatTarget, -{ +impl KstatTarget for SledDataLink { fn interested(&self, kstat: &Kstat<'_>) -> bool { - kstat.ks_module == "link" + self.synced + && kstat.ks_module == "link" && kstat.ks_instance == 0 && kstat.ks_name == self.link_name() } @@ -124,6 +155,25 @@ where } } +// NOTE: Delegate to the inner target type for this implementation. +impl Target for SledDataLink { + fn name(&self) -> &'static str { + self.target.name() + } + + fn field_names(&self) -> &'static [&'static str] { + self.target.field_names() + } + + fn field_types(&self) -> Vec { + self.target.field_types() + } + + fn field_values(&self) -> Vec { + self.target.field_values() + } +} + #[cfg(all(test, target_os = "illumos"))] mod tests { use super::*; @@ -225,10 +275,40 @@ mod tests { } } + #[test] + fn test_kstat_interested() { + let link = TestEtherstub::new(); + let target = SledDataLinkTarget { + rack_id: RACK_ID, + sled_id: SLED_ID, + sled_serial: SLED_SERIAL.into(), + link_name: link.name.clone().into(), + kind: KIND.into(), + sled_model: SLED_MODEL.into(), + sled_revision: SLED_REVISION, + zone_name: ZONE_NAME.into(), + }; + // not with a synced sled (by default) + let mut dl = SledDataLink::fresh(target); + + let ctl = Ctl::new().unwrap(); + let ctl = ctl.update().unwrap(); + let kstat = ctl + .filter(Some("link"), Some(0), Some(link.name.as_str())) + .next() + .unwrap(); + + assert!(!dl.interested(&kstat)); + + // with a synced sled + dl.synced = true; + assert!(dl.interested(&kstat)); + } + #[test] fn test_sled_datalink() { let link = TestEtherstub::new(); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -238,6 +318,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let ctl = Ctl::new().unwrap(); let ctl = 
ctl.update().unwrap(); let mut kstat = ctl @@ -254,7 +335,7 @@ mod tests { async fn test_kstat_sampler() { let mut sampler = KstatSampler::new(&test_logger()).unwrap(); let link = TestEtherstub::new(); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -264,6 +345,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let details = CollectionDetails::never(Duration::from_secs(1)); let id = sampler.add_target(dl, details).await.unwrap(); let samples: Vec<_> = sampler.produce().unwrap().collect(); @@ -304,7 +386,7 @@ mod tests { let mut sampler = KstatSampler::with_sample_limit(&test_logger(), limit).unwrap(); let link = TestEtherstub::new(); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -314,6 +396,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let details = CollectionDetails::never(Duration::from_secs(1)); sampler.add_target(dl, details).await.unwrap(); let samples: Vec<_> = sampler.produce().unwrap().collect(); @@ -373,7 +456,7 @@ mod tests { let mut sampler = KstatSampler::new(&log).unwrap(); let link = TestEtherstub::new(); info!(log, "created test etherstub"; "name" => &link.name); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -383,6 +466,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); let details = CollectionDetails::duration(collection_interval, expiry); @@ -432,7 +516,7 @@ mod tests { let mut sampler = KstatSampler::new(&log).unwrap(); let link = 
TestEtherstub::new(); info!(log, "created test etherstub"; "name" => &link.name); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -442,6 +526,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); let details = CollectionDetails::duration(collection_interval, expiry); @@ -483,7 +568,7 @@ mod tests { name: link.name.clone(), }; info!(log, "created test etherstub"; "name" => &link.name); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -493,6 +578,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); let details = CollectionDetails::duration(collection_interval, expiry); @@ -532,7 +618,7 @@ mod tests { name: link.name.clone(), }; info!(log, "created test etherstub"; "name" => &link.name); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -542,6 +628,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); let details = CollectionDetails::duration(collection_interval, expiry); diff --git a/oximeter/instruments/src/kstat/mod.rs b/oximeter/instruments/src/kstat/mod.rs index a5020b9b61..7b0082a396 100644 --- a/oximeter/instruments/src/kstat/mod.rs +++ b/oximeter/instruments/src/kstat/mod.rs @@ -91,6 +91,7 @@ use std::time::Duration; pub mod link; mod sampler; +pub use link::SledDataLink; pub use 
sampler::CollectionDetails; pub use sampler::ExpirationBehavior; pub use sampler::KstatSampler; diff --git a/oximeter/instruments/src/kstat/sampler.rs b/oximeter/instruments/src/kstat/sampler.rs index 92466758c1..c6101ac06a 100644 --- a/oximeter/instruments/src/kstat/sampler.rs +++ b/oximeter/instruments/src/kstat/sampler.rs @@ -173,6 +173,12 @@ enum Request { id: TargetId, reply_tx: oneshot::Sender>, }, + /// Update a target. + UpdateTarget { + target: Box, + details: CollectionDetails, + reply_tx: oneshot::Sender>, + }, /// Remove a target. RemoveTarget { id: TargetId, reply_tx: oneshot::Sender> }, /// Return the creation times of all tracked / extant kstats. @@ -550,6 +556,45 @@ impl KstatSamplerWorker { } } } + Request::UpdateTarget { target, details, reply_tx } => { + match self.update_target(target, details) { + Ok(id) => { + let timeout = YieldIdAfter::new(id, details.interval); + sample_timeouts.push(timeout); + trace!( + self.log, + "updated target with timeout"; + "id" => ?id, + "details" => ?details, + ); + match reply_tx.send(Ok(id)) { + Ok(_) => trace!(self.log, "sent reply"), + Err(e) => error!( + self.log, + "failed to send reply"; + "id" => ?id, + "error" => ?e, + ) + } + } + Err(e) => { + error!( + self.log, + "failed to update target"; + "error" => ?e, + ); + match reply_tx.send(Err(e)) { + Ok(_) => trace!(self.log, "sent reply"), + Err(e) => error!( + self.log, + "failed to send reply"; + "error" => ?e, + ) + } + } + } + + } Request::RemoveTarget { id, reply_tx } => { self.targets.remove(&id); if let Some(remaining_samples) = self.samples.lock().unwrap().remove(&id) { @@ -966,6 +1011,52 @@ impl KstatSamplerWorker { } None => {} } + + self.insert_target(id, target, details) + } + + fn update_target( + &mut self, + target: Box, + details: CollectionDetails, + ) -> Result { + let id = hash_target(&*target); + match self.targets.get(&id) { + // If the target is already expired, we'll replace it with the new + // target and start sampling it again. 
+ Some(SampledObject::Expired(e)) => { + warn!( + self.log, + "replacing expired kstat target"; + "id" => ?id, + "expiration_reason" => ?e.reason, + "error" => ?e.error, + "expired_at" => ?e.expired_at, + ); + } + Some(_) => {} + None => return Err(Error::NoSuchTarget), + } + + self.insert_target(id, target, details) + } + + fn update_chain(&mut self) -> Result<(), Error> { + let new_ctl = match self.ctl.take() { + None => Ctl::new(), + Some(old) => old.update(), + } + .map_err(Error::Kstat)?; + let _ = self.ctl.insert(new_ctl); + Ok(()) + } + + fn insert_target( + &mut self, + id: TargetId, + target: Box, + details: CollectionDetails, + ) -> Result { self.ensure_creation_times_for_target(&*target)?; let item = SampledKstat { target, @@ -995,17 +1086,8 @@ impl KstatSamplerWorker { "n_samples" => n, ), } - Ok(id) - } - fn update_chain(&mut self) -> Result<(), Error> { - let new_ctl = match self.ctl.take() { - None => Ctl::new(), - Some(old) => old.update(), - } - .map_err(Error::Kstat)?; - let _ = self.ctl.insert(new_ctl); - Ok(()) + Ok(id) } } @@ -1094,6 +1176,22 @@ impl KstatSampler { reply_rx.await.map_err(|_| Error::RecvError)? } + /// Update the details for a target. + pub async fn update_target( + &self, + target: impl KstatTarget, + details: CollectionDetails, + ) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + let request = Request::UpdateTarget { + target: Box::new(target), + details, + reply_tx, + }; + self.outbox.send(request).await.map_err(|_| Error::SendError)?; + reply_rx.await.map_err(|_| Error::RecvError)? + } + /// Fetch the status for a target. 
/// /// If the target is being collected normally, then `TargetStatus::Ok` is diff --git a/sled-agent/src/metrics.rs b/sled-agent/src/metrics.rs index 1039302248..5a7f21a8e0 100644 --- a/sled-agent/src/metrics.rs +++ b/sled-agent/src/metrics.rs @@ -8,7 +8,8 @@ use illumos_utils::running_zone::RunningZone; use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::api::internal::nexus::ProducerKind; use omicron_common::api::internal::shared::SledIdentifiers; -use oximeter_instruments::kstat::link::sled_data_link::SledDataLink; +use oximeter_instruments::kstat::link::SledDataLink; +use oximeter_instruments::kstat::link::SledDataLinkTarget; use oximeter_instruments::kstat::CollectionDetails; use oximeter_instruments::kstat::Error as KstatError; use oximeter_instruments::kstat::KstatSampler; @@ -24,6 +25,8 @@ use std::time::Duration; use tokio::sync::mpsc; use uuid::Uuid; +type TrackedLinks = HashMap; + /// The interval on which we ask `oximeter` to poll us for metric data. const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(30); @@ -84,6 +87,8 @@ pub(crate) enum Message { TrackOptePort { zone_name: String, name: String }, /// Stop tracking the named OPTE port. UntrackOptePort { name: String }, + /// Notify the task that a sled has been synced. + SyncedSled { sled_id: Uuid, synced: bool }, // TODO-completeness: We will probably want to track other kinds of // statistics here too. For example, we could send messages when a zone is // created / destroyed to track zonestats; we might also want to support @@ -100,6 +105,39 @@ impl LinkKind { const OPTE: &'static str = "opte"; } +struct Target { + id: TargetId, + sled_datalink: SledDataLink, +} + +impl Target { + /// Create a new target. + fn new(id: TargetId, sled_datalink: SledDataLink) -> Self { + Self { id, sled_datalink } + } + + /// Return the target id associated with the kstat sampler. + fn id(&self) -> TargetId { + self.id + } + + /// Return the sled datalink's Uuid identifier. 
+ fn sled_id(&self) -> Uuid { + self.sled_datalink.target.sled_id + } +} + +fn get_collection_details(kind: &str) -> CollectionDetails { + if is_transient_link(kind) { + CollectionDetails::duration( + LINK_SAMPLE_INTERVAL, + TRANSIENT_LINK_EXPIRATION_INTERVAL, + ) + } else { + CollectionDetails::never(LINK_SAMPLE_INTERVAL) + } +} + /// The main task used to collect and publish sled-agent metrics. async fn metrics_task( sled_identifiers: SledIdentifiers, @@ -108,7 +146,8 @@ async fn metrics_task( log: Logger, mut rx: mpsc::Receiver, ) { - let mut tracked_links: HashMap = HashMap::new(); + let mut tracked_links: TrackedLinks = HashMap::new(); + let mut sled_synced: bool = false; // Main polling loop, waiting for messages from other pieces of the code to // track various statistics. @@ -118,9 +157,10 @@ async fn metrics_task( return; }; trace!(log, "received message"; "message" => ?message); + match message { Message::TrackPhysical { zone_name, name } => { - let link = SledDataLink { + let target = SledDataLinkTarget { kind: LinkKind::PHYSICAL.into(), link_name: name.into(), rack_id: sled_identifiers.rack_id, @@ -130,11 +170,12 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; + let link = SledDataLink::new(target, sled_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } Message::TrackVnic { zone_name, name } => { - let link = SledDataLink { + let target = SledDataLinkTarget { kind: LinkKind::VNIC.into(), link_name: name.into(), rack_id: sled_identifiers.rack_id, @@ -144,6 +185,7 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; + let link = SledDataLink::new(target, sled_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } @@ -152,7 +194,7 @@ async fn metrics_task( .await } Message::TrackOptePort { zone_name, name } => { - let link = SledDataLink { + let target = SledDataLinkTarget { kind: 
LinkKind::OPTE.into(), link_name: name.into(), rack_id: sled_identifiers.rack_id, @@ -162,6 +204,7 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; + let link = SledDataLink::new(target, sled_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } @@ -169,6 +212,18 @@ async fn metrics_task( remove_datalink(&log, &mut tracked_links, &kstat_sampler, name) .await } + Message::SyncedSled { sled_id, synced } => { + if sled_id == sled_identifiers.sled_id && synced { + sled_synced = true; + sync_sled_datalinks( + &log, + &mut tracked_links, + &kstat_sampler, + sled_id, + ) + .await + } + } } } } @@ -176,12 +231,12 @@ async fn metrics_task( /// Stop tracking a link by name. async fn remove_datalink( log: &Logger, - tracked_links: &mut HashMap, + tracked_links: &mut HashMap, kstat_sampler: &KstatSampler, name: String, ) { match tracked_links.remove(&name) { - Some(id) => match kstat_sampler.remove_target(id).await { + Some(target) => match kstat_sampler.remove_target(target.id()).await { Ok(_) => { debug!( log, @@ -213,32 +268,24 @@ async fn remove_datalink( /// Start tracking a new link of the specified kind. 
async fn add_datalink( log: &Logger, - tracked_links: &mut HashMap, + tracked_links: &mut HashMap, kstat_sampler: &KstatSampler, link: SledDataLink, ) { - match tracked_links.entry(link.link_name.to_string()) { + match tracked_links.entry(link.link_name().to_string()) { Entry::Vacant(entry) => { - let details = if is_transient_link(&link.kind) { - CollectionDetails::duration( - LINK_SAMPLE_INTERVAL, - TRANSIENT_LINK_EXPIRATION_INTERVAL, - ) - } else { - CollectionDetails::never(LINK_SAMPLE_INTERVAL) - }; - let kind = link.kind.clone(); - let zone_name = link.zone_name.clone(); - match kstat_sampler.add_target(link, details).await { + let details = get_collection_details(link.kind()); + let link_to_add = link.clone(); + match kstat_sampler.add_target(link_to_add, details).await { Ok(id) => { debug!( log, "Added new link to kstat sampler"; "link_name" => entry.key(), - "link_kind" => %kind, - "zone_name" => %zone_name, + "link_kind" => %link.kind(), + "zone_name" => %link.zone_name(), ); - entry.insert(id); + entry.insert(Target::new(id, link)); } Err(err) => { error!( @@ -246,8 +293,8 @@ async fn add_datalink( "Failed to add VNIC to kstat sampler, \ no metrics will be collected for it"; "link_name" => entry.key(), - "link_kind" => %kind, - "zone_name" => %zone_name, + "link_kind" => %link.kind(), + "zone_name" => %link.zone_name(), "error" => ?err, ); } @@ -264,6 +311,41 @@ async fn add_datalink( } } +/// Update tracked links when a sled is synced. 
+async fn sync_sled_datalinks( + log: &Logger, + tracked_links: &mut TrackedLinks, + kstat_sampler: &KstatSampler, + sled_id: Uuid, +) { + for (link_name, target) in tracked_links.iter_mut() { + if target.sled_id() == sled_id { + target.sled_datalink.synced = true; + let details = get_collection_details(target.sled_datalink.kind()); + match kstat_sampler + .update_target(target.sled_datalink.clone(), details) + .await + { + Ok(_) => { + debug!( + log, + "Updated link already tracked by kstat sampler"; + "link_name" => link_name, + ); + } + Err(err) => { + error!( + log, + "Failed to update link already tracked by kstat sampler"; + "link_name" => link_name, + "error" => ?err, + ); + } + } + } + } +} + /// Return true if this is considered a transient link, from the perspective of /// its expiration behavior. fn is_transient_link(kind: &str) -> bool { @@ -278,7 +360,7 @@ fn is_transient_link(kind: &str) -> bool { /// `MetricsHandle`. #[derive(Debug)] pub struct MetricsManager { - /// Receive-side of a channel used to pass the background task messages. + /// Sender-side of a channel used to pass the background task messages. #[cfg_attr(test, allow(dead_code))] tx: mpsc::Sender, /// The background task itself. @@ -438,6 +520,14 @@ impl MetricsRequestQueue { } success } + + /// Notify the task that a sled's synced value has changed. + /// + /// Typically, this is used to notify the task that a sled has *indeed* been + /// synced with NTP. + pub async fn synced_sled(&self, sled_id: Uuid, synced: bool) -> bool { + self.0.send(Message::SyncedSled { sled_id, synced }).await.is_ok() + } } /// Start a metric producer server. 
diff --git a/sled-agent/src/services.rs b/sled-agent/src/services.rs index f386fd1d0f..c31ef4e923 100644 --- a/sled-agent/src/services.rs +++ b/sled-agent/src/services.rs @@ -954,6 +954,31 @@ impl ServiceManager { self.inner.sled_mode } + /// Returns the sled's identifier + fn sled_id(&self) -> Uuid { + self.inner + .sled_info + .get() + .expect("sled agent not started") + .config + .sled_id + } + + /// Returns the metrics queue for the sled agent if it is running. + fn maybe_metrics_queue(&self) -> Option<&MetricsRequestQueue> { + self.inner.sled_info.get().map(|info| &info.metrics_queue) + } + + /// Returns the metrics queue for the sled agent. + fn metrics_queue(&self) -> &MetricsRequestQueue { + &self + .inner + .sled_info + .get() + .expect("Sled agent should have started") + .metrics_queue + } + // Advertise the /64 prefix of `address`, unless we already have. // // This method only blocks long enough to check our HashSet of @@ -3105,9 +3130,7 @@ impl ServiceManager { // point. The only exception is the switch zone, during bootstrapping // but before we've either run RSS or unlocked the rack. In both those // cases, we have a `StartSledAgentRequest`, and so a metrics queue. - if let Some(queue) = - self.inner.sled_info.get().map(|sa| &sa.metrics_queue) - { + if let Some(queue) = self.maybe_metrics_queue() { if !queue.track_zone_links(&running_zone).await { error!( self.inner.log, @@ -3484,9 +3507,7 @@ impl ServiceManager { }; // Ensure that the sled agent's metrics task is not tracking the zone's // VNICs or OPTE ports. - if let Some(queue) = - self.inner.sled_info.get().map(|sa| &sa.metrics_queue) - { + if let Some(queue) = self.maybe_metrics_queue() { queue.untrack_zone_links(&zone.runtime).await; } debug!( @@ -3712,17 +3733,8 @@ impl ServiceManager { Ok(()) } + /// Adjust the system boot time to the latest boot time of all zones. 
pub fn boottime_rewrite(&self) { - if self - .inner - .time_synced - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_err() - { - // Already done. - return; - } - // Call out to the 'tmpx' utility program which will rewrite the wtmpx // and utmpx databases in every zone, including the global zone, to // reflect the adjusted system boot time. @@ -3754,7 +3766,7 @@ impl ServiceManager { if skip_timesync { info!(self.inner.log, "Configured to skip timesync checks"); - self.boottime_rewrite(); + self.if_timesynced().await; return Ok(TimeSync { sync: true, ref_id: 0, @@ -3809,7 +3821,7 @@ impl ServiceManager { && correction.abs() <= 0.05; if sync { - self.boottime_rewrite(); + self.if_timesynced().await; } Ok(TimeSync { @@ -3831,6 +3843,33 @@ impl ServiceManager { } } + /// Check if the system time is synchronized and if so, execute the + /// boottime_rewrite function and send messages. + async fn if_timesynced(&self) { + match self.inner.time_synced.compare_exchange( + false, + true, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Err(_) => { + debug!(self.inner.log, "Time was already synchronized"); + } + Ok(_) => { + debug!(self.inner.log, "Time is now synchronized"); + // We only want to rewrite the boot time once, so we do it here + // when we know the time is synchronized. + self.boottime_rewrite(); + + // We expect to have a metrics queue by this point, so + // we can safely send a message on it to say the sled has + // been synchronized. + let queue = self.metrics_queue(); + queue.synced_sled(self.sled_id(), true).await; + } + } + } + /// Ensures that a switch zone exists with the provided IP adddress. pub async fn activate_switch( &self, @@ -4867,7 +4906,7 @@ mod test { // Also send a message to the metrics task that the VNIC has been // deleted. 
- let queue = &mgr.inner.sled_info.get().unwrap().metrics_queue; + let queue = mgr.metrics_queue(); for zone in mgr.inner.zones.lock().await.values() { queue.untrack_zone_links(&zone.runtime).await; } @@ -5047,8 +5086,22 @@ mod test { assert_eq!(found.zones.len(), 1); assert_eq!(found.zones[0].id, id); - // Check that we received a message about the zone's VNIC. - let message = tokio::time::timeout( + // First check that we received the synced sled notification + let synced_message = tokio::time::timeout( + LINK_NOTIFICATION_TIMEOUT, + metrics_rx.recv(), + ).await.expect("Should have received a message about the sled being synced within the timeout") + .expect("Should have received a message about the sled being synced"); + assert_eq!( + synced_message, + metrics::Message::SyncedSled { + sled_id: mgr.sled_id(), + synced: true + }, + ); + + // Then, check that we received a message about the zone's VNIC. + let vnic_message = tokio::time::timeout( LINK_NOTIFICATION_TIMEOUT, metrics_rx.recv(), ) @@ -5059,7 +5112,7 @@ mod test { .expect("Should have received a message about the zone's VNIC"); let zone_name = format!("oxz_ntp_{}", id); assert_eq!( - message, + vnic_message, metrics::Message::TrackVnic { zone_name, name: "oxControlService0".into() @@ -5189,10 +5242,24 @@ mod test { assert_eq!(found.zones.len(), 1); assert_eq!(found.zones[0].id, id); + // First, we will get a message about the sled being synced. + let synced_message = tokio::time::timeout( + LINK_NOTIFICATION_TIMEOUT, + metrics_rx.recv(), + ).await.expect("Should have received a message about the sled being synced within the timeout") + .expect("Should have received a message about the sled being synced"); + assert_eq!( + synced_message, + metrics::Message::SyncedSled { + sled_id: mgr.sled_id(), + synced: true + } + ); + // In this case, the manager creates the zone once, and then "ensuring" // it a second time is a no-op. 
So we simply expect the same message // sequence as starting a zone for the first time. - let message = tokio::time::timeout( + let vnic_message = tokio::time::timeout( LINK_NOTIFICATION_TIMEOUT, metrics_rx.recv(), ) @@ -5203,7 +5270,7 @@ mod test { .expect("Should have received a message about the zone's VNIC"); let zone_name = format!("oxz_ntp_{}", id); assert_eq!( - message, + vnic_message, metrics::Message::TrackVnic { zone_name, name: "oxControlService0".into() @@ -5247,21 +5314,36 @@ mod test { String::from(test_config.config_dir.path().as_str()), ) .await; + + let sled_id = mgr.sled_id(); drop_service_manager(mgr).await; + // First, we will get a message about the sled being synced. + let synced_message = tokio::time::timeout( + LINK_NOTIFICATION_TIMEOUT, + metrics_rx.recv(), + ).await.expect("Should have received a message about the sled being synced within the timeout") + .expect("Should have received a message about the sled being synced"); + assert_eq!( + synced_message, + metrics::Message::SyncedSled { sled_id, synced: true } + ); + // Check that we received a message about the zone's VNIC. Since the // manager is being dropped, it should also send a message about the // VNIC being deleted. 
let zone_name = format!("oxz_ntp_{}", id); - for expected_message in [ + for expected_vnic_message in [ metrics::Message::TrackVnic { zone_name, name: "oxControlService0".into(), }, metrics::Message::UntrackVnic { name: "oxControlService0".into() }, ] { - println!("Expecting message from manager: {expected_message:#?}"); - let message = tokio::time::timeout( + println!( + "Expecting message from manager: {expected_vnic_message:#?}" + ); + let vnic_message = tokio::time::timeout( LINK_NOTIFICATION_TIMEOUT, metrics_rx.recv(), ) @@ -5270,7 +5352,7 @@ mod test { "Should have received a message about the zone's VNIC within the timeout" ) .expect("Should have received a message about the zone's VNIC"); - assert_eq!(message, expected_message,); + assert_eq!(vnic_message, expected_vnic_message,); } // Note that the manager has been dropped, so we should get // disconnected, not empty. From 4a3b8cc10a6d6da8274ad3142b31f60a27175e26 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Wed, 18 Sep 2024 13:28:46 +0000 Subject: [PATCH 2/2] [review] address review comments --- oximeter/instruments/src/kstat/link.rs | 23 ++++--- sled-agent/src/metrics.rs | 93 ++++++++++---------------- sled-agent/src/services.rs | 80 ++++++++++------------ 3 files changed, 83 insertions(+), 113 deletions(-) diff --git a/oximeter/instruments/src/kstat/link.rs b/oximeter/instruments/src/kstat/link.rs index 3894665062..18f05f5cfe 100644 --- a/oximeter/instruments/src/kstat/link.rs +++ b/oximeter/instruments/src/kstat/link.rs @@ -89,21 +89,22 @@ fn extract_link_kstats( pub struct SledDataLink { /// The target for this link. pub target: SledDataLinkTarget, - /// Flag indicating whether the sled associated with this link is synced. - pub synced: bool, + /// Flag indicating whether the sled associated with this link is synced with + /// NTP. + pub time_synced: bool, } impl SledDataLink { /// Create a new `SledDataLink` with the given target and synchronization /// flag. 
- pub fn new(target: SledDataLinkTarget, synced: bool) -> Self { - Self { target, synced } + pub fn new(target: SledDataLinkTarget, time_synced: bool) -> Self { + Self { target, time_synced } } - /// Create a new `SledDataLink` with the given target and no synchronization - /// flag set to `false` by default. - pub fn fresh(target: SledDataLinkTarget) -> Self { - Self { target, synced: false } + /// Create a new `SledDataLink` with the given target that is not yet time-synced. + #[cfg(test)] + pub fn unsynced(target: SledDataLinkTarget) -> Self { + Self { target, time_synced: false } } /// Return the name of the link. @@ -129,7 +130,7 @@ impl SledDataLink { impl KstatTarget for SledDataLink { fn interested(&self, kstat: &Kstat<'_>) -> bool { - self.synced + self.time_synced && kstat.ks_module == "link" && kstat.ks_instance == 0 && kstat.ks_name == self.link_name() @@ -289,7 +290,7 @@ mod tests { zone_name: ZONE_NAME.into(), }; // not with a synced sled (by default) - let mut dl = SledDataLink::fresh(target); + let mut dl = SledDataLink::unsynced(target); let ctl = Ctl::new().unwrap(); let ctl = ctl.update().unwrap(); @@ -301,7 +302,7 @@ mod tests { assert!(!dl.interested(&kstat)); // with a synced sled - dl.synced = true; + dl.time_synced = true; assert!(dl.interested(&kstat)); } diff --git a/sled-agent/src/metrics.rs b/sled-agent/src/metrics.rs index 5a7f21a8e0..ef9edd9a07 100644 --- a/sled-agent/src/metrics.rs +++ b/sled-agent/src/metrics.rs @@ -87,8 +87,8 @@ pub(crate) enum Message { TrackOptePort { zone_name: String, name: String }, /// Stop tracking the named OPTE port. UntrackOptePort { name: String }, - /// Notify the task that a sled has been synced. - SyncedSled { sled_id: Uuid, synced: bool }, + /// Notify the task that a sled has been synced with NTP. + TimeSynced { sled_id: Uuid }, // TODO-completeness: We will probably want to track other kinds of // statistics here too. 
For example, we could send messages when a zone is // created / destroyed to track zonestats; we might also want to support @@ -110,23 +110,6 @@ struct Target { sled_datalink: SledDataLink, } -impl Target { - /// Create a new target. - fn new(id: TargetId, sled_datalink: SledDataLink) -> Self { - Self { id, sled_datalink } - } - - /// Return the target id associated with the kstat sampler. - fn id(&self) -> TargetId { - self.id - } - - /// Return the sled datalink's Uuid identifier. - fn sled_id(&self) -> Uuid { - self.sled_datalink.target.sled_id - } -} - fn get_collection_details(kind: &str) -> CollectionDetails { if is_transient_link(kind) { CollectionDetails::duration( @@ -147,7 +130,7 @@ async fn metrics_task( mut rx: mpsc::Receiver, ) { let mut tracked_links: TrackedLinks = HashMap::new(); - let mut sled_synced: bool = false; + let mut sled_time_synced: bool = false; // Main polling loop, waiting for messages from other pieces of the code to // track various statistics. @@ -170,7 +153,7 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; - let link = SledDataLink::new(target, sled_synced); + let link = SledDataLink::new(target, sled_time_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } @@ -185,7 +168,7 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; - let link = SledDataLink::new(target, sled_synced); + let link = SledDataLink::new(target, sled_time_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } @@ -204,7 +187,7 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; - let link = SledDataLink::new(target, sled_synced); + let link = SledDataLink::new(target, sled_time_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } @@ -212,14 +195,14 @@ async fn metrics_task( remove_datalink(&log, &mut 
tracked_links, &kstat_sampler, name) .await } - Message::SyncedSled { sled_id, synced } => { - if sled_id == sled_identifiers.sled_id && synced { - sled_synced = true; + Message::TimeSynced { sled_id } => { + assert!(!sled_time_synced, "This message should only be sent once (on first synchronization with NTP)"); + if sled_id == sled_identifiers.sled_id { + sled_time_synced = true; sync_sled_datalinks( &log, &mut tracked_links, &kstat_sampler, - sled_id, ) .await } @@ -236,7 +219,7 @@ async fn remove_datalink( name: String, ) { match tracked_links.remove(&name) { - Some(target) => match kstat_sampler.remove_target(target.id()).await { + Some(target) => match kstat_sampler.remove_target(target.id).await { Ok(_) => { debug!( log, @@ -285,7 +268,7 @@ async fn add_datalink( "link_kind" => %link.kind(), "zone_name" => %link.zone_name(), ); - entry.insert(Target::new(id, link)); + entry.insert(Target { id, sled_datalink: link }); } Err(err) => { error!( @@ -316,31 +299,28 @@ async fn sync_sled_datalinks( log: &Logger, tracked_links: &mut TrackedLinks, kstat_sampler: &KstatSampler, - sled_id: Uuid, ) { for (link_name, target) in tracked_links.iter_mut() { - if target.sled_id() == sled_id { - target.sled_datalink.synced = true; - let details = get_collection_details(target.sled_datalink.kind()); - match kstat_sampler - .update_target(target.sled_datalink.clone(), details) - .await - { - Ok(_) => { - debug!( - log, - "Updated link already tracked by kstat sampler"; - "link_name" => link_name, - ); - } - Err(err) => { - error!( - log, - "Failed to update link already tracked by kstat sampler"; - "link_name" => link_name, - "error" => ?err, - ); - } + target.sled_datalink.time_synced = true; + let details = get_collection_details(target.sled_datalink.kind()); + match kstat_sampler + .update_target(target.sled_datalink.clone(), details) + .await + { + Ok(_) => { + debug!( + log, + "Updated link already tracked by kstat sampler"; + "link_name" => link_name, + ); + } + Err(err) 
=> { + error!( + log, + "Failed to update link already tracked by kstat sampler"; + "link_name" => link_name, + "error" => ?err, + ); } } } @@ -521,12 +501,9 @@ impl MetricsRequestQueue { success } - /// Notify the task that a sled's synced value has changed. - /// - /// Typically, this is used to notify the task that a sled has *indeed* been - /// synced with NTP. - pub async fn synced_sled(&self, sled_id: Uuid, synced: bool) -> bool { - self.0.send(Message::SyncedSled { sled_id, synced }).await.is_ok() + /// Notify the task that a sled's state has been synchronized with NTP. + pub async fn notify_time_synced_sled(&self, sled_id: Uuid) -> bool { + self.0.send(Message::TimeSynced { sled_id }).await.is_ok() } } diff --git a/sled-agent/src/services.rs b/sled-agent/src/services.rs index c31ef4e923..2c83fc5ea3 100644 --- a/sled-agent/src/services.rs +++ b/sled-agent/src/services.rs @@ -971,12 +971,7 @@ impl ServiceManager { /// Returns the metrics queue for the sled agent. fn metrics_queue(&self) -> &MetricsRequestQueue { - &self - .inner - .sled_info - .get() - .expect("Sled agent should have started") - .metrics_queue + &self.maybe_metrics_queue().expect("Sled agent should have started") } // Advertise the /64 prefix of `address`, unless we already have. @@ -3734,7 +3729,7 @@ impl ServiceManager { } /// Adjust the system boot time to the latest boot time of all zones. - pub fn boottime_rewrite(&self) { + fn boottime_rewrite(&self) { // Call out to the 'tmpx' utility program which will rewrite the wtmpx // and utmpx databases in every zone, including the global zone, to // reflect the adjusted system boot time. 
@@ -3766,7 +3761,7 @@ impl ServiceManager { if skip_timesync { info!(self.inner.log, "Configured to skip timesync checks"); - self.if_timesynced().await; + self.on_time_sync().await; return Ok(TimeSync { sync: true, ref_id: 0, @@ -3821,7 +3816,7 @@ impl ServiceManager { && correction.abs() <= 0.05; if sync { - self.if_timesynced().await; + self.on_time_sync().await; } Ok(TimeSync { @@ -3843,30 +3838,36 @@ impl ServiceManager { } } - /// Check if the system time is synchronized and if so, execute the - /// boottime_rewrite function and send messages. - async fn if_timesynced(&self) { - match self.inner.time_synced.compare_exchange( - false, - true, - Ordering::Acquire, - Ordering::Relaxed, - ) { - Err(_) => { - debug!(self.inner.log, "Time was already synchronized"); - } - Ok(_) => { - debug!(self.inner.log, "Time is now synchronized"); - // We only want to rewrite the boot time once, so we do it here - // when we know the time is synchronized. - self.boottime_rewrite(); - - // We expect to have a metrics queue by this point, so - // we can safely send a message on it to say the sled has - // been synchronized. - let queue = self.metrics_queue(); - queue.synced_sled(self.sled_id(), true).await; + /// Check if the synchronization state of the sled has shifted to true and + /// if so, execute the any out-of-band actions that need to be taken. + /// + /// This function only executes the out-of-band actions once, once the + /// synchronization state has shifted to true. + async fn on_time_sync(&self) { + if self + .inner + .time_synced + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + debug!(self.inner.log, "Time is now synchronized"); + // We only want to rewrite the boot time once, so we do it here + // when we know the time is synchronized. + self.boottime_rewrite(); + + // We expect to have a metrics queue by this point, so + // we can safely send a message on it to say the sled has + // been synchronized. 
+ let queue = self.metrics_queue(); + if !queue.notify_time_synced_sled(self.sled_id()).await { + error!( + self.inner.log, + "Failed to notify metrics queue of sled \ + time synchronization, metrics may not be produced." + ); } + } else { + debug!(self.inner.log, "Time was already synchronized"); } } @@ -5094,10 +5095,7 @@ mod test { .expect("Should have received a message about the sled being synced"); assert_eq!( synced_message, - metrics::Message::SyncedSled { - sled_id: mgr.sled_id(), - synced: true - }, + metrics::Message::TimeSynced { sled_id: mgr.sled_id() }, ); // Then, check that we received a message about the zone's VNIC. @@ -5250,10 +5248,7 @@ mod test { .expect("Should have received a message about the sled being synced"); assert_eq!( synced_message, - metrics::Message::SyncedSled { - sled_id: mgr.sled_id(), - synced: true - } + metrics::Message::TimeSynced { sled_id: mgr.sled_id() } ); // In this case, the manager creates the zone once, and then "ensuring" @@ -5324,10 +5319,7 @@ mod test { metrics_rx.recv(), ).await.expect("Should have received a message about the sled being synced within the timeout") .expect("Should have received a message about the sled being synced"); - assert_eq!( - synced_message, - metrics::Message::SyncedSled { sled_id, synced: true } - ); + assert_eq!(synced_message, metrics::Message::TimeSynced { sled_id }); // Check that we received a message about the zone's VNIC. Since the // manager is being dropped, it should also send a message about the