Skip to content

Commit

Permalink
Merge pull request #7 from opencomputeproject/fix/state_not_necessary
Browse files Browse the repository at this point in the history
fix/state not necessary
  • Loading branch information
mimir-d authored Oct 8, 2024
2 parents 36e9298 + f8a664e commit aa07128
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 67 deletions.
12 changes: 5 additions & 7 deletions src/output/measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ use tv::{dut, emitter, step};
/// A Measurement Series is a time-series list of measurements.
///
/// ref: https://github.com/opencomputeproject/ocp-diag-core/tree/main/json_spec#measurementseriesstart
pub struct MeasurementSeries<'a> {
// note: intentional design to only allow 1 thread to output; may need
// revisiting in the future, if there's a case for multithreaded writers
emitter: &'a step::StepEmitter,
pub struct MeasurementSeries {
emitter: Arc<step::StepEmitter>,

seq_no: Arc<Mutex<atomic::AtomicU64>>,
start: MeasurementSeriesStart,
}

impl<'a> MeasurementSeries<'a> {
pub(crate) fn new(series_id: &str, name: &str, emitter: &'a step::StepEmitter) -> Self {
impl MeasurementSeries {
pub(crate) fn new(series_id: &str, name: &str, emitter: Arc<step::StepEmitter>) -> Self {
Self {
emitter,
seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))),
Expand All @@ -40,7 +38,7 @@ impl<'a> MeasurementSeries<'a> {

pub(crate) fn new_with_details(
start: MeasurementSeriesStart,
emitter: &'a step::StepEmitter,
emitter: Arc<step::StepEmitter>,
) -> Self {
Self {
emitter,
Expand Down
1 change: 0 additions & 1 deletion src/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ mod log;
mod macros;
mod measure;
mod run;
mod state;
mod step;

pub use crate::spec::LogSeverity;
Expand Down
49 changes: 22 additions & 27 deletions src/output/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ use std::sync::Arc;

use serde_json::Map;
use serde_json::Value;
use tokio::sync::Mutex;

use crate::output as tv;
use crate::spec;
use tv::step::TestStep;
use tv::{config, dut, emitter, error, log, state};
use tv::{config, dut, emitter, error, log};

use super::JsonEmitter;

/// The outcome of a TestRun.
/// It's returned when the scope method of the [`TestRun`] object is used.
Expand All @@ -37,7 +38,8 @@ pub struct TestRun {
dut: dut::DutInfo,
command_line: String,
metadata: Option<serde_json::Map<String, tv::Value>>,
state: Arc<Mutex<state::TestState>>,

emitter: Arc<JsonEmitter>,
}

impl TestRun {
Expand Down Expand Up @@ -88,10 +90,7 @@ impl TestRun {
/// ```
pub async fn start(self) -> Result<StartedTestRun, emitter::WriterError> {
// TODO: this likely will go into the emitter since it's not the run's job to emit the schema version
self.state
.lock()
.await
.emitter
self.emitter
.emit(&spec::RootImpl::SchemaVersion(
spec::SchemaVersion::default(),
))
Expand All @@ -108,7 +107,7 @@ impl TestRun {
}),
});

self.state.lock().await.emitter.emit(&start).await?;
self.emitter.emit(&start).await?;

Ok(StartedTestRun::new(self))
}
Expand Down Expand Up @@ -260,15 +259,16 @@ impl TestRunBuilder {
pub fn build(self) -> TestRun {
let config = self.config.unwrap_or(config::Config::builder().build());
let emitter = emitter::JsonEmitter::new(config.timezone, config.writer);
let state = state::TestState::new(emitter);

TestRun {
name: self.name,
dut: self.dut,
version: self.version,
parameters: self.parameters,
command_line: self.command_line,
metadata: self.metadata,
state: Arc::new(Mutex::new(state)),

emitter: Arc::new(emitter),
}
}
}
Expand Down Expand Up @@ -314,9 +314,7 @@ impl StartedTestRun {
artifact: spec::TestRunArtifactImpl::TestRunEnd(spec::TestRunEnd { status, result }),
});

let emitter = &self.run.state.lock().await.emitter;

emitter.emit(&end).await?;
self.run.emitter.emit(&end).await?;
Ok(())
}

Expand Down Expand Up @@ -349,12 +347,11 @@ impl StartedTestRun {
) -> Result<(), emitter::WriterError> {
let log = log::Log::builder(msg).severity(severity).build();

let emitter = &self.run.state.lock().await.emitter;

let artifact = spec::TestRunArtifact {
artifact: spec::TestRunArtifactImpl::Log(log.to_artifact()),
};
emitter
self.run
.emitter
.emit(&spec::RootImpl::TestRunArtifact(artifact))
.await?;

Expand Down Expand Up @@ -385,12 +382,11 @@ impl StartedTestRun {
/// # });
/// ```
pub async fn log_with_details(&self, log: &log::Log) -> Result<(), emitter::WriterError> {
let emitter = &self.run.state.lock().await.emitter;

let artifact = spec::TestRunArtifact {
artifact: spec::TestRunArtifactImpl::Log(log.to_artifact()),
};
emitter
self.run
.emitter
.emit(&spec::RootImpl::TestRunArtifact(artifact))
.await?;

Expand All @@ -417,12 +413,12 @@ impl StartedTestRun {
/// ```
pub async fn error(&self, symptom: &str) -> Result<(), emitter::WriterError> {
let error = error::Error::builder(symptom).build();
let emitter = &self.run.state.lock().await.emitter;

let artifact = spec::TestRunArtifact {
artifact: spec::TestRunArtifactImpl::Error(error.to_artifact()),
};
emitter
self.run
.emitter
.emit(&spec::RootImpl::TestRunArtifact(artifact))
.await?;

Expand Down Expand Up @@ -454,12 +450,12 @@ impl StartedTestRun {
msg: &str,
) -> Result<(), emitter::WriterError> {
let error = error::Error::builder(symptom).message(msg).build();
let emitter = &self.run.state.lock().await.emitter;

let artifact = spec::TestRunArtifact {
artifact: spec::TestRunArtifactImpl::Error(error.to_artifact()),
};
emitter
self.run
.emitter
.emit(&spec::RootImpl::TestRunArtifact(artifact))
.await?;

Expand Down Expand Up @@ -494,12 +490,11 @@ impl StartedTestRun {
&self,
error: &error::Error,
) -> Result<(), emitter::WriterError> {
let emitter = &self.run.state.lock().await.emitter;

let artifact = spec::TestRunArtifact {
artifact: spec::TestRunArtifactImpl::Error(error.to_artifact()),
};
emitter
self.run
.emitter
.emit(&spec::RootImpl::TestRunArtifact(artifact))
.await?;

Expand All @@ -508,6 +503,6 @@ impl StartedTestRun {

pub fn step(&self, name: &str) -> TestStep {
let step_id = format!("step_{}", self.step_seqno.fetch_add(1, Ordering::AcqRel));
TestStep::new(&step_id, name, self.run.state.clone())
TestStep::new(&step_id, name, Arc::clone(&self.run.emitter))
}
}
18 changes: 0 additions & 18 deletions src/output/state.rs

This file was deleted.

25 changes: 11 additions & 14 deletions src/output/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
use serde_json::Value;
use std::sync::atomic;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::output as tv;
use crate::spec::TestStepStart;
use crate::spec::{self, TestStepArtifactImpl};
use tv::measure::MeasurementSeries;
use tv::{emitter, error, log, measure, state};
use tv::{emitter, error, log, measure};

use super::JsonEmitter;
use super::WriterError;

/// A single test step in the scope of a [`TestRun`].
Expand All @@ -23,17 +23,17 @@ use super::WriterError;
pub struct TestStep {
name: String,

emitter: StepEmitter,
emitter: Arc<StepEmitter>,
}

impl TestStep {
pub(crate) fn new(id: &str, name: &str, state: Arc<Mutex<state::TestState>>) -> TestStep {
pub(crate) fn new(id: &str, name: &str, run_emitter: Arc<JsonEmitter>) -> Self {
TestStep {
name: name.to_owned(),
emitter: StepEmitter {
state,
emitter: Arc::new(StepEmitter {
step_id: id.to_owned(),
},
run_emitter,
}),
}
}

Expand Down Expand Up @@ -471,7 +471,7 @@ impl StartedTestStep {
self.measurement_id_no.load(atomic::Ordering::SeqCst)
);

MeasurementSeries::new(&series_id, name, &self.step.emitter)
MeasurementSeries::new(&series_id, name, Arc::clone(&self.step.emitter))
}

/// Starts a Measurement Series (a time-series list of measurements).
Expand All @@ -497,16 +497,13 @@ impl StartedTestStep {
&self,
start: measure::MeasurementSeriesStart,
) -> MeasurementSeries {
MeasurementSeries::new_with_details(start, &self.step.emitter)
MeasurementSeries::new_with_details(start, Arc::clone(&self.step.emitter))
}
}

// TODO: move this away from here; extract trait Emitter, dont rely on json
// it will be used in measurement series
pub struct StepEmitter {
// emitter: JsonEmitter,
state: Arc<Mutex<state::TestState>>,
step_id: String,
run_emitter: Arc<JsonEmitter>,
}

impl StepEmitter {
Expand All @@ -516,7 +513,7 @@ impl StepEmitter {
// TODO: can these copies be avoided?
artifact: object.clone(),
});
self.state.lock().await.emitter.emit(&root).await?;
self.run_emitter.emit(&root).await?;

Ok(())
}
Expand Down

0 comments on commit aa07128

Please sign in to comment.