Skip to content

Commit

Permalink
[2/6] [oxql] move core types to a new oxql-types crate (#6364)
Browse files Browse the repository at this point in the history
Means that the Nexus external API crate doesn't have to pull in
oximeter-db.

I did have to expose a few mutators, but my general reasoning is that
because these types derive `Deserialize`, it's always possible to make
these changes by serializing into JSON and working with them there.
  • Loading branch information
sunshowers authored Aug 18, 2024
1 parent 24c129b commit 4e8c731
Show file tree
Hide file tree
Showing 27 changed files with 710 additions and 612 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ members = [
"oximeter/instruments",
"oximeter/oximeter-macro-impl",
"oximeter/oximeter",
"oximeter/oxql-types",
"oximeter/producer",
"oximeter/schema",
"oximeter/test-utils",
Expand Down Expand Up @@ -196,6 +197,7 @@ default-members = [
"oximeter/instruments",
"oximeter/oximeter-macro-impl",
"oximeter/oximeter",
"oximeter/oxql-types",
"oximeter/producer",
"oximeter/schema",
"oximeter/test-utils",
Expand Down Expand Up @@ -470,6 +472,7 @@ oximeter-schema = { path = "oximeter/schema" }
oximeter-test-utils = { path = "oximeter/test-utils" }
oximeter-timeseries-macro = { path = "oximeter/timeseries-macro" }
oximeter-types = { path = "oximeter/types" }
oxql-types = { path = "oximeter/oxql-types" }
p256 = "0.13"
parse-display = "0.10.0"
partial-io = { version = "0.5.4", features = ["proptest1", "tokio1"] }
Expand Down
1 change: 1 addition & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ openssl.workspace = true
oximeter-client.workspace = true
oximeter-db = { workspace = true, default-features = false, features = [ "oxql" ] }
oxnet.workspace = true
oxql-types.workspace = true
parse-display.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
Expand Down
4 changes: 2 additions & 2 deletions nexus/src/app/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use nexus_db_queries::{
};
use omicron_common::api::external::{Error, InternalContext};
use oximeter_db::{
oxql, Measurement, TimeseriesSchema, TimeseriesSchemaPaginationParams,
Measurement, TimeseriesSchema, TimeseriesSchemaPaginationParams,
};
use std::num::NonZeroU32;

Expand Down Expand Up @@ -138,7 +138,7 @@ impl super::Nexus {
&self,
opctx: &OpContext,
query: impl AsRef<str>,
) -> Result<Vec<oxql::Table>, Error> {
) -> Result<Vec<oxql_types::Table>, Error> {
// Must be a fleet user to list timeseries schema.
//
// TODO-security: We need to figure out how to implement proper security
Expand Down
2 changes: 1 addition & 1 deletion nexus/src/external_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6386,7 +6386,7 @@ async fn timeseries_schema_list(
async fn timeseries_query(
rqctx: RequestContext<ApiContext>,
body: TypedBody<params::TimeseriesQuery>,
) -> Result<HttpResponseOk<Vec<oximeter_db::oxql::Table>>, HttpError> {
) -> Result<HttpResponseOk<Vec<oxql_types::Table>>, HttpError> {
let apictx = rqctx.context();
let handler = async {
let nexus = &apictx.context.nexus;
Expand Down
6 changes: 3 additions & 3 deletions nexus/tests/integration_tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async fn test_timeseries_schema_list(
pub async fn timeseries_query(
cptestctx: &ControlPlaneTestContext<omicron_nexus::Server>,
query: impl ToString,
) -> Vec<oximeter_db::oxql::Table> {
) -> Vec<oxql_types::Table> {
// first, make sure the latest timeseries have been collected.
cptestctx.oximeter.force_collect().await;

Expand Down Expand Up @@ -429,11 +429,11 @@ async fn test_instance_watcher_metrics(

#[track_caller]
fn count_state(
table: &oximeter_db::oxql::Table,
table: &oxql_types::Table,
instance_id: InstanceUuid,
state: &'static str,
) -> i64 {
use oximeter_db::oxql::point::ValueArray;
use oxql_types::point::ValueArray;
let uuid = FieldValue::Uuid(instance_id.into_untyped_uuid());
let state = FieldValue::String(state.into());
let mut timeserieses = table.timeseries().filter(|ts| {
Expand Down
14 changes: 12 additions & 2 deletions openapi/nexus.json
Original file line number Diff line number Diff line change
Expand Up @@ -20131,10 +20131,20 @@
"type": "object",
"properties": {
"metric_type": {
"$ref": "#/components/schemas/MetricType"
"description": "The type of this metric.",
"allOf": [
{
"$ref": "#/components/schemas/MetricType"
}
]
},
"values": {
"$ref": "#/components/schemas/ValueArray"
"description": "The data values.",
"allOf": [
{
"$ref": "#/components/schemas/ValueArray"
}
]
}
},
"required": [
Expand Down
1 change: 1 addition & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ num.workspace = true
omicron-common.workspace = true
omicron-workspace-hack.workspace = true
oximeter.workspace = true
oxql-types.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ use crate::Error;
use crate::Metric;
use crate::Target;
use crate::Timeseries;
use crate::TimeseriesKey;
use crate::TimeseriesName;
use crate::TimeseriesPageSelector;
use crate::TimeseriesScanParams;
use crate::TimeseriesSchema;
use dropshot::EmptyScanParams;
use dropshot::PaginationOrder;
use dropshot::ResultsPage;
use dropshot::WhichPage;
use oximeter::schema::TimeseriesKey;
use oximeter::types::Sample;
use oximeter::TimeseriesName;
use regex::Regex;
use regex::RegexBuilder;
use slog::debug;
Expand Down
30 changes: 15 additions & 15 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::query::field_table_name;
use crate::Error;
use crate::Metric;
use crate::Target;
use crate::TimeseriesKey;
use oximeter::schema::TimeseriesKey;
use oximeter::TimeseriesSchema;
use slog::debug;
use slog::trace;
Expand Down Expand Up @@ -68,7 +68,7 @@ pub struct OxqlResult {
pub query_summaries: Vec<QuerySummary>,

/// The list of OxQL tables returned from the query.
pub tables: Vec<oxql::Table>,
pub tables: Vec<oxql_types::Table>,
}

/// The maximum number of data values fetched from the database for an OxQL
Expand Down Expand Up @@ -479,7 +479,9 @@ impl Client {
query_id,
total_duration: query_start.elapsed(),
query_summaries,
tables: vec![oxql::Table::new(schema.timeseries_name.as_str())],
tables: vec![oxql_types::Table::new(
schema.timeseries_name.as_str(),
)],
};
return Ok(result);
}
Expand All @@ -503,7 +505,7 @@ impl Client {

// At this point, let's construct a set of tables and run the results
// through the transformation pipeline.
let mut tables = vec![oxql::Table::from_timeseries(
let mut tables = vec![oxql_types::Table::from_timeseries(
schema.timeseries_name.as_str(),
timeseries_by_key.into_values(),
)?];
Expand Down Expand Up @@ -553,7 +555,7 @@ impl Client {
limit: Option<Limit>,
total_rows_fetched: &mut u64,
) -> Result<
(Vec<QuerySummary>, BTreeMap<TimeseriesKey, oxql::Timeseries>),
(Vec<QuerySummary>, BTreeMap<TimeseriesKey, oxql_types::Timeseries>),
Error,
> {
// We'll create timeseries for each key on the fly. To enable computing
Expand Down Expand Up @@ -624,25 +626,25 @@ impl Client {
for (key, measurements) in measurements_by_key.into_iter() {
// Constuct a new timeseries, from the target/metric info.
let (target, metric) = info.get(&key).unwrap();
let mut timeseries = oxql::Timeseries::new(
let mut timeseries = oxql_types::Timeseries::new(
target
.fields
.iter()
.chain(metric.fields.iter())
.map(|field| (field.name.clone(), field.value.clone())),
oxql::point::DataType::try_from(schema.datum_type)?,
oxql_types::point::DataType::try_from(schema.datum_type)?,
if schema.datum_type.is_cumulative() {
oxql::point::MetricType::Delta
oxql_types::point::MetricType::Delta
} else {
oxql::point::MetricType::Gauge
oxql_types::point::MetricType::Gauge
},
)?;

// Covert its oximeter measurements into OxQL data types.
let points = if schema.datum_type.is_cumulative() {
oxql::point::Points::delta_from_cumulative(&measurements)?
oxql_types::point::Points::delta_from_cumulative(&measurements)?
} else {
oxql::point::Points::gauge_from_gauge(&measurements)?
oxql_types::point::Points::gauge_from_gauge(&measurements)?
};
timeseries.points = points;
debug!(
Expand Down Expand Up @@ -1108,17 +1110,15 @@ fn update_total_rows_and_check(
mod tests {
use super::ConsistentKeyGroup;
use crate::client::oxql::chunk_consistent_key_groups_impl;
use crate::{
oxql::{point::Points, Table, Timeseries},
Client, DbWrite,
};
use crate::{Client, DbWrite};
use crate::{Metric, Target};
use chrono::{DateTime, Utc};
use dropshot::test_util::LogContext;
use omicron_test_utils::dev::clickhouse::ClickHouseInstance;
use omicron_test_utils::dev::test_setup_log;
use oximeter::{types::Cumulative, FieldValue};
use oximeter::{DatumType, Sample};
use oxql_types::{point::Points, Table, Timeseries};
use std::collections::BTreeMap;
use std::time::Duration;

Expand Down
3 changes: 1 addition & 2 deletions oximeter/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use dropshot::EmptyScanParams;
use dropshot::PaginationParams;
pub use oximeter::schema::FieldSchema;
pub use oximeter::schema::FieldSource;
use oximeter::schema::TimeseriesKey;
pub use oximeter::schema::TimeseriesName;
pub use oximeter::schema::TimeseriesSchema;
pub use oximeter::DatumType;
Expand Down Expand Up @@ -267,8 +268,6 @@ pub async fn make_client(
Ok(client)
}

pub(crate) type TimeseriesKey = u64;

// TODO-cleanup: Add the timeseries version in to the computation of the key.
// This will require a full drop of the database, since we're changing the
// sorting key and the timeseries key on each past sample. See
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use crate::FieldSchema;
use crate::FieldSource;
use crate::Metric;
use crate::Target;
use crate::TimeseriesKey;
use crate::TimeseriesSchema;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use num::traits::Zero;
use oximeter::histogram::Histogram;
use oximeter::schema::TimeseriesKey;
use oximeter::traits;
use oximeter::types::Cumulative;
use oximeter::types::Datum;
Expand Down
35 changes: 17 additions & 18 deletions oximeter/db/src/oxql/ast/table_ops/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@

// Copyright 2024 Oxide Computer Company

use crate::oxql::point::DataType;
use crate::oxql::point::MetricType;
use crate::oxql::point::Points;
use crate::oxql::point::ValueArray;
use crate::oxql::point::Values;
use crate::oxql::query::Alignment;
use crate::oxql::Error;
use crate::oxql::Table;
use crate::oxql::Timeseries;
use anyhow::Context;
use anyhow::Error;
use chrono::DateTime;
use chrono::TimeDelta;
use chrono::Utc;
use oxql_types::point::DataType;
use oxql_types::point::MetricType;
use oxql_types::point::Points;
use oxql_types::point::ValueArray;
use oxql_types::point::Values;
use oxql_types::Alignment;
use oxql_types::Table;
use oxql_types::Timeseries;
use std::time::Duration;

// The maximum factor by which an alignment operation may upsample data.
Expand Down Expand Up @@ -144,7 +144,7 @@ fn align_mean_within(
"Alignment by mean requires a gauge or delta metric, not {}",
metric_type,
);
verify_max_upsampling_ratio(&points.timestamps, &period)?;
verify_max_upsampling_ratio(points.timestamps(), &period)?;

// Always convert the output to doubles, when computing the mean. The
// output is always a gauge, so we do not need the start times of the
Expand Down Expand Up @@ -179,7 +179,7 @@ fn align_mean_within(
// - Compute the mean of those.
let period_ =
TimeDelta::from_std(*period).context("time delta out of range")?;
let first_timestamp = points.timestamps[0];
let first_timestamp = points.timestamps()[0];
let mut ix: u32 = 0;
loop {
// Compute the next output timestamp, by shifting the query end time
Expand Down Expand Up @@ -220,15 +220,15 @@ fn align_mean_within(
// entries.
let output_value = if matches!(metric_type, MetricType::Gauge) {
mean_gauge_value_in_window(
&points.timestamps,
points.timestamps(),
&input_points,
window_start,
output_time,
)
} else {
mean_delta_value_in_window(
points.start_times.as_ref().unwrap(),
&points.timestamps,
points.start_times().unwrap(),
points.timestamps(),
&input_points,
window_start,
output_time,
Expand All @@ -255,10 +255,9 @@ fn align_mean_within(
ValueArray::Double(output_values.into_iter().rev().collect());
let timestamps = output_timestamps.into_iter().rev().collect();
let values = Values { values, metric_type: MetricType::Gauge };
new_timeseries.points =
Points { start_times: None, timestamps, values: vec![values] };
new_timeseries.alignment =
Some(Alignment { end_time: *query_end, period: *period });
new_timeseries.points = Points::new(None, timestamps, vec![values]);
new_timeseries
.set_alignment(Alignment { end_time: *query_end, period: *period });
output_table.insert(new_timeseries).unwrap();
}
Ok(output_table)
Expand Down
Loading

0 comments on commit 4e8c731

Please sign in to comment.