Skip to content

Commit

Permalink
feat(kubegraph): add dynamic graph metadata support
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed May 20, 2024
1 parent 53294bb commit 04ea2f6
Show file tree
Hide file tree
Showing 41 changed files with 1,218 additions and 532 deletions.
74 changes: 47 additions & 27 deletions crates/kubegraph/api/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,25 @@ use tracing::{error, info, instrument, Level};

use crate::{
frame::LazyFrame,
function::NetworkFunctionCrd,
graph::{Graph, GraphScope, NetworkGraphDB},
graph::{Graph, GraphMetadataRaw, GraphScope, NetworkGraphDB},
vm::NetworkVirtualMachine,
};

#[async_trait]
pub trait NetworkConnectorDB {
async fn delete_connector(&self, key: &GraphScope);

async fn delete_function(&self, key: &GraphScope);

async fn insert_connector(&self, object: NetworkConnectorCrd);

async fn insert_function(&self, object: NetworkFunctionCrd);

async fn list_connectors(
&self,
r#type: NetworkConnectorSourceType,
r#type: NetworkConnectorType,
) -> Option<Vec<NetworkConnectorCrd>>;

async fn list_functions(&self) -> Vec<NetworkFunctionCrd>;
}

#[async_trait]
pub trait NetworkConnector {
fn connection_type(&self) -> NetworkConnectorSourceType;
fn connector_type(&self) -> NetworkConnectorType;

fn name(&self) -> &str;

Expand All @@ -48,14 +41,15 @@ pub trait NetworkConnector {
where
Self: Sized,
{
let interval = vm.interval();
let name = self.name();
info!("Starting {name} connector...");

loop {
let instant = Instant::now();

if let Some(connectors) = vm
.connector_db()
.list_connectors(self.connection_type())
.resource_db()
.list_connectors(self.connector_type())
.await
{
let name = self.name();
Expand All @@ -81,8 +75,8 @@ pub trait NetworkConnector {
}
}

let elapsed = instant.elapsed();
if let Some(interval) = interval {
if let Some(interval) = vm.interval() {
let elapsed = instant.elapsed();
if elapsed < interval {
sleep(interval - elapsed).await;
}
Expand Down Expand Up @@ -115,41 +109,67 @@ pub trait NetworkConnector {
"jsonPath": ".metadata.generation"
}"#
)]
#[schemars(bound = "M: Default + JsonSchema")]
#[serde(rename_all = "camelCase")]
pub struct NetworkConnectorSpec<M = GraphMetadataRaw> {
#[serde(default)]
pub metadata: M,
#[serde(flatten)]
pub kind: NetworkConnectorKind,
}

impl<M> NetworkConnectorSpec<M> {
pub fn name(&self) -> String {
self.kind.name()
}

pub const fn to_ref(&self) -> NetworkConnectorType {
self.kind.to_ref()
}
}

impl<M> PartialEq<NetworkConnectorType> for NetworkConnectorSpec<M> {
fn eq(&self, other: &NetworkConnectorType) -> bool {
self.to_ref() == *other
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
#[non_exhaustive]
pub enum NetworkConnectorSpec {
#[serde(rename_all = "camelCase")]
pub enum NetworkConnectorKind {
#[cfg(feature = "connector-prometheus")]
Prometheus(self::prometheus::NetworkConnectorPrometheusSpec),
#[cfg(feature = "connector-simulation")]
Simulation(self::simulation::NetworkConnectorSimulationSpec),
}

impl NetworkConnectorSpec {
pub fn name(&self) -> String {
impl NetworkConnectorKind {
fn name(&self) -> String {
match self {
#[cfg(feature = "connector-prometheus")]
Self::Prometheus(spec) => format!(
"{type}/{spec}",
type = NetworkConnectorSourceType::Prometheus.name(),
type = NetworkConnectorType::Prometheus.name(),
spec = spec.name(),
),
#[cfg(feature = "connector-simulation")]
Self::Simulation(_) => NetworkConnectorSourceType::Simulation.name().into(),
Self::Simulation(_) => NetworkConnectorType::Simulation.name().into(),
}
}

pub const fn to_ref(&self) -> NetworkConnectorSourceType {
const fn to_ref(&self) -> NetworkConnectorType {
match self {
#[cfg(feature = "connector-prometheus")]
Self::Prometheus(_) => NetworkConnectorSourceType::Prometheus,
Self::Prometheus(_) => NetworkConnectorType::Prometheus,
#[cfg(feature = "connector-simulation")]
Self::Simulation(_) => NetworkConnectorSourceType::Simulation,
Self::Simulation(_) => NetworkConnectorType::Simulation,
}
}
}

impl PartialEq<NetworkConnectorSourceType> for NetworkConnectorSpec {
fn eq(&self, other: &NetworkConnectorSourceType) -> bool {
impl PartialEq<NetworkConnectorType> for NetworkConnectorKind {
fn eq(&self, other: &NetworkConnectorType) -> bool {
self.to_ref() == *other
}
}
Expand All @@ -159,14 +179,14 @@ impl PartialEq<NetworkConnectorSourceType> for NetworkConnectorSpec {
)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum NetworkConnectorSourceType {
pub enum NetworkConnectorType {
#[cfg(feature = "connector-prometheus")]
Prometheus,
#[cfg(feature = "connector-simulation")]
Simulation,
}

impl NetworkConnectorSourceType {
impl NetworkConnectorType {
pub const fn name(&self) -> &'static str {
match self {
#[cfg(feature = "connector-prometheus")]
Expand Down
6 changes: 3 additions & 3 deletions crates/kubegraph/api/src/connector/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use ark_core_k8s::data::Url;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::query::NetworkQuery;
use crate::query::{NetworkQuery, NetworkQueryMetadata};

#[derive(
Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,
)]
#[serde(rename_all = "camelCase")]
pub struct NetworkConnectorPrometheusSpec {
pub template: NetworkQuery,
pub struct NetworkConnectorPrometheusSpec<M = NetworkQueryMetadata> {
pub template: NetworkQuery<M>,
pub url: Url,
}

Expand Down
4 changes: 0 additions & 4 deletions crates/kubegraph/api/src/connector/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ use std::path::PathBuf;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::graph::GraphMetadata;

#[derive(
Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,
)]
#[serde(rename_all = "camelCase")]
pub struct NetworkConnectorSimulationSpec {
#[serde(default)]
pub metadata: GraphMetadata,
pub path: PathBuf,

#[serde(default = "NetworkConnectorSimulationSpec::default_key_edges")]
Expand Down
112 changes: 74 additions & 38 deletions crates/kubegraph/api/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ pub mod polars;

use std::ops::{Add, Div, Mul, Neg, Not, Sub};

use ::polars::datatypes::DataType;
use anyhow::{bail, Result};
#[cfg(feature = "df-polars")]
use pl::lazy::dsl;
use serde::{Deserialize, Serialize};

use crate::{
function::FunctionMetadata,
graph::{GraphDataType, GraphMetadata},
graph::{GraphDataType, GraphMetadataPinnedExt},
ops::{And, Eq, Ge, Gt, Le, Lt, Ne, Or},
problem::{r#virtual::VirtualProblem, ProblemSpec},
problem::ProblemSpec,
vm::{Feature, Number},
};

Expand All @@ -24,6 +25,38 @@ pub enum DataFrame {
Polars(::pl::frame::DataFrame),
}

impl DataFrame {
pub fn drop_null_columns(self) -> Self {
match self {
Self::Empty => Self::Empty,
#[cfg(feature = "df-polars")]
Self::Polars(df) => {
let null_columns: Vec<_> = df
.get_columns()
.iter()
.filter(|column| column.dtype() == &DataType::Null)
.map(|column| column.name().to_string())
.collect();

let df_filtered = if !null_columns.is_empty() {
df.drop_many(&null_columns)
} else {
df
};
Self::Polars(df_filtered)
}
}
}

pub fn lazy(self) -> LazyFrame {
match self {
Self::Empty => LazyFrame::Empty,
#[cfg(feature = "df-polars")]
Self::Polars(df) => LazyFrame::Polars(::pl::lazy::frame::IntoLazy::lazy(df)),
}
}
}

pub trait IntoLazyFrame
where
Self: Into<LazyFrame>,
Expand Down Expand Up @@ -59,11 +92,15 @@ impl LazyFrame {
}
}

pub fn cast(self, ty: GraphDataType, origin: &GraphMetadata, problem: &VirtualProblem) -> Self {
pub fn cast<MF, MT>(self, ty: GraphDataType, from: &MF, to: &MT) -> Self
where
MF: GraphMetadataPinnedExt,
MT: GraphMetadataPinnedExt,
{
match self {
Self::Empty => Self::Empty,
#[cfg(feature = "df-polars")]
Self::Polars(df) => Self::Polars(self::polars::cast(df, ty, origin, problem)),
Self::Polars(df) => Self::Polars(self::polars::cast(df, ty, from, to)),
}
}

Expand All @@ -87,28 +124,13 @@ impl LazyFrame {
}
}

pub fn get_column(&self, name: &str) -> Result<LazySlice> {
match self {
Self::Empty => bail!("cannot get column from empty lazyframe"),
#[cfg(feature = "df-polars")]
Self::Polars(_) => Ok(LazySlice::Polars(dsl::col(name))),
}
}

/// Create a fully-connected edges
pub fn fabric(&self, problem: &ProblemSpec) -> Result<Self> {
pub fn fabric<M>(&self, problem: &ProblemSpec<M>) -> Result<Self>
where
M: GraphMetadataPinnedExt,
{
let ProblemSpec {
metadata:
GraphMetadata {
capacity,
flow: _,
function: _,
name,
sink,
src,
supply: _,
unit_cost: _,
},
metadata,
verbose: _,
} = problem;

Expand All @@ -131,13 +153,27 @@ impl LazyFrame {
Self::Empty => bail!("cannot get fabric from empty lazyframe"),
#[cfg(feature = "df-polars")]
Self::Polars(nodes) => Ok(Self::Polars(
select_polars_edge_side(&nodes, name, src)
.cross_join(select_polars_edge_side(&nodes, name, sink))
.with_column(dsl::lit(ProblemSpec::MAX_CAPACITY).alias(capacity.as_ref())),
select_polars_edge_side(&nodes, metadata.name(), metadata.src())
.cross_join(select_polars_edge_side(
&nodes,
metadata.name(),
metadata.sink(),
))
.with_column(
dsl::lit(ProblemSpec::<M>::MAX_CAPACITY).alias(metadata.capacity()),
),
)),
}
}

pub fn get_column(&self, name: &str) -> Result<LazySlice> {
match self {
Self::Empty => bail!("cannot get column from empty lazyframe"),
#[cfg(feature = "df-polars")]
Self::Polars(_) => Ok(LazySlice::Polars(dsl::col(name))),
}
}

pub fn alias(&mut self, key: &str, metadata: &FunctionMetadata) -> Result<()> {
let FunctionMetadata { name } = metadata;

Expand All @@ -151,17 +187,6 @@ impl LazyFrame {
}
}

pub fn insert_column(&mut self, name: &str, column: LazySlice) -> Result<()> {
match (self, column) {
(Self::Empty, _) => bail!("cannot fill column into empty lazyframe: {name:?}"),
#[cfg(feature = "df-polars")]
(Self::Polars(df), LazySlice::Polars(column)) => {
*df = df.clone().with_column(column.alias(name));
Ok(())
}
}
}

pub fn apply_filter(&mut self, filter: LazySlice) -> Result<()> {
match (self, filter) {
(Self::Empty, _) => bail!("cannot apply filter into empty lazyframe"),
Expand Down Expand Up @@ -195,6 +220,17 @@ impl LazyFrame {
}
}

pub fn insert_column(&mut self, name: &str, column: LazySlice) -> Result<()> {
match (self, column) {
(Self::Empty, _) => bail!("cannot fill column into empty lazyframe: {name:?}"),
#[cfg(feature = "df-polars")]
(Self::Polars(df), LazySlice::Polars(column)) => {
*df = df.clone().with_column(column.alias(name));
Ok(())
}
}
}

#[cfg(feature = "df-polars")]
pub fn try_into_polars(self) -> Result<::pl::lazy::frame::LazyFrame> {
match self {
Expand Down
Loading

0 comments on commit 04ea2f6

Please sign in to comment.