Skip to content

Commit

Permalink
feat(kubegraph): begin implementating of trader
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Jul 8, 2024
1 parent 1133a24 commit 6e8e82b
Show file tree
Hide file tree
Showing 35 changed files with 623 additions and 89 deletions.
9 changes: 8 additions & 1 deletion crates/kubegraph/api/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::BTreeMap;
use anyhow::Result;
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, TryStreamExt};
use kube::CustomResource;
use kube::{CustomResource, CustomResourceExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::time::{sleep, Instant};
Expand Down Expand Up @@ -202,6 +202,13 @@ impl NetworkResource for NetworkConnectorCrd {
fn description(&self) -> String {
self.spec.name()
}

fn type_name() -> &'static str
where
Self: Sized,
{
<Self as CustomResourceExt>::crd_name()
}
}

impl NetworkConnectorSpec {
Expand Down
13 changes: 12 additions & 1 deletion crates/kubegraph/api/src/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod service;
pub mod spawn;
pub mod webhook;

use kube::CustomResource;
use kube::{CustomResource, CustomResourceExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -44,6 +44,17 @@ pub struct NetworkFunctionSpec {

impl NetworkResource for NetworkFunctionCrd {
type Filter = ();

fn description(&self) -> String {
<Self as NetworkResource>::type_name().into()
}

fn type_name() -> &'static str
where
Self: Sized,
{
<Self as CustomResourceExt>::crd_name()
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
Expand Down
9 changes: 8 additions & 1 deletion crates/kubegraph/api/src/graph/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "df-polars")]
pub mod polars;

use std::{collections::BTreeMap, mem::swap, sync::Arc};
use std::{collections::BTreeMap, fmt, mem::swap, sync::Arc};

use anyhow::Result;
use async_trait::async_trait;
Expand Down Expand Up @@ -1175,6 +1175,13 @@ pub struct GraphScope {
pub name: String,
}

impl fmt::Display for GraphScope {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { namespace, name } = self;
write!(f, "{namespace}/{name}")
}
}

impl GraphScope {
pub const NAME_GLOBAL: &'static str = "__global__";

Expand Down
1 change: 1 addition & 0 deletions crates/kubegraph/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod query;
pub mod resource;
pub mod runner;
pub mod solver;
pub mod trader;
pub mod visualizer;
pub mod vm;

Expand Down
2 changes: 1 addition & 1 deletion crates/kubegraph/api/src/market/product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use uuid::Uuid;

use crate::problem::ProblemSpec;

#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProductSpec {
pub problem: ProblemSpec,
Expand Down
13 changes: 12 additions & 1 deletion crates/kubegraph/api/src/problem.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use kube::CustomResource;
use kube::{CustomResource, CustomResourceExt};
use schemars::JsonSchema;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

Expand Down Expand Up @@ -85,6 +85,17 @@ where

impl NetworkResource for NetworkProblemCrd {
type Filter = ();

fn description(&self) -> String {
<Self as NetworkResource>::type_name().into()
}

fn type_name() -> &'static str
where
Self: Sized,
{
<Self as CustomResourceExt>::crd_name()
}
}

impl<M> ProblemSpec<M> {
Expand Down
19 changes: 9 additions & 10 deletions crates/kubegraph/api/src/resource.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use async_trait::async_trait;
use kube::{Client, CustomResourceExt};
use kube::Client;

use crate::{
connector::NetworkConnectorCrd, function::NetworkFunctionCrd, graph::GraphScope,
problem::NetworkProblemCrd,
};

#[async_trait]
pub trait NetworkResourceCollectionDB
pub trait NetworkResourceCollectionDB<T>
where
Self: Sync
+ NetworkResourceClient
Expand All @@ -18,7 +18,7 @@ where
}

#[async_trait]
impl<T> NetworkResourceCollectionDB for T where
impl<DB, T> NetworkResourceCollectionDB<T> for DB where
Self: Sync
+ NetworkResourceClient
+ NetworkResourceDB<NetworkConnectorCrd>
Expand All @@ -43,13 +43,12 @@ where
async fn list(&self, filter: <K as NetworkResource>::Filter) -> Option<Vec<K>>;
}

pub trait NetworkResource
where
Self: CustomResourceExt,
{
pub trait NetworkResource {
type Filter;

fn description(&self) -> String {
<Self as CustomResourceExt>::crd_name().into()
}
fn description(&self) -> String;

fn type_name() -> &'static str
where
Self: Sized;
}
28 changes: 28 additions & 0 deletions crates/kubegraph/api/src/trader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::collections::BTreeMap;

use anyhow::Result;
use async_trait::async_trait;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::{
function::NetworkFunctionCrd,
graph::{GraphData, GraphEdges, GraphMetadataPinned, GraphScope},
problem::VirtualProblem,
};

#[async_trait]
pub trait NetworkTrader<T> {
async fn is_locked(&self, problem: &VirtualProblem) -> Result<bool>;

async fn register(&self, ctx: NetworkTraderContext<T>) -> Result<()>;
}

#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NetworkTraderContext<T> {
pub functions: BTreeMap<GraphScope, NetworkFunctionCrd>,
pub graph: GraphData<T>,
pub problem: VirtualProblem<GraphMetadataPinned>,
pub static_edges: Option<GraphEdges<T>>,
}
73 changes: 55 additions & 18 deletions crates/kubegraph/api/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
resource::{NetworkResourceClient, NetworkResourceCollectionDB, NetworkResourceDB},
runner::{NetworkRunner, NetworkRunnerContext},
solver::NetworkSolver,
trader::{NetworkTrader, NetworkTraderContext},
visualizer::{NetworkVisualizer, NetworkVisualizerExt},
};

Expand Down Expand Up @@ -133,19 +134,22 @@ where
NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL_INIT
}
self::sealed::NetworkVirtualMachineState::Ready
| self::sealed::NetworkVirtualMachineState::Empty => match self.restart_policy() {
NetworkVirtualMachineRestartPolicy::Always => {
NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL
}
NetworkVirtualMachineRestartPolicy::Interval { interval } => interval,
NetworkVirtualMachineRestartPolicy::Manually => {
self.visualizer().wait_to_next().await?;
continue;
}
NetworkVirtualMachineRestartPolicy::Never => {
NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL_INIT
| self::sealed::NetworkVirtualMachineState::Empty
| self::sealed::NetworkVirtualMachineState::Trading => {
match self.restart_policy() {
NetworkVirtualMachineRestartPolicy::Always => {
NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL
}
NetworkVirtualMachineRestartPolicy::Interval { interval } => interval,
NetworkVirtualMachineRestartPolicy::Manually => {
self.visualizer().wait_to_next().await?;
continue;
}
NetworkVirtualMachineRestartPolicy::Never => {
NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL_INIT
}
}
},
}
self::sealed::NetworkVirtualMachineState::Completed => {
match self.restart_policy() {
NetworkVirtualMachineRestartPolicy::Always => {
Expand Down Expand Up @@ -188,13 +192,20 @@ where
.await
}

#[instrument(level = Level::INFO, skip(self))]
#[instrument(level = Level::INFO, skip(self, state))]
async fn step_with_custom_problem(
&self,
state: self::sealed::NetworkVirtualMachineState,
problem: VirtualProblem,
) -> Result<self::sealed::NetworkVirtualMachineState> {
// Step 1. Pull & Convert graphs
// Step 1. Check whether the problem is locked
let scope = &problem.scope;
if self.trader().is_locked(&problem).await? {
info!("The problem is locked by the market: {scope}");
return Ok(self::sealed::NetworkVirtualMachineState::Trading);
}

// Step 2. Pull & Convert graphs
let NetworkDependencyPipeline {
connectors,
functions,
Expand Down Expand Up @@ -222,10 +233,23 @@ where
None => return Ok(self::sealed::NetworkVirtualMachineState::Empty),
};

// Step 2. Solve edge flows
// Step 3. Solve edge flows
let data = self.solver().solve(data, &problem.spec).await?;

// Step 3. Apply edges to real-world (or simulator)
// Step 4. Register to the market if no feasible functions are found
if matches!(&data.edges, LazyFrame::Empty) {
info!("Registering the problem to the market: {scope}");
let ctx = NetworkTraderContext {
functions,
graph: data,
problem,
static_edges,
};
self.trader().register(ctx).await?;
return Ok(self::sealed::NetworkVirtualMachineState::Trading);
}

// Step 5. Apply edges to real-world (or simulator)
let runner_ctx = NetworkRunnerContext {
connectors,
functions,
Expand All @@ -240,7 +264,7 @@ where
};
self.runner().execute(runner_ctx).await?;

// Step 4. Visualize the outputs
// Step 6. Visualize the outputs
let graph = Graph {
connector,
data,
Expand Down Expand Up @@ -368,6 +392,7 @@ mod sealed {
Pending,
Ready,
Empty,
Trading,
#[default]
Completed,
}
Expand All @@ -385,12 +410,17 @@ where
Self: Send + Sync,
{
type DependencySolver: NetworkComponent + NetworkDependencySolver;
type ResourceDB: 'static + Send + Clone + NetworkComponent + NetworkResourceCollectionDB;
type ResourceDB: 'static
+ Send
+ Clone
+ NetworkComponent
+ NetworkResourceCollectionDB<LazyFrame>;
type GraphDB: 'static + Send + Clone + NetworkComponent + NetworkGraphDB;
type Runner: NetworkComponent
+ for<'a> NetworkRunner<<Self as NetworkVirtualMachine>::GraphDB, LazyFrame>;
type Solver: NetworkComponent
+ NetworkSolver<GraphData<LazyFrame>, Output = GraphData<LazyFrame>>;
type Trader: NetworkComponent + NetworkTrader<LazyFrame>;
type Visualizer: NetworkComponent + NetworkVisualizer;

fn dependency_solver(&self) -> &<Self as NetworkVirtualMachine>::DependencySolver;
Expand All @@ -403,6 +433,8 @@ where

fn solver(&self) -> &<Self as NetworkVirtualMachine>::Solver;

fn trader(&self) -> &<Self as NetworkVirtualMachine>::Trader;

fn visualizer(&self) -> &<Self as NetworkVirtualMachine>::Visualizer;

fn fallback_policy(&self) -> NetworkFallbackPolicy {
Expand All @@ -426,6 +458,7 @@ where
type ResourceDB = <T as NetworkVirtualMachine>::ResourceDB;
type Runner = <T as NetworkVirtualMachine>::Runner;
type Solver = <T as NetworkVirtualMachine>::Solver;
type Trader = <T as NetworkVirtualMachine>::Trader;
type Visualizer = <T as NetworkVirtualMachine>::Visualizer;

fn dependency_solver(&self) -> &<Self as NetworkVirtualMachine>::DependencySolver {
Expand All @@ -448,6 +481,10 @@ where
<T as NetworkVirtualMachine>::solver(&**self)
}

fn trader(&self) -> &<Self as NetworkVirtualMachine>::Trader {
<T as NetworkVirtualMachine>::trader(&**self)
}

fn visualizer(&self) -> &<Self as NetworkVirtualMachine>::Visualizer {
<T as NetworkVirtualMachine>::visualizer(&**self)
}
Expand Down
5 changes: 5 additions & 0 deletions crates/kubegraph/gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ full = [
"function-full",
"graph-full",
"solver-full",
"trader-full",
"vm-full",
"visualizer-full",
]
Expand Down Expand Up @@ -74,6 +75,10 @@ graph-memory = ["kubegraph-vm-local?/graph-memory"]
solver-full = ["solver-ortools"]
solver-ortools = ["kubegraph-vm-local?/solver-ortools"]

# Configure Traders
trader-full = ["trader-default"]
trader-default = ["kubegraph-vm-local?/trader-default"]

# Configure Virtual Machines
vm-full = ["vm-local"]
vm-local = ["kubegraph-vm-local"]
Expand Down
9 changes: 9 additions & 0 deletions crates/kubegraph/market/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["full"]
full = ["df-full"]

# DataFrame
df-full = ["df-polars"]
df-polars = ["kubegraph-api/df-polars"]

[dependencies]
ark-core = { path = "../../../ark/core", features = ["signal"] }
ark-core-k8s = { path = "../../../ark/core/k8s", features = ["data"] }
Expand All @@ -30,5 +38,6 @@ async-trait = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
reqwest = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true }
tracing = { workspace = true }
Loading

0 comments on commit 6e8e82b

Please sign in to comment.