Skip to content

Commit

Permalink
fix(kubegraph): add static edges support on dependency graph solver
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Jun 6, 2024
1 parent 84259a5 commit a729a57
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 88 deletions.
2 changes: 1 addition & 1 deletion crates/kubegraph/api/src/dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub trait NetworkDependencySolver {
analyzer: &A,
problem: &VirtualProblem,
spec: NetworkDependencySolverSpec,
) -> Result<Option<NetworkDependencyPipeline<GraphData<LazyFrame>, A>>>
) -> Result<NetworkDependencyPipeline<GraphData<LazyFrame>, A>>
where
A: NetworkAnalyzer;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/kubegraph/api/src/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ pub struct FunctionMetadata {
}

impl FunctionMetadata {
pub const NAME_STATIC: &'static str = "static";
pub const NAME_STATIC: &'static str = "__static__";
}
17 changes: 5 additions & 12 deletions crates/kubegraph/api/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,17 @@ impl<T> GraphEdges<T> {
}

impl GraphEdges<LazyFrame> {
pub fn from_static(
namespace: impl Into<String>,
key: &str,
edges: LazyFrame,
) -> Result<Option<Self>> {
let function = FunctionMetadata {
pub fn mark_as_static(self, namespace: impl Into<String>, key: &str) -> Result<Self> {
let metadata = FunctionMetadata {
scope: GraphScope {
namespace: namespace.into(),
name: FunctionMetadata::NAME_STATIC.into(),
},
};

match edges {
LazyFrame::Empty => Ok(None),
mut edges => edges
.alias(key, &function)
.map(|()| Self::new(edges))
.map(Some),
match self.0 {
LazyFrame::Empty => Ok(self),
mut edges => edges.alias(key, &metadata).map(|()| Self::new(edges)),
}
}

Expand Down
12 changes: 3 additions & 9 deletions crates/kubegraph/api/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,25 +268,19 @@ where
}

// Step 2. Collect all functions
// NOTE: static edges can be used instead of functions
let functions = self.resource_db().list(()).await.unwrap_or_default();
if functions.is_empty() {
return Ok(None);
}

// Step 3. Solve the dependencies
let spec = NetworkDependencySolverSpec { functions, graphs };
let NetworkDependencyPipelineTemplate {
graph: data,
problem,
static_edges,
} = match self
} = self
.dependency_solver()
.build_pipeline(self.analyzer(), &problem, spec)
.await?
{
Some(pipeline) => pipeline,
None => return Ok(None),
};
.await?;

Ok(Some(NetworkDependencyPipelineTemplate {
graph: Graph {
Expand Down
141 changes: 83 additions & 58 deletions crates/kubegraph/dependency/solver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG
_analyzer: &A,
problem: &VirtualProblem,
spec: NetworkDependencySolverSpec,
) -> Result<Option<NetworkDependencyPipeline<GraphData<LazyFrame>, A>>>
) -> Result<NetworkDependencyPipeline<GraphData<LazyFrame>, A>>
where
A: NetworkAnalyzer,
{
Expand All @@ -48,28 +48,36 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG
.map(|cr| Function::new(cr, problem))
.collect::<Result<Graph<_>>>()?;

// Step 2. Collect all pipelines per graph
let pipelines = graph.build_pipelines(problem, spec.graphs);
if pipelines.is_empty() {
return Ok(None);
// Step 2. Disaggregate the graphs
let mut static_edges = Vec::with_capacity(spec.graphs.len());
let mut static_nodes = Vec::with_capacity(spec.graphs.len());
for ::kubegraph_api::graph::Graph {
data: GraphData { edges, nodes },
metadata,
scope: _,
} in spec.graphs
{
static_edges.push(edges);
static_nodes.push((metadata, nodes));
}

// Step 3. Merge duplicated pipelines
let mut static_edges = vec![];
// Step 3. Collect all static edges
let static_edges: GraphEdges<_> = static_edges.into_iter().map(GraphEdges::new).collect();
let static_edges = static_edges
.mark_as_static(&problem.scope.namespace, problem.spec.metadata.function())?;

// Step 4. Collect all pipelines per graph
// NOTE: static edges can be used instead of pipelines
let (pipelines, static_nodes) = graph.build_pipelines(problem, static_nodes);

// Step 5. Merge duplicated pipelines
let merged_pipelines = pipelines
.into_iter()
.map(
|GraphPipeline {
graph:
::kubegraph_api::graph::Graph {
data: GraphData { edges, nodes },
metadata: _,
scope: _,
},
inner: ::kubegraph_dependency_graph::GraphPipeline { nodes: functions },
nodes,
}| {
static_edges.push(edges);

let mut nodes = Some(nodes);
let nodes = ::std::iter::from_fn(move || Some(nodes.take()));
functions
Expand All @@ -81,7 +89,7 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG
)
.merge_pipelines();

// Step 4. Build the dependency pipeline graph
// Step 6. Build the dependency pipeline graph
let mut finalized_edges = Vec::default();
let mut finalized_nodes = Vec::default();
let mut stack = BTreeMap::<_, Vec<_>>::default();
Expand Down Expand Up @@ -137,84 +145,101 @@ impl ::kubegraph_api::dependency::NetworkDependencySolver for NetworkDependencyG
finalized_edges.append(&mut nodes);
}

// Step 5. Collect all graphs
let edges: GraphEdges<_> = finalized_edges.into_iter().map(GraphEdges::new).collect();
let nodes: GraphEdges<_> = finalized_nodes.into_iter().map(GraphEdges::new).collect();
// Step 7. Collect all graphs
let edges: GraphEdges<_> = finalized_edges
.into_iter()
.map(GraphEdges::new)
.chain(Some(static_edges.clone()))
.collect();
let nodes: GraphEdges<_> = finalized_nodes
.into_iter()
.chain(static_nodes)
.map(GraphEdges::new)
.collect();
let graph = GraphData {
edges: edges.into_inner(),
nodes: nodes.into_inner(),
};

if problem.spec.verbose {
let GraphData { edges, nodes } = graph.clone().collect().await?;
println!("Edges: {edges}");
println!("Nodes: {nodes}");
println!("Edges: {edges}");
println!();
}

let static_edges = static_edges.into_iter().map(GraphEdges::new).collect();

Ok(Some(NetworkDependencyPipeline::<GraphData<LazyFrame>, A> {
Ok(NetworkDependencyPipeline::<GraphData<LazyFrame>, A> {
graph,
problem: VirtualProblem {
// TODO: to be implemented
// TODO: 여기부터 시작
// TODO: 여기부터 시작 (그냥 없앨까..? 아니면 함수 복원용으로..?)
analyzer: BTreeMap::default(),
filter: problem.filter.clone(),
scope: problem.scope.clone(),
spec: problem.spec.clone(),
},
static_edges: Some(static_edges),
}))
})
}
}

trait GraphPipelineBuilder {
fn build_pipelines(
fn build_pipelines<M>(
&self,
problem: &VirtualProblem,
graphs: Vec<::kubegraph_api::graph::Graph<LazyFrame>>,
) -> Vec<GraphPipeline<'_>>;
nodes: Vec<(M, LazyFrame)>,
) -> (Vec<GraphPipeline<'_>>, Vec<LazyFrame>)
where
M: GraphMetadataExt;
}

impl GraphPipelineBuilder for Graph<Function> {
fn build_pipelines(
fn build_pipelines<M>(
&self,
problem: &VirtualProblem,
graphs: Vec<::kubegraph_api::graph::Graph<LazyFrame>>,
) -> Vec<GraphPipeline<'_>> {
graphs
.into_iter()
.filter_map(|graph| {
let src = graph.metadata.all_node_inputs_raw();
let sink: Vec<_> = problem
.spec
.metadata
.all_node_inputs()
.iter()
.map(|&column| column.into())
.collect();

let claim = GraphPipelineClaim {
option: GraphPipelineClaimOptions {
fastest: true,
..Default::default()
},
src: &src,
sink: &sink,
};

self.build_pipeline(&claim)
.and_then(|mut pipelines| pipelines.pop())
.map(|inner| GraphPipeline { graph, inner })
})
.collect()
nodes: Vec<(M, LazyFrame)>,
) -> (Vec<GraphPipeline<'_>>, Vec<LazyFrame>)
where
M: GraphMetadataExt,
{
let mut dropped_nodes = Vec::default();
let mut pipelines = Vec::default();

for (metadata, nodes) in nodes {
let src = metadata.all_node_inputs_raw();
let sink: Vec<_> = problem
.spec
.metadata
.all_node_inputs()
.iter()
.map(|&column| column.into())
.collect();

let claim = GraphPipelineClaim {
option: GraphPipelineClaimOptions {
fastest: true,
..Default::default()
},
src: &src,
sink: &sink,
};

match self
.build_pipeline(&claim)
.and_then(|mut pipelines| pipelines.pop())
{
Some(inner) => pipelines.push(GraphPipeline { inner, nodes }),
None => dropped_nodes.push(nodes),
}
}

(pipelines, dropped_nodes)
}
}

struct GraphPipeline<'a> {
graph: ::kubegraph_api::graph::Graph<LazyFrame>,
inner: ::kubegraph_dependency_graph::GraphPipeline<'a, Function>,
nodes: LazyFrame,
}

impl<'a> fmt::Debug for GraphPipeline<'a> {
Expand Down
24 changes: 17 additions & 7 deletions crates/kubegraph/vm/local/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl NetworkVirtualMachineRunner {
mod tests {
use super::*;

#[cfg(feature = "df-polars")]
#[cfg(all(feature = "df-polars", feature = "runner-simulator"))]
#[::tokio::test]
async fn simulate_simple_with_edges() {
use kubegraph_api::{
Expand All @@ -191,11 +191,16 @@ mod tests {

use crate::{
args::NetworkArgs,
runner::{NetworkRunnerArgs, NetworkRunnerType},
visualizer::{NetworkVisualizerArgs, NetworkVisualizerType},
};

// Step 1. Define problems
let args = NetworkArgs {
runner: NetworkRunnerArgs {
runner: NetworkRunnerType::Simulator,
..Default::default()
},
visualizer: NetworkVisualizerArgs {
visualizer: NetworkVisualizerType::Disabled,
..Default::default()
Expand Down Expand Up @@ -309,17 +314,17 @@ mod tests {
assert_eq!(
output_edges,
::pl::df!(
"src" => [ "a"],
"sink" => [ "b"],
"capacity" => [ 50i64],
"unit_cost" => [ 1i64],
"function" => ["static"],
"src" => [ "a"],
"sink" => [ "b"],
"capacity" => [ 50i64],
"unit_cost" => [ 1i64],
"function" => ["__static__"],
)
.expect("failed to create ground-truth nodes dataframe"),
);
}

#[cfg(all(feature = "df-polars", feature = "function-dummy"))]
#[cfg(all(feature = "df-polars", feature = "runner-simulator"))]
#[::tokio::test]
async fn simulate_simple_with_function() {
use kube::api::ObjectMeta;
Expand All @@ -340,11 +345,16 @@ mod tests {

use crate::{
args::NetworkArgs,
runner::{NetworkRunnerArgs, NetworkRunnerType},
visualizer::{NetworkVisualizerArgs, NetworkVisualizerType},
};

// Step 1. Define problems
let args = NetworkArgs {
runner: NetworkRunnerArgs {
runner: NetworkRunnerType::Simulator,
..Default::default()
},
visualizer: NetworkVisualizerArgs {
visualizer: NetworkVisualizerType::Disabled,
..Default::default()
Expand Down

0 comments on commit a729a57

Please sign in to comment.