From f3c2db4ac864febd69f9970498d1ff123d425e4c Mon Sep 17 00:00:00 2001 From: Ho Kim Date: Thu, 16 May 2024 14:21:45 +0000 Subject: [PATCH] feat(kubegraph): begin implementation of basic simulation gateway --- crates/kubegraph/api/Cargo.toml | 5 +- .../src/{connector.rs => connector/mod.rs} | 47 +-- .../kubegraph/api/src/connector/prometheus.rs | 20 + .../kubegraph/api/src/connector/simulation.rs | 31 ++ crates/kubegraph/api/src/frame/mod.rs | 15 +- crates/kubegraph/api/src/function/dummy.rs | 11 + .../api/src/{function.rs => function/mod.rs} | 13 +- .../api/src/{graph.rs => graph/mod.rs} | 127 +++++-- crates/kubegraph/api/src/graph/polars.rs | 37 ++ crates/kubegraph/api/src/problem.rs | 93 +---- crates/kubegraph/api/src/query.rs | 2 - .../kubegraph/connector/prometheus/src/lib.rs | 13 +- .../kubegraph/connector/simulation/Cargo.toml | 6 +- .../kubegraph/connector/simulation/src/lib.rs | 358 ++++++++++-------- .../simulation/src/schema/constraint.rs | 12 - .../connector/simulation/src/schema/edge.rs | 6 - .../simulation/src/schema/function.rs | 81 ---- .../connector/simulation/src/schema/mod.rs | 45 --- .../connector/simulation/src/schema/node.rs | 11 - .../connector/simulation/src/schema/value.rs | 5 - crates/kubegraph/function/dummy/Cargo.toml | 4 + crates/kubegraph/gateway/Cargo.toml | 16 +- .../kubegraph/gateway/problems/warehouse.yaml | 22 ++ .../gateway/problems/warehouse/env.yaml | 44 --- .../gateway/problems/warehouse/nodes.csv | 3 + crates/kubegraph/gateway/src/connector.rs | 38 +- crates/kubegraph/gateway/src/db.rs | 8 +- crates/kubegraph/gateway/src/reloader.rs | 2 +- crates/kubegraph/operator/Cargo.toml | 4 +- crates/kubegraph/solver/ortools/src/polars.rs | 14 +- .../solver/ortools/tests/simple_solver.rs | 15 +- crates/kubegraph/twin/simulator/src/polars.rs | 14 +- crates/kubegraph/vm/local/src/func.rs | 9 +- crates/kubegraph/vm/local/src/lib.rs | 19 +- 34 files changed, 509 insertions(+), 641 deletions(-) rename crates/kubegraph/api/src/{connector.rs => connector/mod.rs} (80%) create mode 100644 crates/kubegraph/api/src/connector/prometheus.rs create mode 100644 crates/kubegraph/api/src/connector/simulation.rs create mode 100644 crates/kubegraph/api/src/function/dummy.rs rename crates/kubegraph/api/src/{function.rs => function/mod.rs} (83%) rename crates/kubegraph/api/src/{graph.rs => graph/mod.rs} (66%) create mode 100644 crates/kubegraph/api/src/graph/polars.rs delete mode 100644 crates/kubegraph/connector/simulation/src/schema/constraint.rs delete mode 100644 crates/kubegraph/connector/simulation/src/schema/edge.rs delete mode 100644 crates/kubegraph/connector/simulation/src/schema/function.rs delete mode 100644 crates/kubegraph/connector/simulation/src/schema/mod.rs delete mode 100644 crates/kubegraph/connector/simulation/src/schema/node.rs delete mode 100644 crates/kubegraph/connector/simulation/src/schema/value.rs delete mode 100644 crates/kubegraph/gateway/problems/warehouse/env.yaml create mode 100644 crates/kubegraph/gateway/problems/warehouse/nodes.csv diff --git a/crates/kubegraph/api/Cargo.toml b/crates/kubegraph/api/Cargo.toml index 2e61448e..53e5d798 100644 --- a/crates/kubegraph/api/Cargo.toml +++ b/crates/kubegraph/api/Cargo.toml @@ -20,8 +20,9 @@ workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["full"] -full = ["connector-full", "df-full"] +default = ["minimal"] +minimal = ["connector-simulation", "df-polars", "function-dummy"] +full = ["connector-full", "df-full", "function-full"] # Connectors connector-full = ["connector-prometheus", "connector-simulation"] diff --git a/crates/kubegraph/api/src/connector.rs b/crates/kubegraph/api/src/connector/mod.rs similarity index 80% rename from crates/kubegraph/api/src/connector.rs rename to crates/kubegraph/api/src/connector/mod.rs index f74964fe..5c1b251b 100644 --- a/crates/kubegraph/api/src/connector.rs +++ b/crates/kubegraph/api/src/connector/mod.rs @@ -1,10 +1,11 @@ -use std::{ - path::PathBuf, - time::{Duration, Instant}, -}; +#[cfg(feature = "connector-prometheus")] +pub mod prometheus; +#[cfg(feature = "connector-simulation")] +pub mod simulation; + +use std::time::{Duration, Instant}; use anyhow::Result; -use ark_core_k8s::data::Url; use async_trait::async_trait; use kube::CustomResource; use schemars::JsonSchema; @@ -16,14 +17,14 @@ use crate::db::NetworkGraphDB; #[async_trait] pub trait NetworkConnectors { - async fn add_connector(&self, namespace: String, name: String, spec: NetworkConnectorSpec); + async fn add_connector(&self, object: NetworkConnectorCrd); async fn delete_connector(&self, namespace: String, name: String); async fn get_connectors( &self, r#type: NetworkConnectorSourceRef, - ) -> Option>; + ) -> Option>; } #[async_trait] @@ -64,6 +65,7 @@ pub trait NetworkConnector { kind = "NetworkConnector", root = "NetworkConnectorCrd", shortname = "nc", + namespaced, printcolumn = r#"{ "name": "created-at", "type": "date", @@ -78,11 +80,12 @@ pub trait NetworkConnector { }"# )] #[serde(rename_all = "camelCase")] +#[non_exhaustive] pub enum NetworkConnectorSpec { #[cfg(feature = "connector-prometheus")] - Prometheus(NetworkConnectorPrometheusSpec), + Prometheus(self::prometheus::NetworkConnectorPrometheusSpec), #[cfg(feature = "connector-simulation")] - Simulation(NetworkConnectorSimulationSpec), + Simulation(self::simulation::NetworkConnectorSimulationSpec), } impl NetworkConnectorSpec { @@ -119,6 +122,7 @@ impl PartialEq for NetworkConnectorSpec { Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, )] #[serde(rename_all = "camelCase")] +#[non_exhaustive] pub enum NetworkConnectorSourceRef { #[cfg(feature = "connector-prometheus")] Prometheus, @@ -136,28 +140,3 @@ impl NetworkConnectorSourceRef { } } } - -#[cfg(feature = "connector-prometheus")] -#[derive( - Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct NetworkConnectorPrometheusSpec { - pub template: crate::query::NetworkQuery, - pub url: Url, -} - -impl NetworkConnectorPrometheusSpec { - pub const fn name(&self) -> &'static str { - self.template.name() - } -} - -#[cfg(feature = "connector-simulation")] -#[derive( - Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct NetworkConnectorSimulationSpec { - pub path: PathBuf, -} diff --git a/crates/kubegraph/api/src/connector/prometheus.rs b/crates/kubegraph/api/src/connector/prometheus.rs new file mode 100644 index 00000000..31aa6035 --- /dev/null +++ b/crates/kubegraph/api/src/connector/prometheus.rs @@ -0,0 +1,20 @@ +use ark_core_k8s::data::Url; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::query::NetworkQuery; + +#[derive( + Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct NetworkConnectorPrometheusSpec { + pub template: NetworkQuery, + pub url: Url, +} + +impl NetworkConnectorPrometheusSpec { + pub const fn name(&self) -> &'static str { + self.template.name() + } +} diff --git a/crates/kubegraph/api/src/connector/simulation.rs b/crates/kubegraph/api/src/connector/simulation.rs new file mode 100644 index 00000000..bf49c5ca --- /dev/null +++ b/crates/kubegraph/api/src/connector/simulation.rs @@ -0,0 +1,31 @@ +use std::path::PathBuf; + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::graph::NetworkGraphMetadata; + +#[derive( + Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct NetworkConnectorSimulationSpec { + #[serde(default)] + pub metadata: NetworkGraphMetadata, + pub path: PathBuf, + + #[serde(default = "NetworkConnectorSimulationSpec::default_key_edges")] + pub key_edges: String, + #[serde(default = "NetworkConnectorSimulationSpec::default_key_nodes")] + pub key_nodes: String, +} + +impl NetworkConnectorSimulationSpec { + fn default_key_edges() -> String { + "edges.csv".into() + } + + fn default_key_nodes() -> String { + "nodes.csv".into() + } +} diff --git a/crates/kubegraph/api/src/frame/mod.rs b/crates/kubegraph/api/src/frame/mod.rs index 40b51fb1..8ceb99a7 100644 --- a/crates/kubegraph/api/src/frame/mod.rs +++ b/crates/kubegraph/api/src/frame/mod.rs @@ -9,8 +9,9 @@ use pl::lazy::dsl; use crate::{ function::FunctionMetadata, + graph::NetworkGraphMetadata, ops::{And, Eq, Ge, Gt, Le, Lt, Ne, Or}, - problem::{ProblemMetadata, ProblemSpec}, + problem::ProblemSpec, vm::{Feature, Number}, }; @@ -51,17 +52,17 @@ impl LazyFrame { #[allow(unused_variables)] let ProblemSpec { metadata: - ProblemMetadata { + NetworkGraphMetadata { + capacity, flow: _, function: _, name, sink, src, - verbose: _, + supply: _, + unit_cost: _, }, - capacity, - supply: _, - unit_cost: _, + verbose: _, } = problem; #[cfg(feature = "df-polars")] @@ -85,7 +86,7 @@ impl LazyFrame { Self::Polars(nodes) => Ok(Self::Polars( select_polars_edge_side(&nodes, name, src) .cross_join(select_polars_edge_side(&nodes, name, sink)) - .with_column(dsl::lit(ProblemMetadata::MAX_CAPACITY).alias(capacity.as_ref())), + .with_column(dsl::lit(ProblemSpec::MAX_CAPACITY).alias(capacity.as_ref())), )), } } diff --git a/crates/kubegraph/api/src/function/dummy.rs b/crates/kubegraph/api/src/function/dummy.rs new file mode 100644 index 00000000..cb3d4fbb --- /dev/null +++ b/crates/kubegraph/api/src/function/dummy.rs @@ -0,0 +1,11 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive( + Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct NetworkFunctionDummySpec { + pub filter: Filter, + pub script: Script, +} diff --git a/crates/kubegraph/api/src/function.rs b/crates/kubegraph/api/src/function/mod.rs similarity index 83% rename from crates/kubegraph/api/src/function.rs rename to crates/kubegraph/api/src/function/mod.rs index f7e543df..6db2f05c 100644 --- a/crates/kubegraph/api/src/function.rs +++ b/crates/kubegraph/api/src/function/mod.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "function-dummy")] +pub mod dummy; + use kube::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -9,6 +12,7 @@ use serde::{Deserialize, Serialize}; kind = "NetworkFunction", root = "NetworkFunctionCrd", shortname = "nf", + namespaced, printcolumn = r#"{ "name": "created-at", "type": "date", @@ -23,17 +27,12 @@ use serde::{Deserialize, Serialize}; }"# )] #[serde(rename_all = "camelCase")] +#[non_exhaustive] pub enum NetworkFunctionSpec { #[cfg(feature = "function-dummy")] - Dummy(NetworkFunctionDummySpec), + Dummy(self::dummy::NetworkFunctionDummySpec), } -#[derive( - Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct NetworkFunctionDummySpec {} - #[derive( Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, )] diff --git a/crates/kubegraph/api/src/graph.rs b/crates/kubegraph/api/src/graph/mod.rs similarity index 66% rename from crates/kubegraph/api/src/graph.rs rename to crates/kubegraph/api/src/graph/mod.rs index 19a9ffc1..67697421 100644 --- a/crates/kubegraph/api/src/graph.rs +++ b/crates/kubegraph/api/src/graph/mod.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "df-polars")] +pub mod polars; + use std::fmt; use anyhow::Result; @@ -50,40 +53,79 @@ pub struct Graph { pub nodes: T, } -#[cfg(feature = "df-polars")] -impl From> for Graph { - fn from(graph: Graph<::pl::lazy::frame::LazyFrame>) -> Self { - let Graph { edges, nodes } = graph; +#[derive( + Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct NetworkGraphMetadata { + #[serde(default = "NetworkGraphMetadata::default_capacity")] + pub capacity: String, + #[serde(default = "NetworkGraphMetadata::default_flow")] + pub flow: String, + #[serde(default = "NetworkGraphMetadata::default_function")] + pub function: String, + #[serde(default = "NetworkGraphMetadata::default_name")] + pub name: String, + #[serde(default = "NetworkGraphMetadata::default_sink")] + pub sink: String, + #[serde(default = "NetworkGraphMetadata::default_src")] + pub src: String, + #[serde(default = "NetworkGraphMetadata::default_supply")] + pub supply: String, + #[serde(default = "NetworkGraphMetadata::default_unit_cost")] + pub unit_cost: String, +} + +impl Default for NetworkGraphMetadata { + fn default() -> Self { Self { - edges: LazyFrame::Polars(edges), - nodes: LazyFrame::Polars(nodes), + capacity: Self::default_capacity(), + flow: Self::default_flow(), + function: Self::default_function(), + name: Self::default_name(), + sink: Self::default_sink(), + src: Self::default_src(), + supply: Self::default_supply(), + unit_cost: Self::default_unit_cost(), } } } -#[cfg(feature = "df-polars")] -impl From> for Graph<::pl::lazy::frame::LazyFrame> { - fn from(graph: Graph<::pl::frame::DataFrame>) -> Self { - use pl::lazy::frame::IntoLazy; +impl NetworkGraphMetadata { + pub fn default_capacity() -> String { + "capacity".into() + } - let Graph { edges, nodes } = graph; - Self { - edges: edges.lazy(), - nodes: nodes.lazy(), - } + pub fn default_flow() -> String { + "flow".into() } -} -#[cfg(feature = "df-polars")] -impl TryFrom> for Graph<::pl::frame::DataFrame> { - type Error = ::pl::error::PolarsError; + pub fn default_function() -> String { + "function".into() + } + + pub fn default_name() -> String { + "name".into() + } + + pub fn default_link() -> String { + "link".into() + } + + pub fn default_sink() -> String { + "sink".into() + } + + pub fn default_src() -> String { + "src".into() + } - fn try_from(graph: Graph<::pl::lazy::frame::LazyFrame>) -> Result { - let Graph { edges, nodes } = graph; - Ok(Self { - edges: edges.collect()?, - nodes: nodes.collect()?, - }) + pub fn default_supply() -> String { + "supply".into() + } + + pub fn default_unit_cost() -> String { + "unit_cost".into() } } @@ -123,8 +165,6 @@ pub struct NetworkEntry { )] #[serde(rename_all = "camelCase")] pub struct NetworkEntryKeyFilter { - #[serde(default)] - pub kind: Option, #[serde(default)] pub namespace: Option, } @@ -142,8 +182,9 @@ impl NetworkEntryKeyFilter { } fn contains_node_key(&self, key: &NetworkNodeKey) -> bool { - let Self { kind, namespace } = self; + let Self { namespace } = self; + #[inline] fn test(a: &Option, b: &String) -> bool { match a.as_ref() { Some(a) => a == b, @@ -151,7 +192,7 @@ impl NetworkEntryKeyFilter { } } - test(kind, &key.kind) && test(namespace, &key.namespace) + test(namespace, &key.namespace) } } @@ -220,29 +261,31 @@ pub struct NetworkNode { )] #[serde(rename_all = "camelCase")] pub struct NetworkNodeKey { - pub kind: String, pub name: String, pub namespace: String, } impl fmt::Display for NetworkNodeKey { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let Self { - kind, - name, - namespace, - } = self; + let Self { name, namespace } = self; - write!(f, "{kind}/{namespace}/{name}") + write!(f, "{namespace}/{name}") } } -#[derive(Clone, Debug, PartialEq, PartialOrd, Serialize, Deserialize, JsonSchema)] -#[serde(untagged)] -pub enum NetworkValue { - Boolean(bool), - Number(f64), - String(String), +#[derive(Clone, Debug, Default, PartialEq, PartialOrd, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct NetworkValue { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub capacity: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub function: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub flow: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub supply: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub unit_cost: Option, } mod prefix { diff --git a/crates/kubegraph/api/src/graph/polars.rs b/crates/kubegraph/api/src/graph/polars.rs new file mode 100644 index 00000000..63db5c48 --- /dev/null +++ b/crates/kubegraph/api/src/graph/polars.rs @@ -0,0 +1,37 @@ +use pl::{ + error::PolarsError, + frame::DataFrame, + lazy::frame::{IntoLazy, LazyFrame}, +}; + +impl From> for super::Graph { + fn from(graph: super::Graph) -> Self { + let super::Graph { edges, nodes } = graph; + Self { + edges: super::LazyFrame::Polars(edges), + nodes: super::LazyFrame::Polars(nodes), + } + } +} + +impl From> for super::Graph { + fn from(graph: super::Graph) -> Self { + let super::Graph { edges, nodes } = graph; + Self { + edges: edges.lazy(), + nodes: nodes.lazy(), + } + } +} + +impl TryFrom> for super::Graph { + type Error = PolarsError; + + fn try_from(graph: super::Graph) -> Result { + let super::Graph { edges, nodes } = graph; + Ok(Self { + edges: edges.collect()?, + nodes: nodes.collect()?, + }) + } +} diff --git a/crates/kubegraph/api/src/problem.rs b/crates/kubegraph/api/src/problem.rs index 0dbf080e..a22e4469 100644 --- a/crates/kubegraph/api/src/problem.rs +++ b/crates/kubegraph/api/src/problem.rs @@ -2,6 +2,8 @@ use kube::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::graph::NetworkGraphMetadata; + #[derive( Clone, Debug, @@ -20,7 +22,8 @@ use serde::{Deserialize, Serialize}; version = "v1alpha1", kind = "NetworkProblem", root = "NetworkProblemCrd", - shortname = "prob", + shortname = "np", + namespaced, printcolumn = r#"{ "name": "created-at", "type": "date", @@ -37,101 +40,25 @@ use serde::{Deserialize, Serialize}; #[serde(rename_all = "camelCase")] pub struct ProblemSpec { #[serde(default, flatten)] - pub metadata: ProblemMetadata, - #[serde(default = "ProblemSpec::default_capacity")] - pub capacity: String, - #[serde(default = "ProblemSpec::default_supply")] - pub supply: String, - #[serde(default = "ProblemSpec::default_unit_cost")] - pub unit_cost: String, -} - -impl Default for ProblemSpec { - fn default() -> Self { - Self { - metadata: ProblemMetadata::default(), - capacity: Self::default_capacity(), - supply: Self::default_supply(), - unit_cost: Self::default_unit_cost(), - } - } -} + pub metadata: NetworkGraphMetadata, -impl ProblemSpec { - pub fn default_capacity() -> String { - "capacity".into() - } - - pub fn default_supply() -> String { - "supply".into() - } - - pub fn default_unit_cost() -> String { - "unit_cost".into() - } -} - -#[derive( - Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct ProblemMetadata { - #[serde(default = "ProblemMetadata::default_flow")] - pub flow: String, - #[serde(default = "ProblemMetadata::default_function")] - pub function: String, - #[serde(default = "ProblemMetadata::default_name")] - pub name: String, - #[serde(default = "ProblemMetadata::default_sink")] - pub sink: String, - #[serde(default = "ProblemMetadata::default_src")] - pub src: String, - - #[serde(default = "ProblemMetadata::default_verbose")] + #[serde(default = "ProblemSpec::default_verbose")] pub verbose: bool, } -impl Default for ProblemMetadata { +impl Default for ProblemSpec { fn default() -> Self { Self { - flow: Self::default_flow(), - function: Self::default_function(), - name: Self::default_name(), - sink: Self::default_sink(), - src: Self::default_src(), + metadata: NetworkGraphMetadata::default(), verbose: Self::default_verbose(), } } } -impl ProblemMetadata { +impl ProblemSpec { pub const MAX_CAPACITY: u64 = u64::MAX >> 32; - pub fn default_flow() -> String { - "flow".into() - } - - pub fn default_function() -> String { - "function".into() - } - - pub fn default_name() -> String { - "name".into() - } - - pub fn default_link() -> String { - "link".into() - } - - pub fn default_sink() -> String { - "sink".into() - } - - pub fn default_src() -> String { - "src".into() - } - - pub const fn default_verbose() -> bool { + const fn default_verbose() -> bool { false } } diff --git a/crates/kubegraph/api/src/query.rs b/crates/kubegraph/api/src/query.rs index 986fb9c1..251cb5ac 100644 --- a/crates/kubegraph/api/src/query.rs +++ b/crates/kubegraph/api/src/query.rs @@ -94,8 +94,6 @@ mod impl_json_schema_for_network_query_type { )] #[serde(rename_all = "camelCase")] pub struct NetworkQueryNodeType { - #[serde(default)] - pub kind: NetworkQueryNodeValue, #[serde(default)] pub name: NetworkQueryNodeValue, #[serde(default)] diff --git a/crates/kubegraph/connector/prometheus/src/lib.rs b/crates/kubegraph/connector/prometheus/src/lib.rs index 29890aa8..0ecdb9e5 100644 --- a/crates/kubegraph/connector/prometheus/src/lib.rs +++ b/crates/kubegraph/connector/prometheus/src/lib.rs @@ -9,8 +9,8 @@ use async_trait::async_trait; use futures::StreamExt; use kubegraph_api::{ connector::{ - NetworkConnectorPrometheusSpec, NetworkConnectorSourceRef, NetworkConnectorSpec, - NetworkConnectors, + prometheus::NetworkConnectorPrometheusSpec, NetworkConnectorSourceRef, + NetworkConnectorSpec, NetworkConnectors, }, db::NetworkGraphDB, graph::{NetworkEdgeKey, NetworkEntry, NetworkEntryKey, NetworkNodeKey, NetworkValue}, @@ -47,9 +47,8 @@ impl ::kubegraph_api::connector::NetworkConnector for NetworkConnector { info!("Reloading prometheus connector..."); self.db = db .into_iter() - .filter_map(|spec| match spec { + .filter_map(|crd| match crd.spec { NetworkConnectorSpec::Prometheus(spec) => Some(spec), - #[allow(unused_variables)] _ => None, }) .collect(); @@ -128,7 +127,10 @@ async fn pull_with( } NetworkQueryType::Node { node } => NetworkEntryKey::Node(node.search(&metric)?), }, - value: NetworkValue::Number(sample.value()), + value: NetworkValue { + supply: Some(sample.value().round() as i64), + ..Default::default() + }, }) }); @@ -147,7 +149,6 @@ impl Search for NetworkQueryNodeType { #[inline] fn search(&self, metric: &Metric) -> Option<::Output> { Some(NetworkNodeKey { - kind: self.kind.search(metric)?, name: self.name.search(metric)?, namespace: self.namespace.search(metric)?, }) diff --git a/crates/kubegraph/connector/simulation/Cargo.toml b/crates/kubegraph/connector/simulation/Cargo.toml index f5c9028a..4459f5a0 100644 --- a/crates/kubegraph/connector/simulation/Cargo.toml +++ b/crates/kubegraph/connector/simulation/Cargo.toml @@ -29,12 +29,10 @@ connector-prometheus = [] [dependencies] kubegraph-api = { path = "../../api", features = ["connector-simulation"] } -kubegraph-parser = { path = "../../parser" } anyhow = { workspace = true } async-trait = { workspace = true } -schemars = { workspace = true } -serde = { workspace = true } -serde_yaml = { workspace = true } +kube = { workspace = true } +polars = { workspace = true } tokio = { workspace = true, features = ["fs"] } tracing = { workspace = true } diff --git a/crates/kubegraph/connector/simulation/src/lib.rs b/crates/kubegraph/connector/simulation/src/lib.rs index fb7db909..a0f9576d 100644 --- a/crates/kubegraph/connector/simulation/src/lib.rs +++ b/crates/kubegraph/connector/simulation/src/lib.rs @@ -1,35 +1,33 @@ -mod schema; +use std::{collections::BTreeMap, mem::swap, path::Path, time::Duration}; -use std::{collections::BTreeMap, mem::swap, time::Duration}; - -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; +use kube::ResourceExt; use kubegraph_api::{ connector::{ - NetworkConnectorSimulationSpec, NetworkConnectorSourceRef, NetworkConnectorSpec, - NetworkConnectors, + simulation::NetworkConnectorSimulationSpec, NetworkConnectorSourceRef, + NetworkConnectorSpec, NetworkConnectors, }, db::NetworkGraphDB, - graph::{NetworkEntry, NetworkEntryKey, NetworkNodeKey}, + graph::{ + NetworkEdgeKey, NetworkEntry, NetworkEntryKey, NetworkGraphMetadata, NetworkNodeKey, + NetworkValue, + }, +}; +use polars::{ + frame::DataFrame, + io::{csv::CsvReader, SerReader}, + series::Series, }; -use kubegraph_parser::{Filter, FilterParser}; -use serde::Deserialize; use tokio::fs; use tracing::{info, instrument, warn, Level}; -use crate::schema::{ - constraint::NetworkConstraint, function::NetworkFunction, node::NetworkNode, NetworkObjectCrd, - NetworkObjectMetadata, NetworkObjectTemplate, -}; - #[derive(Default)] pub struct NetworkConnector { - db: Vec, + db: Vec, - constraints: BTreeMap>, - functions: BTreeMap, - nodes: BTreeMap, - nodes_new: Vec<(NetworkObjectMetadata, NetworkNode)>, + values: BTreeMap, + values_new: Vec, } #[async_trait] @@ -54,15 +52,20 @@ impl ::kubegraph_api::connector::NetworkConnector for NetworkConnector { info!("Reloading simulation connector..."); self.db = db .into_iter() - .filter_map(|spec| match spec { - NetworkConnectorSpec::Simulation(spec) => Some(spec), - #[allow(unused_variables)] - _ => None, + .filter_map(|crd| { + let namespace = crd.namespace()?; + match crd.spec { + NetworkConnectorSpec::Simulation(spec) => { + Some(NetworkConnectorItem { namespace, spec }) + } + _ => None, + } }) .collect(); - for spec in self.db.clone() { - if let Err(error) = self.load_templates(&spec).await { + for item in self.db.clone() { + if let Err(error) = self.load_templates(&item).await { + let spec = &item.spec; warn!("failed to load simulation templates {spec:?}: {error}"); } } @@ -71,170 +74,191 @@ impl ::kubegraph_api::connector::NetworkConnector for NetworkConnector { return Ok(()); } - // NOTE: ordered - self.pull_nodes(graph).await?; - // self.pull_edges(graph).await?; - self.pull_constraints(graph).await?; - self.pull_functions(graph).await?; - Ok(()) + self.pull_values(graph).await } } impl NetworkConnector { - async fn pull_nodes(&mut self, graph: &impl NetworkGraphDB) -> Result<()> { - if self.nodes_new.is_empty() { + async fn pull_values(&mut self, graph: &impl NetworkGraphDB) -> Result<()> { + if self.values_new.is_empty() { return Ok(()); } - // unregister new nodes, taking the values to a local variable `nodes` - let mut nodes = vec![]; - swap(&mut self.nodes_new, &mut nodes); + // unregister new values, taking to a local variable `values` + let mut values = vec![]; + swap(&mut self.values_new, &mut values); - let entries = nodes.into_iter().flat_map(|(key, value)| { - let NetworkObjectMetadata { name, namespace } = key; - let NetworkNode { values } = value; - - let entry_key = move |kind| NetworkNodeKey { - kind, - name: name.clone(), - namespace: namespace.clone(), - }; - - values.into_iter().map(move |(kind, value)| NetworkEntry { - key: NetworkEntryKey::Node(entry_key(kind)), - value, - }) - }); - - graph.add_entries(entries).await + graph.add_entries(values).await } - async fn pull_constraints(&mut self, graph: &impl NetworkGraphDB) -> Result<()> { - // TODO: to be implemented - Ok(()) - } - - async fn pull_functions(&mut self, graph: &impl NetworkGraphDB) -> Result<()> { - // TODO: to be implemented - Ok(()) - } - - fn apply(&mut self, crd: NetworkObjectCrd) { - let NetworkObjectCrd { - api_version, - metadata: NetworkObjectMetadata { name, namespace }, - template: _, - } = &crd; - - match api_version.as_str() { - "kubegraph.ulagbulag.io/v1alpha1" => self.apply_unchecked(crd), - api_version => warn!("Unsupported API version {api_version:?}: {namespace}/{name}"), - } - } - - fn apply_unchecked(&mut self, crd: NetworkObjectCrd) { - let NetworkObjectCrd { - api_version: _, - metadata, - template, - } = crd; - - let NetworkObjectMetadata { name, namespace } = &metadata; - let r#type = template.name(); - info!("Applying {type} connector: {namespace}/{name}"); - - match template { - NetworkObjectTemplate::Constraint(spec) => match spec.parse() { - Ok(spec) => { - self.constraints.insert(metadata, spec); - } - Err(error) => { - warn!("Failed to parse constraint ({namespace}/{name}): {error}"); - } - }, - NetworkObjectTemplate::Function(spec) => { - self.functions.insert(metadata, spec); - } - NetworkObjectTemplate::Node(spec) => { - self.nodes.insert(metadata.clone(), spec.clone()); - self.nodes_new.push((metadata, spec)); - } - } + fn apply(&mut self, entry: NetworkEntry) { + self.values.insert(entry.key.clone(), entry.value.clone()); + self.values_new.push(entry); } #[instrument(level = Level::INFO, skip_all)] - async fn load_templates(&mut self, spec: &NetworkConnectorSimulationSpec) -> Result<()> { - let NetworkConnectorSimulationSpec { path } = spec; - - let mut file_entries = fs::read_dir(path).await.map_err(|error| { - anyhow!( - "failed to read directory {path}: {error}", - path = path.display(), - ) - })?; - - while let Some(entry) = file_entries.next_entry().await.map_err(|error| { - anyhow!( - "failed to read directory entry {path}: {error}", - path = path.display(), - ) - })? { - let path = entry.path(); - match fs::read_to_string(&path).await { - Ok(raw) => { - info!("Loading template file: {path:?}"); - ::serde_yaml::Deserializer::from_str(&raw) - .filter_map( - move |document| match NetworkObjectCrd::deserialize(document) { - Ok(item) => Some(item), - Err(error) => { - warn!("Skipping parsing YAML template ({path:?}): {error}"); - None - } - }, - ) - .for_each(|crd| self.apply(crd)) - } - Err(error) => { - warn!("Skipping erroneous template file ({path:?}): {error}"); - } - } + async fn load_templates(&mut self, item: &NetworkConnectorItem) -> Result<()> { + let NetworkConnectorItem { + namespace, + spec: + NetworkConnectorSimulationSpec { + metadata, + path: base_dir, + key_edges, + key_nodes, + }, + } = item; + + if let Some(df) = load_csv(base_dir, key_edges).await? { + collect_edges(namespace, metadata, &df)? + .into_iter() + .for_each(|entry| self.apply(entry)) + } + if let Some(df) = load_csv(base_dir, key_nodes).await? { + collect_nodes(namespace, metadata, &df)? + .into_iter() + .for_each(|entry| self.apply(entry)) } Ok(()) } } -trait NetworkParser { - type Output; - - fn parse(&self) -> Result<::Output>; +#[derive(Clone, Debug)] +struct NetworkConnectorItem { + namespace: String, + spec: NetworkConnectorSimulationSpec, } -impl NetworkParser for NetworkConstraint { - type Output = NetworkConstraint; +async fn load_csv(base_dir: &Path, filename: &str) -> Result> { + let mut path = base_dir.to_path_buf(); + path.push(filename); + + if fs::try_exists(&path).await? { + CsvReader::from_path(&path) + .map_err( + |error| anyhow!("failed to load file {path}: {error}", path = path.display(),), + )? + .has_header(true) + .finish() + .map(Some) + .map_err(|error| { + anyhow!( + "failed to parse file {path}: {error}", + path = path.display(), + ) + }) + } else { + Ok(None) + } +} - fn parse(&self) -> Result<::Output> { - let Self { filters, r#where } = self; +fn collect_edges<'a>( + namespace: &'a str, + metadata: &NetworkGraphMetadata, + df: &'a DataFrame, +) -> Result> { + let NetworkGraphMetadata { + capacity: key_capacity, + flow: _, + function: _, + name: key_name, + sink: key_sink, + src: key_src, + supply: _, + unit_cost: key_unit_cost, + } = metadata; + + // Get columns + let name = validate_column(df, key_name)?; + let capacity = validate_column(df, key_capacity)?; + let sink = validate_column(df, key_sink)?; + let src = validate_column(df, key_src)?; + let unit_cost = validate_column(df, key_unit_cost)?; + + // Collect entries + Ok(name + .iter() + .zip(capacity.iter()) + .zip(sink.iter()) + .zip(src.iter()) + .zip(unit_cost.iter()) + .filter_map(|((((name, capacity), sink), src), unit_cost)| { + Some(NetworkEntry { + key: NetworkEntryKey::Edge(NetworkEdgeKey { + interval_ms: None, + link: NetworkNodeKey { + name: name.to_string(), + namespace: namespace.into(), + }, + sink: NetworkNodeKey { + name: sink.to_string(), + namespace: namespace.into(), + }, + src: NetworkNodeKey { + name: src.to_string(), + namespace: namespace.into(), + }, + }), + value: NetworkValue { + capacity: capacity.try_extract().ok(), + function: None, + flow: None, + supply: None, + unit_cost: unit_cost.try_extract().ok(), + }, + }) + })) +} - let filter_parser = FilterParser::default(); +fn collect_nodes<'a>( + namespace: &'a str, + metadata: &NetworkGraphMetadata, + df: &'a DataFrame, +) -> Result> { + let NetworkGraphMetadata { + capacity: key_capacity, + flow: _, + function: _, + name: key_name, + sink: _, + src: _, + supply: key_supply, + unit_cost: key_unit_cost, + } = metadata; + + // Get columns + let name = validate_column(df, key_name)?; + let capacity = validate_column(df, key_capacity)?; + let supply = validate_column(df, key_supply)?; + let unit_cost = validate_column(df, key_unit_cost)?; + + // Collect entries + Ok(name + .iter() + .zip(capacity.iter()) + .zip(supply.iter()) + .zip(unit_cost.iter()) + .filter_map(|(((name, capacity), supply), unit_cost)| { + Some(NetworkEntry { + key: NetworkEntryKey::Node(NetworkNodeKey { + name: name.to_string(), + namespace: namespace.into(), + }), + value: NetworkValue { + capacity: capacity.try_extract().ok(), + function: None, + flow: None, + supply: supply.try_extract().ok(), + unit_cost: unit_cost.try_extract().ok(), + }, + }) + })) +} - Ok(NetworkConstraint { - filters: filters - .iter() - .map(|input| { - filter_parser - .parse(input) - .map_err(|error| anyhow!("{error}")) - }) - .collect::>()?, - r#where: r#where - .iter() - .map(|input| { - filter_parser - .parse(input) - .map_err(|error| anyhow!("{error}")) - }) - .collect::>()?, - }) +fn validate_column<'a>(df: &'a DataFrame, name: &str) -> Result<&'a Series> { + let column = df.column(name)?; + if column.is_empty() { + bail!("empty column: {name}") } + Ok(column) } diff --git a/crates/kubegraph/connector/simulation/src/schema/constraint.rs b/crates/kubegraph/connector/simulation/src/schema/constraint.rs deleted file mode 100644 index e534bebc..00000000 --- a/crates/kubegraph/connector/simulation/src/schema/constraint.rs +++ /dev/null @@ -1,12 +0,0 @@ -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct NetworkConstraint { - #[serde(default)] - pub filters: Vec, - - #[serde(default)] - pub r#where: Vec, -} diff --git a/crates/kubegraph/connector/simulation/src/schema/edge.rs b/crates/kubegraph/connector/simulation/src/schema/edge.rs deleted file mode 100644 index 928929e3..00000000 --- a/crates/kubegraph/connector/simulation/src/schema/edge.rs +++ /dev/null @@ -1,6 +0,0 @@ -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct NetworkEdge {} diff --git a/crates/kubegraph/connector/simulation/src/schema/function.rs b/crates/kubegraph/connector/simulation/src/schema/function.rs deleted file mode 100644 index 35495d75..00000000 --- a/crates/kubegraph/connector/simulation/src/schema/function.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::collections::BTreeMap; - -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct NetworkFunction -where - Script: Default, -{ - #[serde(default)] - pub handlers: NetworkHandlers, -} - -impl Default for NetworkFunction -where - Script: Default, -{ - fn default() -> Self { - Self { - handlers: Default::default(), - } - } -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct NetworkHandlers -where - Script: Default, -{ - #[serde(default)] - pub fake: NetworkFakeHandler, -} - -impl Default for NetworkHandlers -where - Script: Default, -{ - fn default() -> Self { - Self { - fake: Default::default(), - } - } -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct NetworkFakeHandler -where - Script: Default, -{ - #[serde(default)] - pub data: BTreeMap>, - - #[serde(default)] - pub script: Script, -} - -impl Default for NetworkFakeHandler -where - Script: Default, -{ - fn default() -> Self { - Self { - data: Default::default(), - script: Default::default(), - } - } -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct NetworkFakeHandlerData { - #[serde(default)] - pub filters: Vec, - - #[serde(default)] - pub provides: Vec, -} diff --git a/crates/kubegraph/connector/simulation/src/schema/mod.rs b/crates/kubegraph/connector/simulation/src/schema/mod.rs deleted file mode 100644 index 46c098bf..00000000 --- a/crates/kubegraph/connector/simulation/src/schema/mod.rs +++ /dev/null @@ -1,45 +0,0 @@ -pub mod constraint; -pub mod edge; -pub mod function; -pub mod node; -pub mod value; - -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct NetworkObjectCrd { - pub api_version: String, - pub metadata: NetworkObjectMetadata, - #[serde(flatten)] - pub template: NetworkObjectTemplate, -} - -#[derive( - Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct NetworkObjectMetadata { - pub name: String, - pub namespace: String, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(tag = "type", content = "spec")] -pub enum NetworkObjectTemplate { - Constraint(#[serde(default)] self::constraint::NetworkConstraint), - // Edge(#[serde(default)] self::edge::NetworkEdge), - Function(#[serde(default)] self::function::NetworkFunction), - Node(#[serde(default)] self::node::NetworkNode), -} - -impl NetworkObjectTemplate { - pub const fn name(&self) -> &'static str { - match self { - Self::Constraint(_) => "constraint", - Self::Function(_) => "function", - Self::Node(_) => "node", - } - } -} diff --git a/crates/kubegraph/connector/simulation/src/schema/node.rs b/crates/kubegraph/connector/simulation/src/schema/node.rs deleted file mode 100644 index 6db593dd..00000000 --- a/crates/kubegraph/connector/simulation/src/schema/node.rs +++ /dev/null @@ -1,11 +0,0 @@ -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -use super::value::NetworkValues; - -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct NetworkNode { - #[serde(flatten)] - pub values: NetworkValues, -} diff --git a/crates/kubegraph/connector/simulation/src/schema/value.rs b/crates/kubegraph/connector/simulation/src/schema/value.rs deleted file mode 100644 index 7c86e906..00000000 --- a/crates/kubegraph/connector/simulation/src/schema/value.rs +++ /dev/null @@ -1,5 +0,0 @@ -use std::collections::BTreeMap; - -use kubegraph_api::graph::NetworkValue; - -pub type NetworkValues = BTreeMap; diff --git a/crates/kubegraph/function/dummy/Cargo.toml b/crates/kubegraph/function/dummy/Cargo.toml index d7ba8187..b82570e9 100644 --- a/crates/kubegraph/function/dummy/Cargo.toml +++ b/crates/kubegraph/function/dummy/Cargo.toml @@ -20,3 +20,7 @@ workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +kubegraph-api = { path = "../../api", features = ["function-dummy"] } + +anyhow = { workspace = true } +polars = { workspace = true, optional = true } diff --git a/crates/kubegraph/gateway/Cargo.toml b/crates/kubegraph/gateway/Cargo.toml index ba4b194d..b4fc2267 100644 --- a/crates/kubegraph/gateway/Cargo.toml +++ b/crates/kubegraph/gateway/Cargo.toml @@ -35,15 +35,18 @@ full = [ ] # Configure Connectors -connector-full = ["connector-prometheus", "connector-simulation"] +connector-full = [ + # "connector-prometheus", # FIXME: use this if simulation is built + "connector-simulation", +] connector-prometheus = [ "kubegraph-api/connector-prometheus", "kubegraph-connector-prometheus", - "kubegraph-connector-simulation/connector-prometheus", + "kubegraph-connector-simulation?/connector-prometheus", ] connector-simulation = [ "kubegraph-api/connector-simulation", - "kubegraph-connector-prometheus/connector-simulation", + "kubegraph-connector-prometheus?/connector-simulation", "kubegraph-connector-simulation", ] @@ -61,6 +64,10 @@ df-polars = [ "kubegraph-vm-local?/df-polars", ] +# Configure Functions +function-all = ["function-dummy"] +function-dummy = ["kubegraph-api/function-dummy", "kubegraph-function-dummy"] + # Configure Solvers solver-full = ["solver-ortools"] solver-ortools = ["kubegraph-solver-ortools"] @@ -75,11 +82,12 @@ vm-local = ["kubegraph-vm-local"] [dependencies] ark-core = { path = "../../ark/core", features = ["actix-web", "signal"] } -kubegraph-api = { path = "../api", default-features = false } +kubegraph-api = { path = "../api" } kubegraph-db-local = { path = "../db/local", optional = true, default-features = false } kubegraph-db-memory = { path = "../db/memory", optional = true, default-features = false } kubegraph-connector-prometheus = { path = "../connector/prometheus", optional = true, default-features = false } kubegraph-connector-simulation = { path = "../connector/simulation", optional = true, default-features = false } +kubegraph-function-dummy = { path = "../function/dummy", optional = true, default-features = false } kubegraph-solver-ortools = { path = "../solver/ortools", optional = true, default-features = false } kubegraph-twin-simulator = { path = "../twin/simulator", optional = true, default-features = false } kubegraph-vm-local = { path = "../vm/local", optional = true, default-features = false } diff --git a/crates/kubegraph/gateway/problems/warehouse.yaml b/crates/kubegraph/gateway/problems/warehouse.yaml index f43768b4..e621153a 100644 --- a/crates/kubegraph/gateway/problems/warehouse.yaml +++ b/crates/kubegraph/gateway/problems/warehouse.yaml @@ -7,3 +7,25 @@ metadata: spec: simulation: path: ./problems/warehouse + metadata: + supply: payload +--- +apiVersion: kubegraph.ulagbulag.io/v1alpha1 +kind: NetworkProblem +metadata: + name: warehouse + namespace: kubegraph +spec: + supply: payload +--- +apiVersion: kubegraph.ulagbulag.io/v1alpha1 +kind: NetworkFunction +metadata: + name: warehouse + namespace: kubegraph +spec: + dummy: + filter: src.payload >= 3 && sink.payload >= 0 + script: | + src.payload -= 3 + sink.payload += 3 diff --git a/crates/kubegraph/gateway/problems/warehouse/env.yaml b/crates/kubegraph/gateway/problems/warehouse/env.yaml deleted file mode 100644 index 9e78abe7..00000000 --- a/crates/kubegraph/gateway/problems/warehouse/env.yaml +++ /dev/null @@ -1,44 +0,0 @@ ---- -apiVersion: kubegraph.ulagbulag.io/v1alpha1 -type: Node -metadata: - name: warehouse-a - namespace: default -spec: - payload: 300 - warehouse: true ---- -apiVersion: kubegraph.ulagbulag.io/v1alpha1 -type: Node -metadata: - name: warehouse-b - namespace: default -spec: - payload: 0 - warehouse: true ---- -apiVersion: kubegraph.ulagbulag.io/v1alpha1 -type: Constraint # FIXME: Constraint 말고 "Problem" 정의 (supply, unit_cost, capacity, ...) -metadata: - name: optimize-warehouse - namespace: default -spec: - filters: - - payload >= 100 - - payload <= 200 - where: - - payload! - - warehouse! ---- -apiVersion: kubegraph.ulagbulag.io/v1alpha1 -type: Function -metadata: - name: delivery - namespace: default -spec: - handlers: - fake: - filter: src.payload >= 3 && sink.payload >= 0 - script: | - src.payload -= 3 - sink.payload += 3 diff --git a/crates/kubegraph/gateway/problems/warehouse/nodes.csv b/crates/kubegraph/gateway/problems/warehouse/nodes.csv new file mode 100644 index 00000000..4ac1f844 --- /dev/null +++ b/crates/kubegraph/gateway/problems/warehouse/nodes.csv @@ -0,0 +1,3 @@ +name,warehouse,unit_cost,capacity,payload +warehouse-a,true,3,300,300 +warehouse-b,true,1,300,0 diff --git a/crates/kubegraph/gateway/src/connector.rs b/crates/kubegraph/gateway/src/connector.rs index 561ceb7c..10570e39 100644 --- a/crates/kubegraph/gateway/src/connector.rs +++ b/crates/kubegraph/gateway/src/connector.rs @@ -1,42 +1,36 @@ use std::collections::BTreeMap; -use anyhow::Result; -use kubegraph_api::connector::{NetworkConnectorSourceRef, NetworkConnectorSpec}; -use tokio::join; -use tracing::error; +use futures::{stream::FuturesUnordered, StreamExt}; +use kube::ResourceExt; +use kubegraph_api::connector::{NetworkConnector, NetworkConnectorCrd, NetworkConnectorSourceRef}; use crate::db::NetworkGraphDB; pub async fn loop_forever(graph: NetworkGraphDB) { - if let Err(error) = try_loop_forever(&graph).await { - error!("failed to run connect job: {error}") - } -} - -async fn try_loop_forever(graph: &NetworkGraphDB) -> Result<()> { - use kubegraph_api::connector::NetworkConnector; - - join!( + FuturesUnordered::from_iter(vec![ #[cfg(feature = "connector-prometheus")] ::kubegraph_connector_prometheus::NetworkConnector::default().loop_forever(graph.clone()), #[cfg(feature = "connector-simulation")] ::kubegraph_connector_simulation::NetworkConnector::default().loop_forever(graph.clone()), - ); - Ok(()) + ]) + .collect() + .await } #[derive(Default)] pub(crate) struct NetworkConnectors { - db: BTreeMap<(String, String), NetworkConnectorSpec>, + db: BTreeMap<(String, String), NetworkConnectorCrd>, has_updated: BTreeMap, } impl NetworkConnectors { - pub(crate) fn insert(&mut self, namespace: String, name: String, value: NetworkConnectorSpec) { + pub(crate) fn insert(&mut self, object: NetworkConnectorCrd) { + let namespace = object.namespace().unwrap_or_else(|| "default".into()); + let name = object.name_any(); let key = connector_key(namespace, name); - let src = value.to_ref(); + let src = object.spec.to_ref(); - self.db.insert(key, value); + self.db.insert(key, object); self.has_updated .entry(src) .and_modify(|updated| *updated = true); @@ -45,14 +39,14 @@ impl NetworkConnectors { pub(crate) fn list( &mut self, src: NetworkConnectorSourceRef, - ) -> Option> { + ) -> Option> { let updated = self.has_updated.entry(src).or_insert(true); if *updated { *updated = false; Some( self.db .values() - .filter(|&spec| *spec == src) + .filter(|&cr| cr.spec == src) .cloned() .collect(), ) @@ -67,7 +61,7 @@ impl NetworkConnectors { if let Some(object) = removed_object { self.has_updated - .entry(object.to_ref()) + .entry(object.spec.to_ref()) .and_modify(|updated| *updated = true); } } diff --git a/crates/kubegraph/gateway/src/db.rs b/crates/kubegraph/gateway/src/db.rs index 5ffa9bc2..05263bf9 100644 --- a/crates/kubegraph/gateway/src/db.rs +++ b/crates/kubegraph/gateway/src/db.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; use kubegraph_api::{ - connector::{NetworkConnectorSourceRef, NetworkConnectorSpec}, + connector::{NetworkConnectorCrd, NetworkConnectorSourceRef}, graph::{NetworkEntry, NetworkEntryKeyFilter, NetworkEntryMap}, }; use tokio::sync::Mutex; @@ -45,8 +45,8 @@ impl NetworkGraphDB { #[async_trait] impl ::kubegraph_api::connector::NetworkConnectors for NetworkGraphDB { - async fn add_connector(&self, namespace: String, name: String, spec: NetworkConnectorSpec) { - self.connectors.lock().await.insert(namespace, name, spec) + async fn add_connector(&self, object: NetworkConnectorCrd) { + self.connectors.lock().await.insert(object) } async fn delete_connector(&self, namespace: String, name: String) { @@ -56,7 +56,7 @@ impl ::kubegraph_api::connector::NetworkConnectors for NetworkGraphDB { async fn get_connectors( &self, r#type: NetworkConnectorSourceRef, - ) -> Option> { + ) -> Option> { self.connectors.lock().await.list(r#type) } } diff --git a/crates/kubegraph/gateway/src/reloader.rs b/crates/kubegraph/gateway/src/reloader.rs index 9c7b3f49..fbab2442 100644 --- a/crates/kubegraph/gateway/src/reloader.rs +++ b/crates/kubegraph/gateway/src/reloader.rs @@ -70,7 +70,7 @@ async fn handle_apply( let r#type = object.spec.name(); info!("Applying {type} connector: {namespace}/{name}"); - graph.add_connector(namespace, name, object.spec).await; + graph.add_connector(object).await; Ok(()) } diff --git a/crates/kubegraph/operator/Cargo.toml b/crates/kubegraph/operator/Cargo.toml index 5466700d..297c732a 100644 --- a/crates/kubegraph/operator/Cargo.toml +++ b/crates/kubegraph/operator/Cargo.toml @@ -21,9 +21,7 @@ workspace = true [dependencies] ark-core-k8s = { path = "../../ark/core/k8s", features = ["manager"] } -kubegraph-api = { path = "../api", default-features = false, features = [ - "full", -] } +kubegraph-api = { path = "../api", features = ["full"] } anyhow = { workspace = true } async-trait = { workspace = true } diff --git a/crates/kubegraph/solver/ortools/src/polars.rs b/crates/kubegraph/solver/ortools/src/polars.rs index c82147c3..6b04b2cb 100644 --- a/crates/kubegraph/solver/ortools/src/polars.rs +++ b/crates/kubegraph/solver/ortools/src/polars.rs @@ -1,8 +1,8 @@ use anyhow::{anyhow, bail, Result}; use kubegraph_api::{ frame::polars::{find_indices, get_column}, - graph::Graph, - problem::{ProblemMetadata, ProblemSpec}, + graph::{Graph, NetworkGraphMetadata}, + problem::ProblemSpec, }; use or_tools::graph::{ ebert_graph::{ArcIndex, FlowQuantity, NodeIndex, StarGraph}, @@ -29,17 +29,17 @@ impl ::kubegraph_api::solver::LocalSolver> for super::Solver { fn step(&self, graph: Graph, problem: ProblemSpec) -> Result { let ProblemSpec { metadata: - ProblemMetadata { + NetworkGraphMetadata { + capacity: key_capacity, flow: key_flow, function: _, name: key_name, sink: key_sink, src: key_src, - verbose, + supply: key_supply, + unit_cost: key_unit_cost, }, - capacity: key_capacity, - supply: key_supply, - unit_cost: key_unit_cost, + verbose, } = problem; // Step 1. Collect graph data diff --git a/crates/kubegraph/solver/ortools/tests/simple_solver.rs b/crates/kubegraph/solver/ortools/tests/simple_solver.rs index 0e279b98..47f92d96 100644 --- a/crates/kubegraph/solver/ortools/tests/simple_solver.rs +++ b/crates/kubegraph/solver/ortools/tests/simple_solver.rs @@ -1,10 +1,6 @@ extern crate polars as pl; -use kubegraph_api::{ - graph::Graph, - problem::{ProblemMetadata, ProblemSpec}, - solver::LocalSolver, -}; +use kubegraph_api::{graph::Graph, problem::ProblemSpec, solver::LocalSolver}; use kubegraph_solver_ortools::Solver; use pl::{ df, @@ -37,13 +33,8 @@ fn solver_simple() { // Step 4. Define a problem let problem = ProblemSpec { - metadata: ProblemMetadata { - verbose: true, - ..Default::default() - }, - capacity: "capacity".into(), - supply: "supply".into(), - unit_cost: "unit_cost".into(), + verbose: true, + ..Default::default() }; // Step 5. Define a solver diff --git a/crates/kubegraph/twin/simulator/src/polars.rs b/crates/kubegraph/twin/simulator/src/polars.rs index 4e173cbb..65dc8cee 100644 --- a/crates/kubegraph/twin/simulator/src/polars.rs +++ b/crates/kubegraph/twin/simulator/src/polars.rs @@ -2,8 +2,8 @@ use std::ops::{Add, Sub}; use anyhow::{anyhow, Result}; use kubegraph_api::{ - graph::Graph, - problem::{ProblemMetadata, ProblemSpec}, + graph::{Graph, NetworkGraphMetadata}, + problem::ProblemSpec, }; use pl::lazy::{ dsl, @@ -16,17 +16,17 @@ impl ::kubegraph_api::twin::LocalTwin> for super::Twin { fn execute(&self, graph: Graph, problem: &ProblemSpec) -> Result { let ProblemSpec { metadata: - ProblemMetadata { + NetworkGraphMetadata { + capacity: _, flow: key_flow, function: _, src: key_src, sink: key_sink, name: key_name, - verbose: _, + supply: key_supply, + unit_cost: _, }, - capacity: _, - supply: key_supply, - unit_cost: _, + verbose: _, } = problem; // Step 1. Collect graph data diff --git a/crates/kubegraph/vm/local/src/func.rs b/crates/kubegraph/vm/local/src/func.rs index 8d154850..201fb4b0 100644 --- a/crates/kubegraph/vm/local/src/func.rs +++ b/crates/kubegraph/vm/local/src/func.rs @@ -196,8 +196,6 @@ mod tests { function_name: &str, function_template: FunctionTemplate<&'static str>, ) -> ::pl::frame::DataFrame { - use kubegraph_api::problem::ProblemMetadata; - // Step 1. Add a function let function: Function = function_template .try_into() @@ -207,12 +205,7 @@ mod tests { }; // Step 2. Define a problem - let problem = ProblemSpec { - metadata: ProblemMetadata::default(), - capacity: "capacity".into(), - supply: "supply".into(), - unit_cost: "unit_cost".into(), - }; + let problem = ProblemSpec::default(); // Step 3. Call a function function diff --git a/crates/kubegraph/vm/local/src/lib.rs b/crates/kubegraph/vm/local/src/lib.rs index 0c9184ea..5b9b40d5 100644 --- a/crates/kubegraph/vm/local/src/lib.rs +++ b/crates/kubegraph/vm/local/src/lib.rs @@ -202,7 +202,6 @@ where #[cfg(test)] mod tests { - use kubegraph_api::problem::ProblemMetadata; use kubegraph_solver_ortools::Solver; use kubegraph_twin_simulator::Twin; @@ -239,13 +238,8 @@ mod tests { // Step 4. Add cost & value function (heuristic) let problem = ProblemSpec { - metadata: ProblemMetadata { - verbose: true, - ..Default::default() - }, - capacity: "capacity".into(), - supply: "supply".into(), - unit_cost: "unit_cost".into(), + verbose: true, + ..Default::default() }; // Step 5. Do optimize @@ -328,13 +322,8 @@ mod tests { // Step 4. Add cost & value function (heuristic) let problem = ProblemSpec { - metadata: ProblemMetadata { - verbose: true, - ..Default::default() - }, - capacity: "capacity".into(), - supply: "supply".into(), - unit_cost: "unit_cost".into(), + verbose: true, + ..Default::default() }; // Step 5. Do optimize