From f78bd81646e02b528f8314bf2803511c2e5d2641 Mon Sep 17 00:00:00 2001 From: Ho Kim Date: Thu, 23 May 2024 15:59:48 +0000 Subject: [PATCH] feat(kubegraph): implement generic fake dist generator --- Cargo.toml | 1 + crates/kubegraph/api/src/connector/fake.rs | 6 ++-- crates/kubegraph/connector/fake/Cargo.toml | 1 + crates/kubegraph/connector/fake/src/lib.rs | 2 -- .../connector/fake/src/model/dist.rs | 36 +++++++++++++++++++ .../kubegraph/connector/fake/src/model/mod.rs | 8 +++++ .../connector/fake/src/model/normal.rs | 33 ++++++++++++----- .../kubegraph/gateway/problems/warehouse.yaml | 6 ++-- 8 files changed, 77 insertions(+), 16 deletions(-) create mode 100644 crates/kubegraph/connector/fake/src/model/dist.rs diff --git a/Cargo.toml b/Cargo.toml index b1badd8b..f591502b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -194,6 +194,7 @@ prometheus-http-query = { version = "0.8", default-features = false, features = pyo3 = { version = "0.21" } r2r = { version = "0.9" } rand = { version = "0.8" } +rand_distr = { version = "0.4" } rdkafka = { version = "0.36", features = ["cmake-build"] } regex = { version = "1.10" } reqwest = { version = "0.12", default-features = false, features = [ diff --git a/crates/kubegraph/api/src/connector/fake.rs b/crates/kubegraph/api/src/connector/fake.rs index d9794b03..b6738410 100644 --- a/crates/kubegraph/api/src/connector/fake.rs +++ b/crates/kubegraph/api/src/connector/fake.rs @@ -51,7 +51,7 @@ impl NetworkConnectorFakeDataModel { None } - const fn default_seed() -> Option { + const fn default_seed() -> Option { None } @@ -88,7 +88,7 @@ mod impl_json_schema_for_fake_data_model { prefix: Option, r#type: NetworkConnectorFakeDataModelType, #[serde(default = "super::NetworkConnectorFakeDataModel::default_seed")] - seed: Option, + seed: Option, #[serde(default = "super::NetworkConnectorFakeDataModel::default_std")] #[validate(range(min = 0.0))] std: f64, @@ -148,7 +148,7 @@ pub mod model { #[serde(default = "super::NetworkConnectorFakeDataModel::default_mean")] pub mean: f64, #[serde(default = "super::NetworkConnectorFakeDataModel::default_seed")] - pub seed: Option, + pub seed: Option, #[serde(default = "super::NetworkConnectorFakeDataModel::default_std")] #[validate(range(min = 0.0))] pub std: f64, diff --git a/crates/kubegraph/connector/fake/Cargo.toml b/crates/kubegraph/connector/fake/Cargo.toml index c77c1238..8381640b 100644 --- a/crates/kubegraph/connector/fake/Cargo.toml +++ b/crates/kubegraph/connector/fake/Cargo.toml @@ -39,4 +39,5 @@ async-trait = { workspace = true } futures = { workspace = true } polars = { workspace = true } rand = { workspace = true } +rand_distr = { workspace = true } tracing = { workspace = true } diff --git a/crates/kubegraph/connector/fake/src/lib.rs b/crates/kubegraph/connector/fake/src/lib.rs index a9c24e75..f207b645 100644 --- a/crates/kubegraph/connector/fake/src/lib.rs +++ b/crates/kubegraph/connector/fake/src/lib.rs @@ -84,8 +84,6 @@ impl NetworkConnectorItem { let GraphScope { namespace, name } = &scope; info!("Loading fake connector: {namespace}/{name}"); - dbg!(nodes.clone().generate(&scope)?.collect().await?); - Ok(Graph { data: GraphData { edges: edges.generate(&scope).map_err(|error| { diff --git a/crates/kubegraph/connector/fake/src/model/dist.rs b/crates/kubegraph/connector/fake/src/model/dist.rs new file mode 100644 index 00000000..faf3a8ba --- /dev/null +++ b/crates/kubegraph/connector/fake/src/model/dist.rs @@ -0,0 +1,36 @@ +use kubegraph_api::connector::fake::NetworkConnectorFakeDataValueType; +use polars::{error::PolarsError, series::Series}; +use rand::Rng; +use rand_distr::Distribution; + +pub(super) struct GenericDistModel { + pub(super) count: usize, + pub(super) dist: D, + pub(super) rng: R, + pub(super) value_type: NetworkConnectorFakeDataValueType, +} + +impl<'a, D, R> super::DataGenerator<'a> for GenericDistModel +where + D: Distribution, + R: Rng, +{ + type Args = (); + type Error = PolarsError; + type Output = Series; + + fn generate( + self, + (): >::Args, + ) -> Result<>::Output, >::Error> + { + let Self { + count, + dist, + rng, + value_type, + } = self; + + Series::from_iter(rng.sample_iter::(dist).take(count)).cast(&value_type.into()) + } +} diff --git a/crates/kubegraph/connector/fake/src/model/mod.rs b/crates/kubegraph/connector/fake/src/model/mod.rs index 9f6cbfb9..f413a2ec 100644 --- a/crates/kubegraph/connector/fake/src/model/mod.rs +++ b/crates/kubegraph/connector/fake/src/model/mod.rs @@ -1,10 +1,12 @@ mod constant; +mod dist; mod name; mod normal; use anyhow::{anyhow, Error, Result}; use kubegraph_api::{ connector::fake::{ + model::{ConstantModel, NormalModel}, NetworkConnectorFakeData, NetworkConnectorFakeDataFrame, NetworkConnectorFakeDataModel, }, frame::LazyFrame, @@ -88,6 +90,12 @@ impl<'a> DataGenerator<'a> for NetworkConnectorFakeDataModel { match self { Self::Constant(model) => model.generate(count), Self::Name(model) => model.generate((scope, count)), + Self::Normal(NormalModel { + mean: value, + std, + value_type, + .. + }) if std <= 0.0 => ConstantModel { value, value_type }.generate(count), Self::Normal(model) => model.generate(count), } } diff --git a/crates/kubegraph/connector/fake/src/model/normal.rs b/crates/kubegraph/connector/fake/src/model/normal.rs index 08e8046a..7deec8d6 100644 --- a/crates/kubegraph/connector/fake/src/model/normal.rs +++ b/crates/kubegraph/connector/fake/src/model/normal.rs @@ -1,6 +1,9 @@ use kubegraph_api::connector::fake::model::NormalModel; use polars::{error::PolarsError, series::Series}; -use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng}; +use rand::{rngs::SmallRng, thread_rng, SeedableRng}; +use rand_distr::Normal; + +use super::dist::GenericDistModel; impl<'a> super::DataGenerator<'a> for NormalModel { type Args = usize; @@ -15,14 +18,28 @@ impl<'a> super::DataGenerator<'a> for NormalModel { let Self { mean, seed, - std, + std: std_dev, value_type, } = self; - Series::from_iter( - StdRng::from_entropy() - .sample_iter::(Standard) - .take(count), - ) - .cast(&value_type.into()) + + let dist = Normal::new(mean, std_dev) + .map_err(|error| PolarsError::ComputeError(error.to_string().into()))?; + + match seed { + Some(seed) => GenericDistModel { + count, + dist, + rng: SmallRng::seed_from_u64(seed), + value_type, + } + .generate(()), + None => GenericDistModel { + count, + dist, + rng: thread_rng(), + value_type, + } + .generate(()), + } } } diff --git a/crates/kubegraph/gateway/problems/warehouse.yaml b/crates/kubegraph/gateway/problems/warehouse.yaml index 1a0ec13b..d782772c 100644 --- a/crates/kubegraph/gateway/problems/warehouse.yaml +++ b/crates/kubegraph/gateway/problems/warehouse.yaml @@ -20,7 +20,7 @@ spec: supply: payload fake: nodes: - count: 30 + count: 2 frame: name: type: Name @@ -31,7 +31,7 @@ spec: valueType: I64 payload: type: Normal - mean: 300 + mean: 200 std: 20 valueType: I64 unit_cost: @@ -57,5 +57,5 @@ spec: dummy: {} filter: src != sink and src.supply > 0 and src.supply > sink.supply script: | - capacity = 49; + capacity = 50; unit_cost = 1;