Skip to content

Commit

Permalink
feat(kubegraph): implement generic fake dist generator
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed May 23, 2024
1 parent 4c4ca69 commit f78bd81
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ prometheus-http-query = { version = "0.8", default-features = false, features =
pyo3 = { version = "0.21" }
r2r = { version = "0.9" }
rand = { version = "0.8" }
rand_distr = { version = "0.4" }
rdkafka = { version = "0.36", features = ["cmake-build"] }
regex = { version = "1.10" }
reqwest = { version = "0.12", default-features = false, features = [
Expand Down
6 changes: 3 additions & 3 deletions crates/kubegraph/api/src/connector/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl NetworkConnectorFakeDataModel {
None
}

const fn default_seed() -> Option<i64> {
const fn default_seed() -> Option<u64> {
None
}

Expand Down Expand Up @@ -88,7 +88,7 @@ mod impl_json_schema_for_fake_data_model {
prefix: Option<String>,
r#type: NetworkConnectorFakeDataModelType,
#[serde(default = "super::NetworkConnectorFakeDataModel::default_seed")]
seed: Option<i64>,
seed: Option<u64>,
#[serde(default = "super::NetworkConnectorFakeDataModel::default_std")]
#[validate(range(min = 0.0))]
std: f64,
Expand Down Expand Up @@ -148,7 +148,7 @@ pub mod model {
#[serde(default = "super::NetworkConnectorFakeDataModel::default_mean")]
pub mean: f64,
#[serde(default = "super::NetworkConnectorFakeDataModel::default_seed")]
pub seed: Option<i64>,
pub seed: Option<u64>,
#[serde(default = "super::NetworkConnectorFakeDataModel::default_std")]
#[validate(range(min = 0.0))]
pub std: f64,
Expand Down
1 change: 1 addition & 0 deletions crates/kubegraph/connector/fake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ async-trait = { workspace = true }
futures = { workspace = true }
polars = { workspace = true }
rand = { workspace = true }
rand_distr = { workspace = true }
tracing = { workspace = true }
2 changes: 0 additions & 2 deletions crates/kubegraph/connector/fake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ impl NetworkConnectorItem {
let GraphScope { namespace, name } = &scope;
info!("Loading fake connector: {namespace}/{name}");

dbg!(nodes.clone().generate(&scope)?.collect().await?);

Ok(Graph {
data: GraphData {
edges: edges.generate(&scope).map_err(|error| {
Expand Down
36 changes: 36 additions & 0 deletions crates/kubegraph/connector/fake/src/model/dist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use kubegraph_api::connector::fake::NetworkConnectorFakeDataValueType;
use polars::{error::PolarsError, series::Series};
use rand::Rng;
use rand_distr::Distribution;

/// A fake-data generator that draws values from an arbitrary distribution.
///
/// `D` is the distribution being sampled and `R` is the random source that
/// drives it; the bounds required to actually generate data are placed on the
/// `DataGenerator` implementation rather than on this struct.
pub(super) struct GenericDistModel<D, R> {
/// Number of samples to take from the distribution.
pub(super) count: usize,
/// Distribution that the values are sampled from.
pub(super) dist: D,
/// Random source consumed while sampling.
pub(super) rng: R,
/// Value type that the sampled series is cast to after generation.
pub(super) value_type: NetworkConnectorFakeDataValueType,
}

/// Data generation for any `f64`-producing distribution paired with any RNG.
impl<'a, D, R> super::DataGenerator<'a> for GenericDistModel<D, R>
where
    D: Distribution<f64>,
    R: Rng,
{
    type Args = ();
    type Error = PolarsError;
    type Output = Series;

    /// Samples `count` values from the distribution and returns them as a
    /// [`Series`] cast to the configured value type.
    ///
    /// # Errors
    ///
    /// Returns the [`PolarsError`] reported by the cast, if any.
    fn generate(
        self,
        (): <Self as super::DataGenerator<'a>>::Args,
    ) -> Result<<Self as super::DataGenerator<'a>>::Output, <Self as super::DataGenerator<'a>>::Error>
    {
        let Self {
            count: len,
            dist: distribution,
            rng: source,
            value_type: target,
        } = self;

        // The sampling iterator is endless, so bound it to the requested length
        // before materializing the samples as a float series.
        let samples = distribution.sample_iter(source).take(len);
        let series: Series = samples.collect();

        // Convert the raw floats into the requested value type.
        series.cast(&target.into())
    }
}
8 changes: 8 additions & 0 deletions crates/kubegraph/connector/fake/src/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod constant;
mod dist;
mod name;
mod normal;

use anyhow::{anyhow, Error, Result};
use kubegraph_api::{
connector::fake::{
model::{ConstantModel, NormalModel},
NetworkConnectorFakeData, NetworkConnectorFakeDataFrame, NetworkConnectorFakeDataModel,
},
frame::LazyFrame,
Expand Down Expand Up @@ -88,6 +90,12 @@ impl<'a> DataGenerator<'a> for NetworkConnectorFakeDataModel {
match self {
Self::Constant(model) => model.generate(count),
Self::Name(model) => model.generate((scope, count)),
Self::Normal(NormalModel {
mean: value,
std,
value_type,
..
}) if std <= 0.0 => ConstantModel { value, value_type }.generate(count),
Self::Normal(model) => model.generate(count),
}
}
Expand Down
33 changes: 25 additions & 8 deletions crates/kubegraph/connector/fake/src/model/normal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use kubegraph_api::connector::fake::model::NormalModel;
use polars::{error::PolarsError, series::Series};
use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng};
use rand::{rngs::SmallRng, thread_rng, SeedableRng};
use rand_distr::Normal;

use super::dist::GenericDistModel;

impl<'a> super::DataGenerator<'a> for NormalModel {
type Args = usize;
Expand All @@ -15,14 +18,28 @@ impl<'a> super::DataGenerator<'a> for NormalModel {
let Self {
mean,
seed,
std,
std: std_dev,
value_type,
} = self;
Series::from_iter(
StdRng::from_entropy()
.sample_iter::<f64, _>(Standard)
.take(count),
)
.cast(&value_type.into())

let dist = Normal::new(mean, std_dev)
.map_err(|error| PolarsError::ComputeError(error.to_string().into()))?;

match seed {
Some(seed) => GenericDistModel {
count,
dist,
rng: SmallRng::seed_from_u64(seed),
value_type,
}
.generate(()),
None => GenericDistModel {
count,
dist,
rng: thread_rng(),
value_type,
}
.generate(()),
}
}
}
6 changes: 3 additions & 3 deletions crates/kubegraph/gateway/problems/warehouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
supply: payload
fake:
nodes:
count: 30
count: 2
frame:
name:
type: Name
Expand All @@ -31,7 +31,7 @@ spec:
valueType: I64
payload:
type: Normal
mean: 300
mean: 200
std: 20
valueType: I64
unit_cost:
Expand All @@ -57,5 +57,5 @@ spec:
dummy: {}
filter: src != sink and src.supply > 0 and src.supply > sink.supply
script: |
capacity = 49;
capacity = 50;
unit_cost = 1;

0 comments on commit f78bd81

Please sign in to comment.