From 1da9f7255eb110cb8be15ae52497978deb596a02 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Tue, 12 Nov 2024 16:50:35 +0100 Subject: [PATCH 1/2] Deduplicate Debugger diagnostics in sidecar --- live-debugger-ffi/src/send_data.rs | 8 +- live-debugger/src/debugger_defs.rs | 28 ++-- sidecar-ffi/src/lib.rs | 19 +++ sidecar/src/service/blocking.rs | 29 +++- .../debugger_diagnostics_bookkeeper.rs | 141 ++++++++++++++++++ sidecar/src/service/mod.rs | 1 + sidecar/src/service/sidecar_interface.rs | 16 ++ sidecar/src/service/sidecar_server.rs | 32 ++++ 8 files changed, 255 insertions(+), 19 deletions(-) create mode 100644 sidecar/src/service/debugger_diagnostics_bookkeeper.rs diff --git a/live-debugger-ffi/src/send_data.rs b/live-debugger-ffi/src/send_data.rs index d9e67b49e..a047776e2 100644 --- a/live-debugger-ffi/src/send_data.rs +++ b/live-debugger-ffi/src/send_data.rs @@ -100,7 +100,7 @@ pub extern "C" fn ddog_create_exception_snapshot<'a>( ) -> *mut DebuggerCapture<'a> { let snapshot = DebuggerPayload { service: service.to_utf8_lossy(), - ddsource: "dd_debugger", + ddsource: Cow::Borrowed("dd_debugger"), timestamp, message: None, debugger: DebuggerData::Snapshot(Snapshot { @@ -159,7 +159,7 @@ pub extern "C" fn ddog_create_log_probe_snapshot<'a>( ) -> Box> { Box::new(DebuggerPayload { service: service.to_utf8_lossy(), - ddsource: "dd_debugger", + ddsource: Cow::Borrowed("dd_debugger"), timestamp, message: message.map(|m| m.to_utf8_lossy()), debugger: DebuggerData::Snapshot(Snapshot { @@ -334,7 +334,7 @@ pub extern "C" fn ddog_evaluation_error_snapshot<'a>( ) -> Box> { Box::new(DebuggerPayload { service: service.to_utf8_lossy(), - ddsource: "dd_debugger", + ddsource: Cow::Borrowed("dd_debugger"), timestamp, message: Some(Cow::Owned(format!( "Evaluation errors for probe id {}", @@ -397,7 +397,7 @@ pub fn ddog_debugger_diagnostics_create_unboxed<'a>( } DebuggerPayload { service, - ddsource: "dd_debugger", + ddsource: Cow::Borrowed("dd_debugger"), timestamp, message: Some(if probe.diagnostic_msg.len() > 0 { probe.diagnostic_msg.to_utf8_lossy() diff --git a/live-debugger/src/debugger_defs.rs b/live-debugger/src/debugger_defs.rs index 4f15eb93f..a0e3f9334 100644 --- a/live-debugger/src/debugger_defs.rs +++ b/live-debugger/src/debugger_defs.rs @@ -5,16 +5,16 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::HashMap; -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct DebuggerPayload<'a> { pub service: Cow<'a, str>, - pub ddsource: &'static str, + pub ddsource: Cow<'static, str>, pub timestamp: u64, pub debugger: DebuggerData<'a>, pub message: Option>, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[allow(clippy::large_enum_variant)] pub enum DebuggerData<'a> { @@ -22,7 +22,7 @@ pub enum DebuggerData<'a> { Diagnostics(Diagnostics<'a>), } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ProbeMetadataLocation<'a> { #[serde(skip_serializing_if = "Option::is_none")] pub method: Option>, @@ -30,7 +30,7 @@ pub struct ProbeMetadataLocation<'a> { pub r#type: Option>, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ProbeMetadata<'a> { pub id: Cow<'a, str>, pub location: ProbeMetadataLocation<'a>, @@ -42,13 +42,13 @@ pub struct SnapshotEvaluationError { pub message: String, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct SnapshotStackFrame { pub expr: String, pub message: String, } -#[derive(Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Snapshot<'a> { pub language: Cow<'a, str>, @@ -70,7 +70,7 @@ pub struct Snapshot<'a> { pub stack: Vec, } -#[derive(Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct Captures<'a> { #[serde(skip_serializing_if = "HashMap::is_empty")] pub lines: HashMap>, @@ -81,7 +81,7 @@ pub struct Captures<'a> { } pub type Fields<'a> = HashMap, Value<'a>>; -#[derive(Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct Capture<'a> { #[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(rename = "staticFields")] @@ -94,10 +94,10 @@ pub struct Capture<'a> { pub throwable: Option>, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Entry<'a>(pub Value<'a>, pub Value<'a>); -#[derive(Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Value<'a> { pub r#type: Cow<'a, str>, @@ -119,7 +119,7 @@ pub struct Value<'a> { pub size: Option>, } -#[derive(Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Diagnostics<'a> { pub probe_id: Cow<'a, str>, @@ -134,7 +134,7 @@ pub struct Diagnostics<'a> { pub details: Option>, } -#[derive(Serialize, Deserialize, Default, Copy, Clone)] +#[derive(Serialize, Deserialize, Debug, Default, Copy, Clone, Eq, PartialEq)] #[serde(rename_all = "UPPERCASE")] #[repr(C)] pub enum ProbeStatus { @@ -147,7 +147,7 @@ pub enum ProbeStatus { Warning, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DiagnosticsError<'a> { pub r#type: Cow<'a, str>, diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index 5cf52f00c..0257a66f1 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -699,6 +699,25 @@ pub unsafe extern "C" fn ddog_sidecar_send_debugger_datum( ddog_sidecar_send_debugger_data(transport, instance_id, queue_id, vec![*payload]) } +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +#[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals +pub unsafe extern "C" fn ddog_sidecar_send_debugger_diagnostics<'a>( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: QueueId, + diagnostics_payload: DebuggerPayload<'a>, +) -> MaybeError { + try_c!(blocking::send_debugger_diagnostics( + transport, + instance_id, + queue_id, + diagnostics_payload, + )); + + MaybeError::None +} + #[no_mangle] #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn ddog_sidecar_set_remote_config_data( diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index 85f7ffc3d..503373bc0 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -7,6 +7,7 @@ use super::{ }; use datadog_ipc::platform::{Channel, FileBackedHandle, ShmHandle}; use datadog_ipc::transport::blocking::BlockingTransport; +use datadog_live_debugger::debugger_defs::DebuggerPayload; use datadog_live_debugger::sender::DebuggerType; use ddcommon::tag::Tag; use dogstatsd_client::DogStatsDActionOwned; @@ -318,7 +319,7 @@ pub fn send_debugger_data_shm_vec( transport: &mut SidecarTransport, instance_id: &InstanceId, queue_id: QueueId, - payloads: Vec, + payloads: Vec, ) -> anyhow::Result<()> { if payloads.is_empty() { return Ok(()); @@ -352,6 +353,32 @@ pub fn send_debugger_data_shm_vec( )?) } +/// Submits debugger diagnostics. +/// +/// # Arguments +/// +/// * `transport` - The transport used for communication. +/// * `instance_id` - The ID of the instance. +/// * `queue_id` - The unique identifier for the trace context. +/// * `handle` - The handle to the shared memory. +/// * `diagnostics_payload` - The diagnostics data to send. +/// +/// # Returns +/// +/// An `io::Result<()>` indicating the result of the operation. +pub fn send_debugger_diagnostics<'a>( + transport: &mut SidecarTransport, + instance_id: &InstanceId, + queue_id: QueueId, + diagnostics_payload: DebuggerPayload<'a>, +) -> io::Result<()> { + transport.send(SidecarInterfaceRequest::SendDebuggerDiagnostics { + instance_id: instance_id.clone(), + queue_id, + diagnostics_payload: serde_json::to_vec(&diagnostics_payload)?, + }) +} + /// Acquire an exception hash rate limiter /// /// # Arguments diff --git a/sidecar/src/service/debugger_diagnostics_bookkeeper.rs b/sidecar/src/service/debugger_diagnostics_bookkeeper.rs new file mode 100644 index 000000000..84bef06dd --- /dev/null +++ b/sidecar/src/service/debugger_diagnostics_bookkeeper.rs @@ -0,0 +1,141 @@ +use datadog_live_debugger::debugger_defs::{ + DebuggerData, DebuggerPayload, Diagnostics, ProbeStatus, +}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use tokio::select; +use tokio_util::sync::CancellationToken; + +pub struct DebuggerDiagnosticsBookkeeper { + active_by_runtime_id: Arc>>, + cancel: CancellationToken, +} + +struct LastProbeStatus { + status: ProbeStatus, + last_update: Instant, +} + +#[derive(Default)] +struct DebuggerActiveData { + pub active_probes: HashMap, +} + +const MAX_TIME_BEFORE_FALLBACK: Duration = Duration::from_secs(300); +const MAX_TIME_BEFORE_REMOVAL: Duration = Duration::from_secs(600); + +impl DebuggerDiagnosticsBookkeeper { + pub fn start() -> DebuggerDiagnosticsBookkeeper { + let buffer = DebuggerDiagnosticsBookkeeper { + active_by_runtime_id: Default::default(), + cancel: CancellationToken::new(), + }; + let active = buffer.active_by_runtime_id.clone(); + let cancel = buffer.cancel.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(MAX_TIME_BEFORE_REMOVAL / 2); + loop { + select! { + _ = interval.tick() => { + active.lock().unwrap().retain(|_, active| { + active.active_probes.retain(|_, status| { + status.last_update.elapsed() < MAX_TIME_BEFORE_REMOVAL + }); + active.active_probes.len() != 0 + }); + }, + _ = cancel.cancelled() => { + break; + }, + } + } + }); + buffer + } + + pub fn add_payload(&self, payload: &DebuggerPayload) -> bool { + if let DebuggerData::Diagnostics(diagnostics) = &payload.debugger { + let mut send = true; + + fn insert_probe(active_data: &mut DebuggerActiveData, diagnostics: &Diagnostics) { + active_data.active_probes.insert( + diagnostics.probe_id.to_string(), + LastProbeStatus { + status: diagnostics.status, + last_update: Instant::now(), + }, + ); + } + + let mut buffers = self.active_by_runtime_id.lock().unwrap(); + if let Some(buffer) = buffers.get_mut(diagnostics.runtime_id.as_ref()) { + if let Some(status) = buffer + .active_probes + .get_mut(diagnostics.runtime_id.as_ref()) + { + if !matches!(diagnostics.status, ProbeStatus::Received) { + if matches!(status.status, ProbeStatus::Received) + || (!matches!(diagnostics.status, ProbeStatus::Installed) + && matches!(status.status, ProbeStatus::Installed)) + { + if status.last_update.elapsed() < MAX_TIME_BEFORE_FALLBACK { + send = false; + } + } + } + if send { + status.last_update = Instant::now(); + if status.status != diagnostics.status { + status.status = diagnostics.status; + } else { + send = false; + } + } + } else { + insert_probe(buffer, &diagnostics); + } + } else { + buffers.insert(diagnostics.runtime_id.to_string(), { + let mut data = DebuggerActiveData::default(); + insert_probe(&mut data, &diagnostics); + data + }); + } + + send + } else { + unreachable!("This is only for diagnostics"); + } + } + + pub fn stats(&self) -> DebuggerDiagnosticsBookkeeperStats { + let buffers = self.active_by_runtime_id.lock().unwrap(); + DebuggerDiagnosticsBookkeeperStats { + runtime_ids: buffers.len() as u32, + total_probes: buffers + .iter() + .map(|(_, active)| active.active_probes.len() as u32) + .sum(), + } + } +} + +impl Default for DebuggerDiagnosticsBookkeeper { + fn default() -> Self { + Self::start() + } +} + +impl Drop for DebuggerDiagnosticsBookkeeper { + fn drop(&mut self) { + self.cancel.cancel(); + } +} + +#[derive(Serialize, Deserialize)] +pub struct DebuggerDiagnosticsBookkeeperStats { + runtime_ids: u32, + total_probes: u32, +} diff --git a/sidecar/src/service/mod.rs b/sidecar/src/service/mod.rs index a8e2ba8ef..b28d73a8f 100644 --- a/sidecar/src/service/mod.rs +++ b/sidecar/src/service/mod.rs @@ -29,6 +29,7 @@ use sidecar_interface::{SidecarInterface, SidecarInterfaceRequest, SidecarInterf pub mod agent_info; pub mod blocking; +mod debugger_diagnostics_bookkeeper; pub mod exception_hash_rate_limiter; mod instance_id; mod queue_id; diff --git a/sidecar/src/service/sidecar_interface.rs b/sidecar/src/service/sidecar_interface.rs index f03f4fe5e..90a56dcf0 100644 --- a/sidecar/src/service/sidecar_interface.rs +++ b/sidecar/src/service/sidecar_interface.rs @@ -130,6 +130,22 @@ pub trait SidecarInterface { debugger_type: DebuggerType, ); + /// Submits debugger diagnostics. + /// They are small and bounded in size, hence it's fine to send them without shm. + /// Also, the sidecar server deserializes them to inspect and filter and avoid sending redundant + /// diagnostics payloads. + /// + /// # Arguments + /// * `instance_id` - The ID of the instance. + /// * `queue_id` - The unique identifier for the trace context. + /// * `diagnostics_payload` - The diagnostics data to send. (Sent as u8 json due to bincode + /// limitations) + async fn send_debugger_diagnostics( + instance_id: InstanceId, + queue_id: QueueId, + diagnostics_payload: Vec, + ); + /// Acquire an exception hash rate limiter /// /// # Arguments diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index cd8c33d3c..059ff9460 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -40,6 +40,9 @@ use tokio::task::{JoinError, JoinHandle}; use crate::config::get_product_endpoint; use crate::service::agent_info::AgentInfos; +use crate::service::debugger_diagnostics_bookkeeper::{ + DebuggerDiagnosticsBookkeeper, DebuggerDiagnosticsBookkeeperStats, +}; use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::runtime_info::ActiveApplication; @@ -72,6 +75,7 @@ struct SidecarStats { enqueued_telemetry_data: EnqueuedTelemetryStats, remote_config_clients: u32, remote_configs: MultiTargetStats, + debugger_diagnostics_bookkeeping: DebuggerDiagnosticsBookkeeperStats, telemetry_metrics_contexts: u32, telemetry_worker: TelemetryWorkerStats, telemetry_worker_errors: u32, @@ -110,6 +114,8 @@ pub struct SidecarServer { pub agent_infos: AgentInfos, /// All remote config handling remote_configs: RemoteConfigs, + /// Diagnostics bookkeeper + debugger_diagnostics_bookkeeper: Arc, /// The ProcessHandle tied to the connection #[cfg(windows)] process_handle: Option, @@ -391,6 +397,7 @@ impl SidecarServer { }) .sum(), remote_configs: self.remote_configs.stats(), + debugger_diagnostics_bookkeeping: self.debugger_diagnostics_bookkeeper.stats(), telemetry_metrics_contexts: sessions .values() .map(|s| { @@ -844,6 +851,31 @@ impl SidecarInterface for SidecarServer { no_response() } + type SendDebuggerDiagnosticsFut = NoResponse; + + fn send_debugger_diagnostics( + self, + _: Context, + instance_id: InstanceId, + queue_id: QueueId, + diagnostics_payload: Vec, + ) -> Self::SendDebuggerDiagnosticsFut { + let session = self.get_session(&instance_id.session_id); + let payload = serde_json::from_slice(diagnostics_payload.as_slice()).unwrap(); + // We segregate RC by endpoint. + // So we assume that runtime ids are unique per endpoint and we can safely filter globally. + if self.debugger_diagnostics_bookkeeper.add_payload(&payload) { + session.send_debugger_data( + DebuggerType::Diagnostics, + &instance_id.runtime_id, + queue_id, + serde_json::to_vec(&vec![payload]).unwrap(), + ); + } + + no_response() + } + type AcquireExceptionHashRateLimiterFut = NoResponse; fn acquire_exception_hash_rate_limiter( From fc9228a066dc202fb28d7b57546670265fb220e7 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Wed, 13 Nov 2024 16:49:15 +0100 Subject: [PATCH 2/2] clippy & add tests Signed-off-by: Bob Weinand --- sidecar-ffi/src/lib.rs | 4 +- sidecar/src/service/blocking.rs | 4 +- .../debugger_diagnostics_bookkeeper.rs | 96 +++++++++++++++---- 3 files changed, 81 insertions(+), 23 deletions(-) diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index 0257a66f1..c5ce8c5f3 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -702,11 +702,11 @@ pub unsafe extern "C" fn ddog_sidecar_send_debugger_datum( #[no_mangle] #[allow(clippy::missing_safety_doc)] #[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals -pub unsafe extern "C" fn ddog_sidecar_send_debugger_diagnostics<'a>( +pub unsafe extern "C" fn ddog_sidecar_send_debugger_diagnostics( transport: &mut Box, instance_id: &InstanceId, queue_id: QueueId, - diagnostics_payload: DebuggerPayload<'a>, + diagnostics_payload: DebuggerPayload, ) -> MaybeError { try_c!(blocking::send_debugger_diagnostics( transport, diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index 503373bc0..72caea640 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -366,11 +366,11 @@ pub fn send_debugger_data_shm_vec( /// # Returns /// /// An `io::Result<()>` indicating the result of the operation. -pub fn send_debugger_diagnostics<'a>( +pub fn send_debugger_diagnostics( transport: &mut SidecarTransport, instance_id: &InstanceId, queue_id: QueueId, - diagnostics_payload: DebuggerPayload<'a>, + diagnostics_payload: DebuggerPayload, ) -> io::Result<()> { transport.send(SidecarInterfaceRequest::SendDebuggerDiagnostics { instance_id: instance_id.clone(), diff --git a/sidecar/src/service/debugger_diagnostics_bookkeeper.rs b/sidecar/src/service/debugger_diagnostics_bookkeeper.rs index 84bef06dd..489e9aa54 100644 --- a/sidecar/src/service/debugger_diagnostics_bookkeeper.rs +++ b/sidecar/src/service/debugger_diagnostics_bookkeeper.rs @@ -1,3 +1,5 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 use datadog_live_debugger::debugger_defs::{ DebuggerData, DebuggerPayload, Diagnostics, ProbeStatus, }; @@ -43,7 +45,7 @@ impl DebuggerDiagnosticsBookkeeper { active.active_probes.retain(|_, status| { status.last_update.elapsed() < MAX_TIME_BEFORE_REMOVAL }); - active.active_probes.len() != 0 + !active.active_probes.is_empty() }); }, _ = cancel.cancelled() => { @@ -70,21 +72,21 @@ impl DebuggerDiagnosticsBookkeeper { } let mut buffers = self.active_by_runtime_id.lock().unwrap(); - if let Some(buffer) = buffers.get_mut(diagnostics.runtime_id.as_ref()) { - if let Some(status) = buffer - .active_probes - .get_mut(diagnostics.runtime_id.as_ref()) - { - if !matches!(diagnostics.status, ProbeStatus::Received) { - if matches!(status.status, ProbeStatus::Received) - || (!matches!(diagnostics.status, ProbeStatus::Installed) - && matches!(status.status, ProbeStatus::Installed)) - { - if status.last_update.elapsed() < MAX_TIME_BEFORE_FALLBACK { - send = false; - } - } - } + let runtime_id = diagnostics + .parent_id + .as_ref() + .unwrap_or(&diagnostics.runtime_id); + if let Some(buffer) = buffers.get_mut(runtime_id.as_ref()) { + if let Some(status) = buffer.active_probes.get_mut(diagnostics.probe_id.as_ref()) { + // This is a bit confusing now, but clippy requested me to collapse this: + // Essentially, we shall send if the last emitted/error/installed/etc. is older + // than MAX_TIME_BEFORE_FALLBACK. If it's installed, we also + // send it the current status is Received. + send = matches!(status.status, ProbeStatus::Received) + || (!matches!(diagnostics.status, ProbeStatus::Received) + && (matches!(status.status, ProbeStatus::Installed) + || !matches!(diagnostics.status, ProbeStatus::Installed))) + || status.last_update.elapsed() > MAX_TIME_BEFORE_FALLBACK; if send { status.last_update = Instant::now(); if status.status != diagnostics.status { @@ -94,12 +96,12 @@ impl DebuggerDiagnosticsBookkeeper { } } } else { - insert_probe(buffer, &diagnostics); + insert_probe(buffer, diagnostics); } } else { - buffers.insert(diagnostics.runtime_id.to_string(), { + buffers.insert(runtime_id.to_string(), { let mut data = DebuggerActiveData::default(); - insert_probe(&mut data, &diagnostics); + insert_probe(&mut data, diagnostics); data }); } @@ -139,3 +141,59 @@ pub struct DebuggerDiagnosticsBookkeeperStats { runtime_ids: u32, total_probes: u32, } + +#[cfg(test)] +mod tests { + use super::*; + use datadog_live_debugger::debugger_defs::{ + DebuggerData, DebuggerPayload, Diagnostics, ProbeStatus, + }; + use std::borrow::Cow; + + fn create_payload<'a>( + probe_id: &'a str, + runtime_id: &'a str, + status: ProbeStatus, + ) -> DebuggerPayload<'a> { + DebuggerPayload { + service: Default::default(), + ddsource: Default::default(), + timestamp: 0, + debugger: DebuggerData::Diagnostics(Diagnostics { + probe_id: Cow::Borrowed(probe_id), + runtime_id: Cow::Borrowed(runtime_id), + parent_id: None, + probe_version: 0, + status, + exception: None, + details: None, + }), + message: None, + } + } + + #[tokio::test] + async fn test_bookkeeper() { + let bookkeeper = DebuggerDiagnosticsBookkeeper::start(); + assert!(bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Received))); + // Second insert of same thing is rejected + assert!(!bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Received))); + // Different thing is allowed + assert!(bookkeeper.add_payload(&create_payload("1", "3", ProbeStatus::Received))); + assert!(bookkeeper.add_payload(&create_payload("2", "2", ProbeStatus::Received))); + + // We can move to installed + assert!(bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Installed))); + // But not back + assert!(!bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Received))); + assert!(!bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Installed))); + + // We can move to e.g. error or emitting + assert!(bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Emitting))); + assert!(bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Error))); + assert!(bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Emitting))); + // But not back + assert!(!bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Received))); + assert!(!bookkeeper.add_payload(&create_payload("1", "2", ProbeStatus::Installed))); + } +}