Skip to content

Commit

Permalink
add typestate pattern for measurement series
Browse files Browse the repository at this point in the history
- commenting out the `scope` method as in the other typestate commits;
  in a future PR, these will be gated by a feature

Signed-off-by: mimir-d <[email protected]>
  • Loading branch information
mimir-d committed Oct 8, 2024
1 parent e850afc commit e8968c3
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 168 deletions.
163 changes: 94 additions & 69 deletions src/output/measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use std::future::Future;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

Expand All @@ -20,34 +19,26 @@ use tv::{dut, emitter, step};
///
/// ref: https://github.com/opencomputeproject/ocp-diag-core/tree/main/json_spec#measurementseriesstart
pub struct MeasurementSeries {
emitter: Arc<step::StepEmitter>,

seq_no: Arc<atomic::AtomicU64>,
start: MeasurementSeriesStart,

emitter: Arc<step::StepEmitter>,
}

impl MeasurementSeries {
pub(crate) fn new(series_id: &str, name: &str, emitter: Arc<step::StepEmitter>) -> Self {
Self {
emitter,
seq_no: Arc::new(atomic::AtomicU64::new(0)),
start: MeasurementSeriesStart::new(name, series_id),
emitter,
}
}

// TODO: we should allow the user to start a series with details, but still have the series id on
// an auto-generator, since it's more of a spec detail
pub(crate) fn new_with_details(
start: MeasurementSeriesStart,
emitter: Arc<step::StepEmitter>,
) -> Self {
Self {
emitter,
seq_no: Arc::new(atomic::AtomicU64::new(0)),
start,
}
}

fn incr_seqno(&self) -> u64 {
self.seq_no.fetch_add(1, Ordering::AcqRel)
Self { start, emitter }
}

/// Starts the measurement series.
Expand All @@ -69,13 +60,68 @@ impl MeasurementSeries {
/// # Ok::<(), WriterError>(())
/// # });
/// ```
pub async fn start(&self) -> Result<(), emitter::WriterError> {
pub async fn start(self) -> Result<StartedMeasurementSeries, emitter::WriterError> {
self.emitter
.emit(&spec::TestStepArtifactImpl::MeasurementSeriesStart(
self.start.to_artifact(),
))
.await?;
Ok(())

Ok(StartedMeasurementSeries {
parent: self,
seqno: Arc::new(atomic::AtomicU64::new(0)),
})
}

// /// Builds a scope in the [`MeasurementSeries`] object, taking care of starting and
// /// ending it. View [`MeasurementSeries::start`] and [`MeasurementSeries::end`] methods.
// /// After the scope is constructed, additional objects may be added to it.
// /// This is the preferred usage for the [`MeasurementSeries`], since it guarantees
// /// all the messages are emitted between the start and end messages, the order
// /// is respected and no messages is lost.
// ///
// /// # Examples
// ///
// /// ```rust
// /// # tokio_test::block_on(async {
// /// # use ocptv::output::*;
// ///
// /// let run = TestRun::new("diagnostic_name", "my_dut", "1.0").start().await?;
// /// let step = run.step("step_name").start().await?;
// ///
// /// let series = step.measurement_series("name");
// /// series.start().await?;
// /// series.scope(|s| async {
// /// s.add_measurement(60.into()).await?;
// /// s.add_measurement(70.into()).await?;
// /// s.add_measurement(80.into()).await?;
// /// Ok(())
// /// }).await?;
// ///
// /// # Ok::<(), WriterError>(())
// /// # });
// /// ```
// pub async fn scope<'s, F, R>(&'s self, func: F) -> Result<(), emitter::WriterError>
// where
// R: Future<Output = Result<(), emitter::WriterError>>,
// F: std::ops::FnOnce(&'s MeasurementSeries) -> R,
// {
// self.start().await?;
// func(self).await?;
// self.end().await?;
// Ok(())
// }
}

pub struct StartedMeasurementSeries {
parent: MeasurementSeries,

seqno: Arc<atomic::AtomicU64>,
}

impl StartedMeasurementSeries {
fn incr_seqno(&self) -> u64 {
self.seqno.fetch_add(1, Ordering::AcqRel)
}

/// Ends the measurement series.
Expand All @@ -91,20 +137,20 @@ impl MeasurementSeries {
/// let run = TestRun::new("diagnostic_name", "my_dut", "1.0").start().await?;
/// let step = run.step("step_name").start().await?;
///
/// let series = step.measurement_series("name");
/// series.start().await?;
/// let series = step.measurement_series("name").start().await?;
/// series.end().await?;
///
/// # Ok::<(), WriterError>(())
/// # });
/// ```
pub async fn end(&self) -> Result<(), emitter::WriterError> {
let end = spec::MeasurementSeriesEnd {
series_id: self.start.series_id.clone(),
total_count: self.seq_no.load(Ordering::Acquire),
series_id: self.parent.start.series_id.clone(),
total_count: self.seqno.load(Ordering::Acquire),
};

self.emitter
self.parent
.emitter
.emit(&spec::TestStepArtifactImpl::MeasurementSeriesEnd(end))
.await?;

Expand All @@ -124,8 +170,7 @@ impl MeasurementSeries {
/// let run = TestRun::new("diagnostic_name", "my_dut", "1.0").start().await?;
/// let step = run.step("step_name").start().await?;
///
/// let series = step.measurement_series("name");
/// series.start().await?;
/// let series = step.measurement_series("name").start().await?;
/// series.add_measurement(60.into()).await?;
///
/// # Ok::<(), WriterError>(())
Expand All @@ -136,11 +181,12 @@ impl MeasurementSeries {
index: self.incr_seqno(),
value: value.clone(),
timestamp: chrono::Local::now().with_timezone(&chrono_tz::Tz::UTC),
series_id: self.start.series_id.clone(),
series_id: self.parent.start.series_id.clone(),
metadata: None,
};

self.emitter
self.parent
.emitter
.emit(&spec::TestStepArtifactImpl::MeasurementSeriesElement(
element,
))
Expand All @@ -163,8 +209,7 @@ impl MeasurementSeries {
/// let run = TestRun::new("diagnostic_name", "my_dut", "1.0").start().await?;
/// let step = run.step("step_name").start().await?;
///
/// let series = step.measurement_series("name");
/// series.start().await?;
/// let series = step.measurement_series("name").start().await?;
/// series.add_measurement_with_metadata(60.into(), vec![("key", "value".into())]).await?;
///
/// # Ok::<(), WriterError>(())
Expand All @@ -179,59 +224,21 @@ impl MeasurementSeries {
index: self.incr_seqno(),
value: value.clone(),
timestamp: chrono::Local::now().with_timezone(&chrono_tz::Tz::UTC),
series_id: self.start.series_id.clone(),
series_id: self.parent.start.series_id.clone(),
metadata: Some(Map::from_iter(
metadata.iter().map(|(k, v)| (k.to_string(), v.clone())),
)),
};

self.emitter
self.parent
.emitter
.emit(&spec::TestStepArtifactImpl::MeasurementSeriesElement(
element,
))
.await?;

Ok(())
}

/// Builds a scope in the [`MeasurementSeries`] object, taking care of starting and
/// ending it. View [`MeasurementSeries::start`] and [`MeasurementSeries::end`] methods.
/// After the scope is constructed, additional objects may be added to it.
/// This is the preferred usage for the [`MeasurementSeries`], since it guarantees
/// all the messages are emitted between the start and end messages, the order
/// is respected and no messages is lost.
///
/// # Examples
///
/// ```rust
/// # tokio_test::block_on(async {
/// # use ocptv::output::*;
///
/// let run = TestRun::new("diagnostic_name", "my_dut", "1.0").start().await?;
/// let step = run.step("step_name").start().await?;
///
/// let series = step.measurement_series("name");
/// series.start().await?;
/// series.scope(|s| async {
/// s.add_measurement(60.into()).await?;
/// s.add_measurement(70.into()).await?;
/// s.add_measurement(80.into()).await?;
/// Ok(())
/// }).await?;
///
/// # Ok::<(), WriterError>(())
/// # });
/// ```
pub async fn scope<'s, F, R>(&'s self, func: F) -> Result<(), emitter::WriterError>
where
R: Future<Output = Result<(), emitter::WriterError>>,
F: std::ops::FnOnce(&'s MeasurementSeries) -> R,
{
self.start().await?;
func(self).await?;
self.end().await?;
Ok(())
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -730,6 +737,24 @@ impl MeasurementSeriesStartBuilder {
}
}

// pub struct MeasurementSeriesEmitter {
// series_id: String,
// step_emitter: Arc<step::StepEmitter>,
// }

// impl StepEmitter {
// pub async fn emit(&self, object: &spec::TestStepArtifactImpl) -> Result<(), WriterError> {
// let root = spec::RootImpl::TestStepArtifact(spec::TestStepArtifact {
// id: self.step_id.clone(),
// // TODO: can these copies be avoided?
// artifact: object.clone(),
// });
// self.run_emitter.emit(&root).await?;

// Ok(())
// }
// }

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit e8968c3

Please sign in to comment.