diff --git a/crates/kubegraph/api/src/dependency.rs b/crates/kubegraph/api/src/dependency.rs index 81c90c75..46b78156 100644 --- a/crates/kubegraph/api/src/dependency.rs +++ b/crates/kubegraph/api/src/dependency.rs @@ -18,7 +18,7 @@ pub trait NetworkDependencySolver { analyzer: &A, problem: &VirtualProblem, spec: NetworkDependencySolverSpec, - ) -> Result, A>>> + ) -> Result, A>> where A: NetworkAnalyzer; } diff --git a/crates/kubegraph/api/src/function/mod.rs b/crates/kubegraph/api/src/function/mod.rs index b5fbd732..9bd36980 100644 --- a/crates/kubegraph/api/src/function/mod.rs +++ b/crates/kubegraph/api/src/function/mod.rs @@ -67,5 +67,5 @@ pub struct FunctionMetadata { } impl FunctionMetadata { - pub const NAME_STATIC: &'static str = "static"; + pub const NAME_STATIC: &'static str = "__static__"; } diff --git a/crates/kubegraph/api/src/graph/mod.rs b/crates/kubegraph/api/src/graph/mod.rs index 993cca72..6b07ab50 100644 --- a/crates/kubegraph/api/src/graph/mod.rs +++ b/crates/kubegraph/api/src/graph/mod.rs @@ -108,24 +108,17 @@ impl GraphEdges { } impl GraphEdges { - pub fn from_static( - namespace: impl Into, - key: &str, - edges: LazyFrame, - ) -> Result> { - let function = FunctionMetadata { + pub fn mark_as_static(self, namespace: impl Into, key: &str) -> Result { + let metadata = FunctionMetadata { scope: GraphScope { namespace: namespace.into(), name: FunctionMetadata::NAME_STATIC.into(), }, }; - match edges { - LazyFrame::Empty => Ok(None), - mut edges => edges - .alias(key, &function) - .map(|()| Self::new(edges)) - .map(Some), + match self.0 { + LazyFrame::Empty => Ok(self), + mut edges => edges.alias(key, &metadata).map(|()| Self::new(edges)), } } diff --git a/crates/kubegraph/api/src/vm.rs b/crates/kubegraph/api/src/vm.rs index f2fa022b..89364290 100644 --- a/crates/kubegraph/api/src/vm.rs +++ b/crates/kubegraph/api/src/vm.rs @@ -268,10 +268,8 @@ where } // Step 2. Collect all functions + // NOTE: static edges can be used instead of functions let functions = self.resource_db().list(()).await.unwrap_or_default(); - if functions.is_empty() { - return Ok(None); - } // Step 3. Solve the dependencies let spec = NetworkDependencySolverSpec { functions, graphs }; @@ -279,14 +277,10 @@ where graph: data, problem, static_edges, - } = match self + } = self .dependency_solver() .build_pipeline(self.analyzer(), &problem, spec) - .await? - { - Some(pipeline) => pipeline, - None => return Ok(None), - }; + .await?; Ok(Some(NetworkDependencyPipelineTemplate { graph: Graph { diff --git a/crates/kubegraph/dependency/solver/src/lib.rs b/crates/kubegraph/dependency/solver/src/lib.rs index 0d927c98..31262e1f 100644 --- a/crates/kubegraph/dependency/solver/src/lib.rs +++ b/crates/kubegraph/dependency/solver/src/lib.rs @@ -37,7 +37,7 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG _analyzer: &A, problem: &VirtualProblem, spec: NetworkDependencySolverSpec, - ) -> Result, A>>> + ) -> Result, A>> where A: NetworkAnalyzer, { @@ -48,28 +48,36 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG .map(|cr| Function::new(cr, problem)) .collect::>>()?; - // Step 2. Collect all pipelines per graph - let pipelines = graph.build_pipelines(problem, spec.graphs); - if pipelines.is_empty() { - return Ok(None); + // Step 2. Disaggregate the graphs + let mut static_edges = Vec::with_capacity(spec.graphs.len()); + let mut static_nodes = Vec::with_capacity(spec.graphs.len()); + for ::kubegraph_api::graph::Graph { + data: GraphData { edges, nodes }, + metadata, + scope: _, + } in spec.graphs + { + static_edges.push(edges); + static_nodes.push((metadata, nodes)); } - // Step 3. Merge duplicated pipelines - let mut static_edges = vec![]; + // Step 3. Collect all static edges + let static_edges: GraphEdges<_> = static_edges.into_iter().map(GraphEdges::new).collect(); + let static_edges = static_edges + .mark_as_static(&problem.scope.namespace, problem.spec.metadata.function())?; + + // Step 4. Collect all pipelines per graph + // NOTE: static edges can be used instead of pipelines + let (pipelines, static_nodes) = graph.build_pipelines(problem, static_nodes); + + // Step 5. Merge duplicated pipelines let merged_pipelines = pipelines .into_iter() .map( |GraphPipeline { - graph: - ::kubegraph_api::graph::Graph { - data: GraphData { edges, nodes }, - metadata: _, - scope: _, - }, inner: ::kubegraph_dependency_graph::GraphPipeline { nodes: functions }, + nodes, }| { - static_edges.push(edges); - let mut nodes = Some(nodes); let nodes = ::std::iter::from_fn(move || Some(nodes.take())); functions @@ -81,7 +89,7 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG ) .merge_pipelines(); - // Step 4. Build the dependency pipeline graph + // Step 6. Build the dependency pipeline graph let mut finalized_edges = Vec::default(); let mut finalized_nodes = Vec::default(); let mut stack = BTreeMap::<_, Vec<_>>::default(); @@ -137,9 +145,17 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG finalized_edges.append(&mut nodes); } - // Step 5. Collect all graphs - let edges: GraphEdges<_> = finalized_edges.into_iter().map(GraphEdges::new).collect(); - let nodes: GraphEdges<_> = finalized_nodes.into_iter().map(GraphEdges::new).collect(); + // Step 7. Collect all graphs + let edges: GraphEdges<_> = finalized_edges + .into_iter() + .map(GraphEdges::new) + .chain(Some(static_edges.clone())) + .collect(); + let nodes: GraphEdges<_> = finalized_nodes + .into_iter() + .chain(static_nodes) + .map(GraphEdges::new) + .collect(); let graph = GraphData { edges: edges.into_inner(), nodes: nodes.into_inner(), @@ -147,74 +163,83 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG if problem.spec.verbose { let GraphData { edges, nodes } = graph.clone().collect().await?; - println!("Edges: {edges}"); println!("Nodes: {nodes}"); + println!("Edges: {edges}"); println!(); } - let static_edges = static_edges.into_iter().map(GraphEdges::new).collect(); - - Ok(Some(NetworkDependencyPipeline::, A> { + Ok(NetworkDependencyPipeline::, A> { graph, problem: VirtualProblem { // TODO: to be implemented - // TODO: 여기부터 시작 + // TODO: 여기부터 시작 (그냥 없앨까..? 아니면 함수 복원용으로..?) analyzer: BTreeMap::default(), filter: problem.filter.clone(), scope: problem.scope.clone(), spec: problem.spec.clone(), }, static_edges: Some(static_edges), - })) + }) } } trait GraphPipelineBuilder { - fn build_pipelines( + fn build_pipelines( &self, problem: &VirtualProblem, - graphs: Vec<::kubegraph_api::graph::Graph>, - ) -> Vec>; + nodes: Vec<(M, LazyFrame)>, + ) -> (Vec>, Vec) + where + M: GraphMetadataExt; } impl GraphPipelineBuilder for Graph { - fn build_pipelines( + fn build_pipelines( &self, problem: &VirtualProblem, - graphs: Vec<::kubegraph_api::graph::Graph>, - ) -> Vec> { - graphs - .into_iter() - .filter_map(|graph| { - let src = graph.metadata.all_node_inputs_raw(); - let sink: Vec<_> = problem - .spec - .metadata - .all_node_inputs() - .iter() - .map(|&column| column.into()) - .collect(); - - let claim = GraphPipelineClaim { - option: GraphPipelineClaimOptions { - fastest: true, - ..Default::default() - }, - src: &src, - sink: &sink, - }; - - self.build_pipeline(&claim) - .and_then(|mut pipelines| pipelines.pop()) - .map(|inner| GraphPipeline { graph, inner }) - }) - .collect() + nodes: Vec<(M, LazyFrame)>, + ) -> (Vec>, Vec) + where + M: GraphMetadataExt, + { + let mut dropped_nodes = Vec::default(); + let mut pipelines = Vec::default(); + + for (metadata, nodes) in nodes { + let src = metadata.all_node_inputs_raw(); + let sink: Vec<_> = problem + .spec + .metadata + .all_node_inputs() + .iter() + .map(|&column| column.into()) + .collect(); + + let claim = GraphPipelineClaim { + option: GraphPipelineClaimOptions { + fastest: true, + ..Default::default() + }, + src: &src, + sink: &sink, + }; + + match self + .build_pipeline(&claim) + .and_then(|mut pipelines| pipelines.pop()) + { + Some(inner) => pipelines.push(GraphPipeline { inner, nodes }), + None => dropped_nodes.push(nodes), + } + } + + (pipelines, dropped_nodes) } } struct GraphPipeline<'a> { - graph: ::kubegraph_api::graph::Graph, inner: ::kubegraph_dependency_graph::GraphPipeline<'a, Function>, + nodes: LazyFrame, } impl<'a> fmt::Debug for GraphPipeline<'a> { diff --git a/crates/kubegraph/vm/local/src/lib.rs b/crates/kubegraph/vm/local/src/lib.rs index 46ff2e82..745f9c7a 100644 --- a/crates/kubegraph/vm/local/src/lib.rs +++ b/crates/kubegraph/vm/local/src/lib.rs @@ -177,7 +177,7 @@ impl NetworkVirtualMachineRunner { mod tests { use super::*; - #[cfg(feature = "df-polars")] + #[cfg(all(feature = "df-polars", feature = "runner-simulator"))] #[::tokio::test] async fn simulate_simple_with_edges() { use kubegraph_api::{ @@ -191,11 +191,16 @@ mod tests { use crate::{ args::NetworkArgs, + runner::{NetworkRunnerArgs, NetworkRunnerType}, visualizer::{NetworkVisualizerArgs, NetworkVisualizerType}, }; // Step 1. Define problems let args = NetworkArgs { + runner: NetworkRunnerArgs { + runner: NetworkRunnerType::Simulator, + ..Default::default() + }, visualizer: NetworkVisualizerArgs { visualizer: NetworkVisualizerType::Disabled, ..Default::default() @@ -309,17 +314,17 @@ mod tests { assert_eq!( output_edges, ::pl::df!( - "src" => [ "a"], - "sink" => [ "b"], - "capacity" => [ 50i64], - "unit_cost" => [ 1i64], - "function" => ["static"], + "src" => [ "a"], + "sink" => [ "b"], + "capacity" => [ 50i64], + "unit_cost" => [ 1i64], + "function" => ["__static__"], ) .expect("failed to create ground-truth nodes dataframe"), ); } - #[cfg(all(feature = "df-polars", feature = "function-dummy"))] + #[cfg(all(feature = "df-polars", feature = "runner-simulator"))] #[::tokio::test] async fn simulate_simple_with_function() { use kube::api::ObjectMeta; @@ -340,11 +345,16 @@ mod tests { use crate::{ args::NetworkArgs, + runner::{NetworkRunnerArgs, NetworkRunnerType}, visualizer::{NetworkVisualizerArgs, NetworkVisualizerType}, }; // Step 1. Define problems let args = NetworkArgs { + runner: NetworkRunnerArgs { + runner: NetworkRunnerType::Simulator, + ..Default::default() + }, visualizer: NetworkVisualizerArgs { visualizer: NetworkVisualizerType::Disabled, ..Default::default()