diff --git a/src/output/emitter.rs b/src/output/emitter.rs index e26acd2..9bfd8bd 100644 --- a/src/output/emitter.rs +++ b/src/output/emitter.rs @@ -84,9 +84,9 @@ impl StdoutWriter { } pub struct JsonEmitter { - sequence_no: Arc, timezone: chrono_tz::Tz, writer: WriterType, + seqno: Arc, } impl JsonEmitter { @@ -94,7 +94,7 @@ impl JsonEmitter { JsonEmitter { timezone, writer, - sequence_no: Arc::new(atomic::AtomicU64::new(0)), + seqno: Arc::new(atomic::AtomicU64::new(0)), } } @@ -110,8 +110,8 @@ impl JsonEmitter { } fn next_sequence_no(&self) -> u64 { - self.sequence_no.fetch_add(1, atomic::Ordering::SeqCst); - self.sequence_no.load(atomic::Ordering::SeqCst) + self.seqno.fetch_add(1, atomic::Ordering::SeqCst); + self.seqno.load(atomic::Ordering::SeqCst) } pub async fn emit(&self, object: &spec::RootArtifact) -> Result<(), WriterError> { diff --git a/src/output/measurement.rs b/src/output/measurement.rs index 249df84..a874bd9 100644 --- a/src/output/measurement.rs +++ b/src/output/measurement.rs @@ -15,22 +15,25 @@ use tokio::sync::Mutex; use crate::output as tv; use crate::spec; -use tv::{dut, emitter, state}; +use tv::{dut, emitter, step}; /// The measurement series. /// A Measurement Series is a time-series list of measurements. /// /// ref: https://github.com/opencomputeproject/ocp-diag-core/tree/main/json_spec#measurementseriesstart -pub struct MeasurementSeries { - state: Arc>, +pub struct MeasurementSeries<'a> { + // note: intentional design to only allow 1 thread to output; may need + // revisiting in the future, if there's a case for multithreaded writers + emitter: &'a step::StepEmitter, + seq_no: Arc>, start: MeasurementSeriesStart, } -impl MeasurementSeries { - pub(crate) fn new(series_id: &str, name: &str, state: Arc>) -> Self { +impl<'a> MeasurementSeries<'a> { + pub(crate) fn new(series_id: &str, name: &str, emitter: &'a step::StepEmitter) -> Self { Self { - state, + emitter, seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))), start: MeasurementSeriesStart::new(name, series_id), } @@ -38,10 +41,10 @@ impl MeasurementSeries { pub(crate) fn new_with_details( start: MeasurementSeriesStart, - state: Arc>, + emitter: &'a step::StepEmitter, ) -> Self { Self { - state, + emitter, seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))), start, } @@ -78,11 +81,10 @@ impl MeasurementSeries { /// # }); /// ``` pub async fn start(&self) -> Result<(), emitter::WriterError> { - self.state - .lock() - .await - .emitter - .emit(&self.start.to_artifact()) + self.emitter + .emit(&spec::TestStepArtifactDescendant::MeasurementSeriesStart( + self.start.to_artifact(), + )) .await?; Ok(()) } @@ -110,12 +112,12 @@ impl MeasurementSeries { pub async fn end(&self) -> Result<(), emitter::WriterError> { let end = MeasurementSeriesEnd::new(self.start.get_series_id(), self.current_sequence_no().await); - self.state - .lock() - .await - .emitter - .emit(&end.to_artifact()) + self.emitter + .emit(&spec::TestStepArtifactDescendant::MeasurementSeriesEnd( + end.to_artifact(), + )) .await?; + Ok(()) } @@ -147,12 +149,13 @@ impl MeasurementSeries { None, ); self.increment_sequence_no().await; - self.state - .lock() - .await - .emitter - .emit(&element.to_artifact()) + + self.emitter + .emit(&spec::TestStepArtifactDescendant::MeasurementSeriesElement( + element.to_artifact(), + )) .await?; + Ok(()) } @@ -191,12 +194,13 @@ impl MeasurementSeries { )), ); self.increment_sequence_no().await; - self.state - .lock() - .await - .emitter - .emit(&element.to_artifact()) + + self.emitter + .emit(&spec::TestStepArtifactDescendant::MeasurementSeriesElement( + element.to_artifact(), + )) .await?; + Ok(()) } @@ -228,10 +232,10 @@ impl MeasurementSeries { /// # Ok::<(), WriterError>(()) /// # }); /// ``` - pub async fn scope<'a, F, R>(&'a self, func: F) -> Result<(), emitter::WriterError> + pub async fn scope<'s, F, R>(&'s self, func: F) -> Result<(), emitter::WriterError> where R: Future>, - F: std::ops::FnOnce(&'a MeasurementSeries) -> R, + F: std::ops::FnOnce(&'s MeasurementSeries) -> R, { self.start().await?; func(self).await?; @@ -406,27 +410,25 @@ impl Measurement { /// let measurement = Measurement::new("name", 50.into()); /// let _ = measurement.to_artifact(); /// ``` - pub fn to_artifact(&self) -> spec::RootArtifact { - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::Measurement(spec::Measurement { - name: self.name.clone(), - unit: self.unit.clone(), - value: self.value.clone(), - validators: self - .validators - .clone() - .map(|vals| vals.iter().map(|val| val.to_spec()).collect()), - hardware_info_id: self - .hardware_info - .as_ref() - .map(|hardware_info| hardware_info.id().to_owned()), - subcomponent: self - .subcomponent - .as_ref() - .map(|subcomponent| subcomponent.to_spec()), - metadata: self.metadata.clone(), - }), - }) + pub fn to_artifact(&self) -> spec::Measurement { + spec::Measurement { + name: self.name.clone(), + unit: self.unit.clone(), + value: self.value.clone(), + validators: self + .validators + .clone() + .map(|vals| vals.iter().map(|val| val.to_spec()).collect()), + hardware_info_id: self + .hardware_info + .as_ref() + .map(|hardware_info| hardware_info.id().to_owned()), + subcomponent: self + .subcomponent + .as_ref() + .map(|subcomponent| subcomponent.to_spec()), + metadata: self.metadata.clone(), + } } } @@ -634,29 +636,25 @@ impl MeasurementSeriesStart { MeasurementSeriesStartBuilder::new(name, series_id) } - pub fn to_artifact(&self) -> spec::RootArtifact { - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::MeasurementSeriesStart( - spec::MeasurementSeriesStart { - name: self.name.clone(), - unit: self.unit.clone(), - series_id: self.series_id.clone(), - validators: self - .validators - .clone() - .map(|vals| vals.iter().map(|val| val.to_spec()).collect()), - hardware_info: self - .hardware_info - .as_ref() - .map(|hardware_info| hardware_info.to_spec()), - subcomponent: self - .subcomponent - .as_ref() - .map(|subcomponent| subcomponent.to_spec()), - metadata: self.metadata.clone(), - }, - ), - }) + pub fn to_artifact(&self) -> spec::MeasurementSeriesStart { + spec::MeasurementSeriesStart { + name: self.name.clone(), + unit: self.unit.clone(), + series_id: self.series_id.clone(), + validators: self + .validators + .clone() + .map(|vals| vals.iter().map(|val| val.to_spec()).collect()), + hardware_info: self + .hardware_info + .as_ref() + .map(|hardware_info| hardware_info.to_spec()), + subcomponent: self + .subcomponent + .as_ref() + .map(|subcomponent| subcomponent.to_spec()), + metadata: self.metadata.clone(), + } } pub fn get_series_id(&self) -> &str { @@ -759,15 +757,11 @@ impl MeasurementSeriesEnd { } } - pub fn to_artifact(&self) -> spec::RootArtifact { - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::MeasurementSeriesEnd( - spec::MeasurementSeriesEnd { - series_id: self.series_id.clone(), - total_count: self.total_count, - }, - ), - }) + pub fn to_artifact(&self) -> spec::MeasurementSeriesEnd { + spec::MeasurementSeriesEnd { + series_id: self.series_id.clone(), + total_count: self.total_count, + } } } @@ -795,18 +789,14 @@ impl MeasurementSeriesElement { } } - pub fn to_artifact(&self) -> spec::RootArtifact { - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::MeasurementSeriesElement( - spec::MeasurementSeriesElement { - index: self.index, - value: self.value.clone(), - timestamp: self.timestamp, - series_id: self.series_id.clone(), - metadata: self.metadata.clone(), - }, - ), - }) + pub fn to_artifact(&self) -> spec::MeasurementSeriesElement { + spec::MeasurementSeriesElement { + index: self.index, + value: self.value.clone(), + timestamp: self.timestamp, + series_id: self.series_id.clone(), + metadata: self.metadata.clone(), + } } } @@ -829,17 +819,15 @@ mod tests { let artifact = measurement.to_artifact(); assert_eq!( artifact, - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::Measurement(spec::Measurement { - name: name.to_string(), - unit: None, - value, - validators: None, - hardware_info_id: None, - subcomponent: None, - metadata: None, - }), - }) + spec::Measurement { + name: name.to_string(), + unit: None, + value, + validators: None, + hardware_info_id: None, + subcomponent: None, + metadata: None, + } ); Ok(()) @@ -874,17 +862,15 @@ mod tests { let artifact = measurement.to_artifact(); assert_eq!( artifact, - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::Measurement(spec::Measurement { - name, - unit: Some(unit.to_string()), - value, - validators: Some(vec![validator.to_spec(), validator.to_spec()]), - hardware_info_id: Some(hardware_info.to_spec().id.clone()), - subcomponent: Some(subcomponent.to_spec()), - metadata: Some(metadata), - }), - }) + spec::Measurement { + name, + unit: Some(unit.to_string()), + value, + validators: Some(vec![validator.to_spec(), validator.to_spec()]), + hardware_info_id: Some(hardware_info.to_spec().id.clone()), + subcomponent: Some(subcomponent.to_spec()), + metadata: Some(metadata), + } ); Ok(()) @@ -899,19 +885,15 @@ mod tests { let artifact = series.to_artifact(); assert_eq!( artifact, - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::MeasurementSeriesStart( - spec::MeasurementSeriesStart { - name: name.to_string(), - unit: None, - series_id: series_id.to_string(), - validators: None, - hardware_info: None, - subcomponent: None, - metadata: None, - } - ), - }) + spec::MeasurementSeriesStart { + name: name.to_string(), + unit: None, + series_id: series_id.to_string(), + validators: None, + hardware_info: None, + subcomponent: None, + metadata: None, + } ); Ok(()) @@ -938,22 +920,18 @@ mod tests { let artifact = series.to_artifact(); assert_eq!( artifact, - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::MeasurementSeriesStart( - spec::MeasurementSeriesStart { - name, - unit: Some("unit".to_string()), - series_id: series_id.to_string(), - validators: Some(vec![validator.to_spec(), validator2.to_spec()]), - hardware_info: Some(hw_info.to_spec()), - subcomponent: Some(subcomponent.to_spec()), - metadata: Some(serde_json::Map::from_iter([ - ("key".to_string(), "value".into()), - ("key2".to_string(), "value2".into()) - ])), - } - ), - }) + spec::MeasurementSeriesStart { + name, + unit: Some("unit".to_string()), + series_id: series_id.to_string(), + validators: Some(vec![validator.to_spec(), validator2.to_spec()]), + hardware_info: Some(hw_info.to_spec()), + subcomponent: Some(subcomponent.to_spec()), + metadata: Some(serde_json::Map::from_iter([ + ("key".to_string(), "value".into()), + ("key2".to_string(), "value2".into()) + ])), + } ); Ok(()) @@ -967,14 +945,10 @@ mod tests { let artifact = series.to_artifact(); assert_eq!( artifact, - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::MeasurementSeriesEnd( - spec::MeasurementSeriesEnd { - series_id: series_id.to_string(), - total_count: 1, - } - ), - }) + spec::MeasurementSeriesEnd { + series_id: series_id.to_string(), + total_count: 1, + } ); Ok(()) diff --git a/src/output/run.rs b/src/output/run.rs index c3cc844..d468d9f 100644 --- a/src/output/run.rs +++ b/src/output/run.rs @@ -5,6 +5,8 @@ // https://opensource.org/licenses/MIT. use std::env; +use std::sync::atomic; +use std::sync::atomic::Ordering; use std::sync::Arc; use serde_json::Map; @@ -115,7 +117,7 @@ impl TestRun { .emit(&start.to_artifact()) .await?; - Ok(StartedTestRun { run: self }) + Ok(StartedTestRun::new(self)) } // disabling this for the moment so we don't publish api that's unusable. @@ -283,9 +285,17 @@ impl TestRunBuilder { /// ref: https://github.com/opencomputeproject/ocp-diag-core/tree/main/json_spec#testrunstart pub struct StartedTestRun { run: TestRun, + + step_seqno: atomic::AtomicU64, } impl StartedTestRun { + fn new(run: TestRun) -> StartedTestRun { + StartedTestRun { + run, + step_seqno: atomic::AtomicU64::new(0), + } + } /// Ends the test run. /// /// ref: https://github.com/opencomputeproject/ocp-diag-core/tree/main/json_spec#testrunend @@ -505,7 +515,8 @@ impl StartedTestRun { } pub fn step(&self, name: &str) -> TestStep { - TestStep::new(name, self.run.state.clone()) + let step_id = format!("step_{}", self.step_seqno.fetch_add(1, Ordering::AcqRel)); + TestStep::new(&step_id, name, self.run.state.clone()) } } diff --git a/src/output/step.rs b/src/output/step.rs index 20fa755..6b9fa36 100644 --- a/src/output/step.rs +++ b/src/output/step.rs @@ -12,21 +12,27 @@ use tokio::sync::Mutex; use crate::output as tv; use crate::spec; use tv::measurement::MeasurementSeries; -use tv::{emitter, error, log, measurement, state, step}; +use tv::{emitter, error, log, measurement, state}; + +use super::WriterError; /// A single test step in the scope of a [`TestRun`]. /// /// ref: https://github.com/opencomputeproject/ocp-diag-core/tree/main/json_spec#test-step-artifacts pub struct TestStep { name: String, - state: Arc>, + + emitter: StepEmitter, } impl TestStep { - pub(crate) fn new(name: &str, state: Arc>) -> TestStep { + pub(crate) fn new(id: &str, name: &str, state: Arc>) -> TestStep { TestStep { - name: name.to_string(), - state, + name: name.to_owned(), + emitter: StepEmitter { + state, + step_id: id.to_owned(), + }, } } @@ -47,13 +53,8 @@ impl TestStep { /// # }); /// ``` pub async fn start(self) -> Result { - let start = step::TestStepStart::new(&self.name); - self.state - .lock() - .await - .emitter - .emit(&start.to_artifact()) - .await?; + let start = TestStepStart::new(&self.name); + self.emitter.emit(&start.to_artifact()).await?; Ok(StartedTestStep { step: self, @@ -125,14 +126,8 @@ impl StartedTestStep { /// # }); /// ``` pub async fn end(&self, status: spec::TestStatus) -> Result<(), emitter::WriterError> { - let end = step::TestStepEnd::new(status); - self.step - .state - .lock() - .await - .emitter - .emit(&end.to_artifact()) - .await?; + let end = TestStepEnd::new(status); + self.step.emitter.emit(&end.to_artifact()).await?; Ok(()) } @@ -183,13 +178,10 @@ impl StartedTestStep { msg: &str, ) -> Result<(), emitter::WriterError> { let log = log::Log::builder(msg).severity(severity).build(); - let emitter = &self.step.state.lock().await.emitter; - let artifact = spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::Log(log.to_artifact()), - }; - emitter - .emit(&spec::RootArtifact::TestStepArtifact(artifact)) + self.step + .emitter + .emit(&spec::TestStepArtifactDescendant::Log(log.to_artifact())) .await?; Ok(()) @@ -221,13 +213,9 @@ impl StartedTestStep { /// # }); /// ``` pub async fn log_with_details(&self, log: &log::Log) -> Result<(), emitter::WriterError> { - let emitter = &self.step.state.lock().await.emitter; - - let artifact = spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::Log(log.to_artifact()), - }; - emitter - .emit(&spec::RootArtifact::TestStepArtifact(artifact)) + self.step + .emitter + .emit(&spec::TestStepArtifactDescendant::Log(log.to_artifact())) .await?; Ok(()) @@ -273,13 +261,12 @@ impl StartedTestStep { /// ``` pub async fn error(&self, symptom: &str) -> Result<(), emitter::WriterError> { let error = error::Error::builder(symptom).build(); - let emitter = &self.step.state.lock().await.emitter; - let artifact = spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::Error(error.to_artifact()), - }; - emitter - .emit(&spec::RootArtifact::TestStepArtifact(artifact)) + self.step + .emitter + .emit(&spec::TestStepArtifactDescendant::Error( + error.to_artifact(), + )) .await?; Ok(()) @@ -330,13 +317,12 @@ impl StartedTestStep { msg: &str, ) -> Result<(), emitter::WriterError> { let error = error::Error::builder(symptom).message(msg).build(); - let emitter = &self.step.state.lock().await.emitter; - let artifact = spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::Error(error.to_artifact()), - }; - emitter - .emit(&spec::RootArtifact::TestStepArtifact(artifact)) + self.step + .emitter + .emit(&spec::TestStepArtifactDescendant::Error( + error.to_artifact(), + )) .await?; Ok(()) @@ -372,13 +358,11 @@ impl StartedTestStep { &self, error: &error::Error, ) -> Result<(), emitter::WriterError> { - let emitter = &self.step.state.lock().await.emitter; - - let artifact = spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::Error(error.to_artifact()), - }; - emitter - .emit(&spec::RootArtifact::TestStepArtifact(artifact)) + self.step + .emitter + .emit(&spec::TestStepArtifactDescendant::Error( + error.to_artifact(), + )) .await?; Ok(()) @@ -409,13 +393,14 @@ impl StartedTestStep { value: Value, ) -> Result<(), emitter::WriterError> { let measurement = measurement::Measurement::new(name, value); + self.step - .state - .lock() - .await .emitter - .emit(&measurement.to_artifact()) + .emit(&spec::TestStepArtifactDescendant::Measurement( + measurement.to_artifact(), + )) .await?; + Ok(()) } @@ -451,12 +436,12 @@ impl StartedTestStep { measurement: &measurement::Measurement, ) -> Result<(), emitter::WriterError> { self.step - .state - .lock() - .await .emitter - .emit(&measurement.to_artifact()) + .emit(&spec::TestStepArtifactDescendant::Measurement( + measurement.to_artifact(), + )) .await?; + Ok(()) } @@ -487,7 +472,7 @@ impl StartedTestStep { self.measurement_id_no.load(atomic::Ordering::SeqCst) ); - MeasurementSeries::new(&series_id, name, self.step.state.clone()) + MeasurementSeries::new(&series_id, name, &self.step.emitter) } /// Starts a Measurement Series (a time-series list of measurements). @@ -513,7 +498,7 @@ impl StartedTestStep { &self, start: measurement::MeasurementSeriesStart, ) -> MeasurementSeries { - MeasurementSeries::new_with_details(start, self.step.state.clone()) + MeasurementSeries::new_with_details(start, &self.step.emitter) } } @@ -528,11 +513,9 @@ impl TestStepStart { } } - pub fn to_artifact(&self) -> spec::RootArtifact { - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::TestStepStart(spec::TestStepStart { - name: self.name.clone(), - }), + pub fn to_artifact(&self) -> spec::TestStepArtifactDescendant { + spec::TestStepArtifactDescendant::TestStepStart(spec::TestStepStart { + name: self.name.clone(), }) } } @@ -546,14 +529,33 @@ impl TestStepEnd { TestStepEnd { status } } - pub fn to_artifact(&self) -> spec::RootArtifact { - spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { - descendant: spec::TestStepArtifactDescendant::TestStepEnd(spec::TestStepEnd { - status: self.status.clone(), - }), + pub fn to_artifact(&self) -> spec::TestStepArtifactDescendant { + spec::TestStepArtifactDescendant::TestStepEnd(spec::TestStepEnd { + status: self.status.clone(), }) } } +// TODO: move this away from here; extract trait Emitter, dont rely on json +// it will be used in measurement series +pub struct StepEmitter { + // emitter: JsonEmitter, + state: Arc>, + step_id: String, +} + +impl StepEmitter { + pub async fn emit(&self, object: &spec::TestStepArtifactDescendant) -> Result<(), WriterError> { + let root = spec::RootArtifact::TestStepArtifact(spec::TestStepArtifact { + id: self.step_id.clone(), + // TODO: can these copies be avoided? + descendant: object.clone(), + }); + self.state.lock().await.emitter.emit(&root).await?; + + Ok(()) + } +} + #[cfg(test)] mod tests {} diff --git a/src/spec.rs b/src/spec.rs index dc91774..0161cb8 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -494,6 +494,9 @@ pub struct SourceLocation { /// schema ref: https://github.com/opencomputeproject/ocp-diag-core/testStepArtifact #[derive(Debug, Serialize, PartialEq, Clone)] pub struct TestStepArtifact { + #[serde(rename = "testStepId")] + pub id: String, + #[serde(flatten)] pub descendant: TestStepArtifactDescendant, } diff --git a/tests/output/runner.rs b/tests/output/runner.rs index 82426fc..b1f5413 100644 --- a/tests/output/runner.rs +++ b/tests/output/runner.rs @@ -498,6 +498,7 @@ async fn test_testrun_step_error_with_details() -> Result<()> { json!({ "sequenceNumber": 4, "testStepArtifact": { + "testStepId": "step_0", "error": { "message": "Error message", "softwareInfoIds":[{ @@ -579,12 +580,14 @@ async fn test_step_with_measurement() -> Result<()> { json_run_default_start(), json_step_default_start(), json!({ - "sequenceNumber": 4, "testStepArtifact": { + "testStepId": "step_0", "measurement": { - "name": "name", "value": 50 + "name": "name", + "value": 50 } - } + }, + "sequenceNumber": 4 }), json_step_complete(5), json_run_pass(6), @@ -601,6 +604,8 @@ async fn test_step_with_measurement() -> Result<()> { .await } +// TODO: intentionally leaving these tests broken so that it's obvious later that the +// assert_json_includes was not sufficient; this case is missing `testStepId` field #[tokio::test] async fn test_step_with_measurement_builder() -> Result<()> { let expected = [