From 6e8e82bc19f62ff462b0ce205026cb4d65e063ad Mon Sep 17 00:00:00 2001 From: Ho Kim Date: Mon, 8 Jul 2024 13:03:28 +0000 Subject: [PATCH] feat(kubegraph): begin implementating of trader --- crates/kubegraph/api/src/connector/mod.rs | 9 +- crates/kubegraph/api/src/function/mod.rs | 13 +- crates/kubegraph/api/src/graph/mod.rs | 9 +- crates/kubegraph/api/src/lib.rs | 1 + crates/kubegraph/api/src/market/product.rs | 2 +- crates/kubegraph/api/src/problem.rs | 13 +- crates/kubegraph/api/src/resource.rs | 19 ++- crates/kubegraph/api/src/trader.rs | 28 ++++ crates/kubegraph/api/src/vm.rs | 73 ++++++--- crates/kubegraph/gateway/Cargo.toml | 5 + crates/kubegraph/market/client/Cargo.toml | 9 ++ crates/kubegraph/market/client/src/lib.rs | 28 +++- crates/kubegraph/market/entity/src/price.rs | 13 +- crates/kubegraph/market/entity/src/product.rs | 23 ++- crates/kubegraph/market/gateway/Cargo.toml | 5 + crates/kubegraph/market/gateway/src/actix.rs | 1 + crates/kubegraph/market/gateway/src/db.rs | 47 +++++- .../market/gateway/src/routes/product.rs | 6 + crates/kubegraph/trader/Cargo.toml | 19 ++- crates/kubegraph/trader/src/actix.rs | 0 crates/kubegraph/trader/src/db.rs | 56 +++++++ crates/kubegraph/trader/src/lib.rs | 152 ++++++++++++++++-- crates/kubegraph/trader/src/session.rs | 6 + crates/kubegraph/vm/local/Cargo.toml | 10 +- crates/kubegraph/vm/local/src/args.rs | 4 + crates/kubegraph/vm/local/src/graph.rs | 2 +- crates/kubegraph/vm/local/src/lib.rs | 11 +- crates/kubegraph/vm/local/src/reloader.rs | 4 +- crates/kubegraph/vm/local/src/solver.rs | 2 +- crates/kubegraph/vm/local/src/trader.rs | 1 + crates/kubegraph/vm/local/src/visualizer.rs | 2 +- .../kubegraph-market-function-blackhole.yaml | 87 ++++++++++ templates/kubegraph/kubegraph-market.yaml | 2 + .../kubegraph/samples/warehouse-function.yaml | 25 +++ templates/kubegraph/samples/warehouse.yaml | 25 --- 35 files changed, 623 insertions(+), 89 deletions(-) create mode 100644 crates/kubegraph/api/src/trader.rs create mode 100644 crates/kubegraph/trader/src/actix.rs create mode 100644 crates/kubegraph/trader/src/db.rs create mode 100644 crates/kubegraph/trader/src/session.rs create mode 100644 crates/kubegraph/vm/local/src/trader.rs create mode 100644 templates/kubegraph/kubegraph-market-function-blackhole.yaml create mode 100644 templates/kubegraph/samples/warehouse-function.yaml diff --git a/crates/kubegraph/api/src/connector/mod.rs b/crates/kubegraph/api/src/connector/mod.rs index 9a32c0e9..9b83adbf 100644 --- a/crates/kubegraph/api/src/connector/mod.rs +++ b/crates/kubegraph/api/src/connector/mod.rs @@ -10,7 +10,7 @@ use std::collections::BTreeMap; use anyhow::Result; use async_trait::async_trait; use futures::{stream::FuturesUnordered, TryStreamExt}; -use kube::CustomResource; +use kube::{CustomResource, CustomResourceExt}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use tokio::time::{sleep, Instant}; @@ -202,6 +202,13 @@ impl NetworkResource for NetworkConnectorCrd { fn description(&self) -> String { self.spec.name() } + + fn type_name() -> &'static str + where + Self: Sized, + { + ::crd_name() + } } impl NetworkConnectorSpec { diff --git a/crates/kubegraph/api/src/function/mod.rs b/crates/kubegraph/api/src/function/mod.rs index f70b2dd3..93d0f07d 100644 --- a/crates/kubegraph/api/src/function/mod.rs +++ b/crates/kubegraph/api/src/function/mod.rs @@ -7,7 +7,7 @@ pub mod service; pub mod spawn; pub mod webhook; -use kube::CustomResource; +use kube::{CustomResource, CustomResourceExt}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -44,6 +44,17 @@ pub struct NetworkFunctionSpec { impl NetworkResource for NetworkFunctionCrd { type Filter = (); + + fn description(&self) -> String { + ::type_name().into() + } + + fn type_name() -> &'static str + where + Self: Sized, + { + ::crd_name() + } } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] diff --git a/crates/kubegraph/api/src/graph/mod.rs b/crates/kubegraph/api/src/graph/mod.rs index f15635b3..bd60033f 100644 --- a/crates/kubegraph/api/src/graph/mod.rs +++ b/crates/kubegraph/api/src/graph/mod.rs @@ -1,7 +1,7 @@ #[cfg(feature = "df-polars")] pub mod polars; -use std::{collections::BTreeMap, mem::swap, sync::Arc}; +use std::{collections::BTreeMap, fmt, mem::swap, sync::Arc}; use anyhow::Result; use async_trait::async_trait; @@ -1175,6 +1175,13 @@ pub struct GraphScope { pub name: String, } +impl fmt::Display for GraphScope { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Self { namespace, name } = self; + write!(f, "{namespace}/{name}") + } +} + impl GraphScope { pub const NAME_GLOBAL: &'static str = "__global__"; diff --git a/crates/kubegraph/api/src/lib.rs b/crates/kubegraph/api/src/lib.rs index 33838f47..a6eba799 100644 --- a/crates/kubegraph/api/src/lib.rs +++ b/crates/kubegraph/api/src/lib.rs @@ -14,6 +14,7 @@ pub mod query; pub mod resource; pub mod runner; pub mod solver; +pub mod trader; pub mod visualizer; pub mod vm; diff --git a/crates/kubegraph/api/src/market/product.rs b/crates/kubegraph/api/src/market/product.rs index 8dc6cd94..d7e95de8 100644 --- a/crates/kubegraph/api/src/market/product.rs +++ b/crates/kubegraph/api/src/market/product.rs @@ -3,7 +3,7 @@ use uuid::Uuid; use crate::problem::ProblemSpec; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ProductSpec { pub problem: ProblemSpec, diff --git a/crates/kubegraph/api/src/problem.rs b/crates/kubegraph/api/src/problem.rs index c8473054..b017a5af 100644 --- a/crates/kubegraph/api/src/problem.rs +++ b/crates/kubegraph/api/src/problem.rs @@ -1,4 +1,4 @@ -use kube::CustomResource; +use kube::{CustomResource, CustomResourceExt}; use schemars::JsonSchema; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -85,6 +85,17 @@ where impl NetworkResource for NetworkProblemCrd { type Filter = (); + + fn description(&self) -> String { + ::type_name().into() + } + + fn type_name() -> &'static str + where + Self: Sized, + { + ::crd_name() + } } impl ProblemSpec { diff --git a/crates/kubegraph/api/src/resource.rs b/crates/kubegraph/api/src/resource.rs index 1cc29bac..4607d06e 100644 --- a/crates/kubegraph/api/src/resource.rs +++ b/crates/kubegraph/api/src/resource.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use kube::{Client, CustomResourceExt}; +use kube::Client; use crate::{ connector::NetworkConnectorCrd, function::NetworkFunctionCrd, graph::GraphScope, @@ -7,7 +7,7 @@ use crate::{ }; #[async_trait] -pub trait NetworkResourceCollectionDB +pub trait NetworkResourceCollectionDB where Self: Sync + NetworkResourceClient @@ -18,7 +18,7 @@ where } #[async_trait] -impl NetworkResourceCollectionDB for T where +impl NetworkResourceCollectionDB for DB where Self: Sync + NetworkResourceClient + NetworkResourceDB @@ -43,13 +43,12 @@ where async fn list(&self, filter: ::Filter) -> Option>; } -pub trait NetworkResource -where - Self: CustomResourceExt, -{ +pub trait NetworkResource { type Filter; - fn description(&self) -> String { - ::crd_name().into() - } + fn description(&self) -> String; + + fn type_name() -> &'static str + where + Self: Sized; } diff --git a/crates/kubegraph/api/src/trader.rs b/crates/kubegraph/api/src/trader.rs new file mode 100644 index 00000000..b531d946 --- /dev/null +++ b/crates/kubegraph/api/src/trader.rs @@ -0,0 +1,28 @@ +use std::collections::BTreeMap; + +use anyhow::Result; +use async_trait::async_trait; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::{ + function::NetworkFunctionCrd, + graph::{GraphData, GraphEdges, GraphMetadataPinned, GraphScope}, + problem::VirtualProblem, +}; + +#[async_trait] +pub trait NetworkTrader { + async fn is_locked(&self, problem: &VirtualProblem) -> Result; + + async fn register(&self, ctx: NetworkTraderContext) -> Result<()>; +} + +#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct NetworkTraderContext { + pub functions: BTreeMap, + pub graph: GraphData, + pub problem: VirtualProblem, + pub static_edges: Option>, +} diff --git a/crates/kubegraph/api/src/vm.rs b/crates/kubegraph/api/src/vm.rs index fec30d2b..4b0cd92a 100644 --- a/crates/kubegraph/api/src/vm.rs +++ b/crates/kubegraph/api/src/vm.rs @@ -36,6 +36,7 @@ use crate::{ resource::{NetworkResourceClient, NetworkResourceCollectionDB, NetworkResourceDB}, runner::{NetworkRunner, NetworkRunnerContext}, solver::NetworkSolver, + trader::{NetworkTrader, NetworkTraderContext}, visualizer::{NetworkVisualizer, NetworkVisualizerExt}, }; @@ -133,19 +134,22 @@ where NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL_INIT } self::sealed::NetworkVirtualMachineState::Ready - | self::sealed::NetworkVirtualMachineState::Empty => match self.restart_policy() { - NetworkVirtualMachineRestartPolicy::Always => { - NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL - } - NetworkVirtualMachineRestartPolicy::Interval { interval } => interval, - NetworkVirtualMachineRestartPolicy::Manually => { - self.visualizer().wait_to_next().await?; - continue; - } - NetworkVirtualMachineRestartPolicy::Never => { - NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL_INIT + | self::sealed::NetworkVirtualMachineState::Empty + | self::sealed::NetworkVirtualMachineState::Trading => { + match self.restart_policy() { + NetworkVirtualMachineRestartPolicy::Always => { + NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL + } + NetworkVirtualMachineRestartPolicy::Interval { interval } => interval, + NetworkVirtualMachineRestartPolicy::Manually => { + self.visualizer().wait_to_next().await?; + continue; + } + NetworkVirtualMachineRestartPolicy::Never => { + NetworkVirtualMachineRestartPolicy::DEFAULT_INTERVAL_INIT + } } - }, + } self::sealed::NetworkVirtualMachineState::Completed => { match self.restart_policy() { NetworkVirtualMachineRestartPolicy::Always => { @@ -188,13 +192,20 @@ where .await } - #[instrument(level = Level::INFO, skip(self))] + #[instrument(level = Level::INFO, skip(self, state))] async fn step_with_custom_problem( &self, state: self::sealed::NetworkVirtualMachineState, problem: VirtualProblem, ) -> Result { - // Step 1. Pull & Convert graphs + // Step 1. Check whether the problem is locked + let scope = &problem.scope; + if self.trader().is_locked(&problem).await? { + info!("The problem is locked by the market: {scope}"); + return Ok(self::sealed::NetworkVirtualMachineState::Trading); + } + + // Step 2. Pull & Convert graphs let NetworkDependencyPipeline { connectors, functions, @@ -222,10 +233,23 @@ where None => return Ok(self::sealed::NetworkVirtualMachineState::Empty), }; - // Step 2. Solve edge flows + // Step 3. Solve edge flows let data = self.solver().solve(data, &problem.spec).await?; - // Step 3. Apply edges to real-world (or simulator) + // Step 4. Register to the market if no feasible functions are found + if matches!(&data.edges, LazyFrame::Empty) { + info!("Registering the problem to the market: {scope}"); + let ctx = NetworkTraderContext { + functions, + graph: data, + problem, + static_edges, + }; + self.trader().register(ctx).await?; + return Ok(self::sealed::NetworkVirtualMachineState::Trading); + } + + // Step 5. Apply edges to real-world (or simulator) let runner_ctx = NetworkRunnerContext { connectors, functions, @@ -240,7 +264,7 @@ where }; self.runner().execute(runner_ctx).await?; - // Step 4. Visualize the outputs + // Step 6. Visualize the outputs let graph = Graph { connector, data, @@ -368,6 +392,7 @@ mod sealed { Pending, Ready, Empty, + Trading, #[default] Completed, } @@ -385,12 +410,17 @@ where Self: Send + Sync, { type DependencySolver: NetworkComponent + NetworkDependencySolver; - type ResourceDB: 'static + Send + Clone + NetworkComponent + NetworkResourceCollectionDB; + type ResourceDB: 'static + + Send + + Clone + + NetworkComponent + + NetworkResourceCollectionDB; type GraphDB: 'static + Send + Clone + NetworkComponent + NetworkGraphDB; type Runner: NetworkComponent + for<'a> NetworkRunner<::GraphDB, LazyFrame>; type Solver: NetworkComponent + NetworkSolver, Output = GraphData>; + type Trader: NetworkComponent + NetworkTrader; type Visualizer: NetworkComponent + NetworkVisualizer; fn dependency_solver(&self) -> &::DependencySolver; @@ -403,6 +433,8 @@ where fn solver(&self) -> &::Solver; + fn trader(&self) -> &::Trader; + fn visualizer(&self) -> &::Visualizer; fn fallback_policy(&self) -> NetworkFallbackPolicy { @@ -426,6 +458,7 @@ where type ResourceDB = ::ResourceDB; type Runner = ::Runner; type Solver = ::Solver; + type Trader = ::Trader; type Visualizer = ::Visualizer; fn dependency_solver(&self) -> &::DependencySolver { @@ -448,6 +481,10 @@ where ::solver(&**self) } + fn trader(&self) -> &::Trader { + ::trader(&**self) + } + fn visualizer(&self) -> &::Visualizer { ::visualizer(&**self) } diff --git a/crates/kubegraph/gateway/Cargo.toml b/crates/kubegraph/gateway/Cargo.toml index e367f87b..926f6e1d 100644 --- a/crates/kubegraph/gateway/Cargo.toml +++ b/crates/kubegraph/gateway/Cargo.toml @@ -31,6 +31,7 @@ full = [ "function-full", "graph-full", "solver-full", + "trader-full", "vm-full", "visualizer-full", ] @@ -74,6 +75,10 @@ graph-memory = ["kubegraph-vm-local?/graph-memory"] solver-full = ["solver-ortools"] solver-ortools = ["kubegraph-vm-local?/solver-ortools"] +# Configure Traders +trader-full = ["trader-default"] +trader-default = ["kubegraph-vm-local?/trader-default"] + # Configure Virtual Machines vm-full = ["vm-local"] vm-local = ["kubegraph-vm-local"] diff --git a/crates/kubegraph/market/client/Cargo.toml b/crates/kubegraph/market/client/Cargo.toml index 4422524f..aa14cf79 100644 --- a/crates/kubegraph/market/client/Cargo.toml +++ b/crates/kubegraph/market/client/Cargo.toml @@ -19,6 +19,14 @@ workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["full"] +full = ["df-full"] + +# DataFrame +df-full = ["df-polars"] +df-polars = ["kubegraph-api/df-polars"] + [dependencies] ark-core = { path = "../../../ark/core", features = ["signal"] } ark-core-k8s = { path = "../../../ark/core/k8s", features = ["data"] } @@ -30,5 +38,6 @@ async-trait = { workspace = true } clap = { workspace = true } futures = { workspace = true } reqwest = { workspace = true } +schemars = { workspace = true } serde = { workspace = true } tracing = { workspace = true } diff --git a/crates/kubegraph/market/client/src/lib.rs b/crates/kubegraph/market/client/src/lib.rs index 9df7f860..ed0614f7 100644 --- a/crates/kubegraph/market/client/src/lib.rs +++ b/crates/kubegraph/market/client/src/lib.rs @@ -18,6 +18,7 @@ use kubegraph_api::{ }, }; use reqwest::Method; +use schemars::JsonSchema; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tracing::{instrument, Level}; @@ -54,6 +55,17 @@ impl MarketClient { self.execute(request).await } + #[instrument(level = Level::INFO, skip(self, spec))] + pub async fn find_product(&self, spec: &ProductSpec) -> Result<::Id> { + let request = Request { + method: Method::POST, + rel_url: "prod", + page: None, + payload: Some(spec), + }; + self.execute(request).await + } + pub fn list_product_ids( &self, ) -> impl '_ + Stream::Id>> { @@ -76,7 +88,7 @@ impl MarketClient { self.execute(request).await } - #[instrument(level = Level::INFO, skip(self))] + #[instrument(level = Level::INFO, skip(self, spec))] pub async fn insert_product(&self, spec: &ProductSpec) -> Result<()> { let request = Request { method: Method::PUT, @@ -180,7 +192,7 @@ impl MarketClient { self.execute(request).await } - #[instrument(level = Level::INFO, skip(self))] + #[instrument(level = Level::INFO, skip(self, spec))] pub async fn insert_pub( &self, prod_id: ::Id, @@ -251,7 +263,7 @@ impl MarketClient { self.execute(request).await } - #[instrument(level = Level::INFO, skip(self))] + #[instrument(level = Level::INFO, skip(self, spec))] pub async fn insert_sub( &self, prod_id: ::Id, @@ -333,7 +345,7 @@ struct Request<'a, T> { payload: Option<&'a T>, } -#[derive(Clone, Debug, Serialize, Deserialize, Parser)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema, Parser)] #[clap(rename_all = "kebab-case")] #[serde(rename_all = "camelCase")] pub struct MarketClientArgs { @@ -348,6 +360,14 @@ pub struct MarketClientArgs { pub endpoint: Url, } +impl Default for MarketClientArgs { + fn default() -> Self { + Self { + endpoint: Self::default_endpoint(), + } + } +} + impl MarketClientArgs { const fn default_endpoint_str() -> &'static str { "http://market.kubegraph.svc" diff --git a/crates/kubegraph/market/entity/src/price.rs b/crates/kubegraph/market/entity/src/price.rs index f3e9bcef..f2934442 100644 --- a/crates/kubegraph/market/entity/src/price.rs +++ b/crates/kubegraph/market/entity/src/price.rs @@ -1,6 +1,9 @@ use anyhow::{Error, Result}; use chrono::NaiveDateTime; -use kubegraph_api::market::{product::ProductSpec, r#pub::PubSpec, sub::SubSpec, BaseModel}; +use kubegraph_api::{ + function::webhook::NetworkFunctionWebhookSpec, + market::{product::ProductSpec, r#pub::PubSpec, sub::SubSpec, BaseModel}, +}; use sea_orm::{ ActiveModelBehavior, ActiveValue, DeriveActiveEnum, DeriveEntityModel, DerivePrimaryKey, DeriveRelation, EntityTrait, EnumIter, PrimaryKeyTrait, @@ -119,7 +122,7 @@ impl ActiveModel { function, } = spec; - let spec = ::serde_json::to_value(function)?; + let spec = to_spec(function)?; Ok(Self { id: ActiveValue::Set(pub_id), @@ -142,7 +145,7 @@ impl ActiveModel { function, } = spec; - let spec = ::serde_json::to_value(function)?; + let spec = to_spec(function)?; Ok(Self { id: ActiveValue::Set(sub_id), @@ -159,6 +162,10 @@ impl ActiveModel { } } +pub fn to_spec(function: NetworkFunctionWebhookSpec) -> Result { + ::serde_json::to_value(function).map_err(Into::into) +} + #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { #[sea_orm( diff --git a/crates/kubegraph/market/entity/src/product.rs b/crates/kubegraph/market/entity/src/product.rs index b4a192fd..a3d32eb0 100644 --- a/crates/kubegraph/market/entity/src/product.rs +++ b/crates/kubegraph/market/entity/src/product.rs @@ -1,6 +1,9 @@ use anyhow::{Error, Result}; use chrono::NaiveDateTime; -use kubegraph_api::market::{product::ProductSpec, BaseModel}; +use kubegraph_api::{ + market::{product::ProductSpec, BaseModel}, + problem::ProblemSpec, +}; use sea_orm::{ ActiveModelBehavior, ActiveValue, DeriveEntityModel, DerivePrimaryKey, DeriveRelation, EntityTrait, EnumIter, PrimaryKeyTrait, @@ -47,7 +50,7 @@ impl ActiveModel { pub fn from_spec(spec: ProductSpec, id: Id) -> Result { let ProductSpec { problem } = spec; - let spec = ::serde_json::to_value(problem)?; + let spec = to_problem_spec(problem)?; Ok(Self { id: ActiveValue::Set(id), @@ -55,6 +58,22 @@ impl ActiveModel { spec: ActiveValue::Set(spec), }) } + + pub const fn from_spec_native(spec: Value, id: Id) -> Self { + Self { + id: ActiveValue::Set(id), + created_at: ActiveValue::NotSet, + spec: ActiveValue::Set(spec), + } + } +} + +pub fn to_spec(spec: ProductSpec) -> Result { + to_problem_spec(spec.problem) +} + +pub fn to_problem_spec(problem: ProblemSpec) -> Result { + ::serde_json::to_value(problem).map_err(Into::into) } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/kubegraph/market/gateway/Cargo.toml b/crates/kubegraph/market/gateway/Cargo.toml index bd299d35..6c849834 100644 --- a/crates/kubegraph/market/gateway/Cargo.toml +++ b/crates/kubegraph/market/gateway/Cargo.toml @@ -27,6 +27,7 @@ full = [ "function-full", "graph-full", "solver-full", + "trader-full", "vm-full", ] @@ -69,6 +70,10 @@ graph-memory = ["kubegraph-vm-local?/graph-memory"] solver-full = ["solver-ortools"] solver-ortools = ["kubegraph-vm-local?/solver-ortools"] +# Configure Traders +trader-full = ["trader-default"] +trader-default = ["kubegraph-vm-local?/trader-default"] + # Configure Virtual Machines vm-full = ["vm-local"] vm-local = ["kubegraph-vm-local"] diff --git a/crates/kubegraph/market/gateway/src/actix.rs b/crates/kubegraph/market/gateway/src/actix.rs index b5e2fa00..b6e75ad0 100644 --- a/crates/kubegraph/market/gateway/src/actix.rs +++ b/crates/kubegraph/market/gateway/src/actix.rs @@ -42,6 +42,7 @@ async fn try_loop_forever(db: &Database) -> Result<()> { .service(crate::routes::product::list) .service(crate::routes::product::list_price) .service(crate::routes::product::get) + .service(crate::routes::product::post) .service(crate::routes::product::post_trade) .service(crate::routes::product::put) .service(crate::routes::product::delete) diff --git a/crates/kubegraph/market/gateway/src/db.rs b/crates/kubegraph/market/gateway/src/db.rs index 64218b1b..6df94a86 100644 --- a/crates/kubegraph/market/gateway/src/db.rs +++ b/crates/kubegraph/market/gateway/src/db.rs @@ -39,7 +39,7 @@ pub struct Database { impl NetworkComponent for Database { type Args = DatabaseArgs; - #[instrument(level = Level::INFO, skip(args, signal))] + #[instrument(level = Level::INFO, skip(signal))] async fn try_new( args: ::Args, signal: &FunctionSignal, @@ -112,6 +112,51 @@ impl Database { .map_err(Into::into) } + #[instrument(level = Level::INFO, skip(self, spec))] + pub async fn find_product(&self, spec: ProductSpec) -> Result<::Id> { + self.connection + .transaction::<_, _, DbErr>(|txn| { + Box::pin(async move { + let col_id = entity::product::Column::Id; + let col_spec = entity::product::Column::Spec; + + let spec = match entity::product::to_spec(spec) { + Ok(spec) => spec, + Err(error) => return Ok(Err(error.into())), + }; + match entity::product::Entity::find() + .select_only() + .column(col_id) + .filter(col_spec.eq(spec.clone())) + .into_tuple() + .one(txn) + .await? + { + Some(item) => Ok(Ok(item)), + None => { + let prod_id = ::Id::new_v4(); + let model = + entity::product::ActiveModel::from_spec_native(spec, prod_id); + let dsl = entity::product::Entity::insert(model); + + dsl.exec_without_returning(txn).await?; + Ok(Ok(prod_id)) + } + } + }) + }) + .await + .map_err(|error| match error { + ::sea_orm::TransactionError::Connection(error) => { + anyhow!("failed to connect to DB while finding a product: {error}") + } + ::sea_orm::TransactionError::Transaction(error) => { + anyhow!("failed to execute transaction on DB while finding a product: {error}") + } + }) + .and_then(identity) + } + #[instrument(level = Level::INFO, skip(self, spec))] pub async fn insert_product( &self, diff --git a/crates/kubegraph/market/gateway/src/routes/product.rs b/crates/kubegraph/market/gateway/src/routes/product.rs index 36a35f4c..e3cc5311 100644 --- a/crates/kubegraph/market/gateway/src/routes/product.rs +++ b/crates/kubegraph/market/gateway/src/routes/product.rs @@ -35,6 +35,12 @@ pub async fn get(db: Data, path: Path<::Id>) HttpResponse::Ok().json(Result::from(db.get_product(prod_id).await)) } +#[instrument(level = Level::INFO, skip(db, spec))] +#[post("/prod")] +pub async fn post(db: Data, spec: Json) -> impl Responder { + HttpResponse::Ok().json(Result::from(db.find_product(spec.0).await)) +} + #[instrument(level = Level::INFO, skip(db))] #[post("/prod/{prod_id}/trade")] pub async fn post_trade( diff --git a/crates/kubegraph/trader/Cargo.toml b/crates/kubegraph/trader/Cargo.toml index c6244673..e3589d2d 100644 --- a/crates/kubegraph/trader/Cargo.toml +++ b/crates/kubegraph/trader/Cargo.toml @@ -19,13 +19,24 @@ workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["full"] +full = ["df-full"] + +# DataFrame +df-full = ["df-polars"] +df-polars = ["kubegraph-api/df-polars", "kubegraph-market-client/df-polars"] + [dependencies] +ark-core = { path = "../../ark/core", features = ["signal"] } kubegraph-api = { path = "../api", default-features = false } +kubegraph-market-client = { path = "../market/client", default-features = false } -actix-web = { workspace = true } -actix-web-opentelemetry = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } -futures = { workspace = true } -tokio = { workspace = true, features = ["full"] } +clap = { workspace = true } +schemars = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } +uuid = { workspace = true } diff --git a/crates/kubegraph/trader/src/actix.rs b/crates/kubegraph/trader/src/actix.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/kubegraph/trader/src/db.rs b/crates/kubegraph/trader/src/db.rs new file mode 100644 index 00000000..587fbf3c --- /dev/null +++ b/crates/kubegraph/trader/src/db.rs @@ -0,0 +1,56 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use anyhow::Result; +use ark_core::signal::FunctionSignal; +use async_trait::async_trait; +use clap::Parser; +use kubegraph_api::{component::NetworkComponent, graph::GraphScope}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use tracing::{instrument, Level}; + +use crate::session::NetworkTraderSession; + +#[derive(Clone)] +pub struct NetworkTraderDB { + data: Arc>>, +} + +#[async_trait] +impl NetworkComponent for NetworkTraderDB { + type Args = NetworkTraderDBArgs; + + async fn try_new(args: ::Args, _: &FunctionSignal) -> Result { + let NetworkTraderDBArgs {} = args; + + Ok(Self { + data: Arc::default(), + }) + } +} + +impl NetworkTraderDB { + #[instrument(level = Level::INFO, skip(self, scope))] + pub(crate) async fn is_locked(&self, scope: &GraphScope) -> Result { + Ok(self.data.read().await.contains_key(scope)) + } + + #[instrument(level = Level::INFO, skip(self, session))] + pub(crate) async fn register(&self, session: NetworkTraderSession) -> Result<()> { + let scope = session.ctx.problem.scope.clone(); + self.data.write().await.insert(scope, session); + Ok(()) + } + + #[instrument(level = Level::INFO, skip(self, scope))] + pub(crate) async fn unregister(&self, scope: &GraphScope) -> Result<()> { + self.data.write().await.remove(scope); + Ok(()) + } +} + +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema, Parser)] +#[clap(rename_all = "kebab-case")] +#[serde(rename_all = "camelCase")] +pub struct NetworkTraderDBArgs {} diff --git a/crates/kubegraph/trader/src/lib.rs b/crates/kubegraph/trader/src/lib.rs index b93cf3ff..a4781dba 100644 --- a/crates/kubegraph/trader/src/lib.rs +++ b/crates/kubegraph/trader/src/lib.rs @@ -1,14 +1,148 @@ -pub fn add(left: u64, right: u64) -> u64 { - left + right +mod actix; +mod db; +mod session; + +use anyhow::Result; +use ark_core::signal::FunctionSignal; +use async_trait::async_trait; +use clap::Parser; +use kubegraph_api::{ + component::NetworkComponent, + frame::LazyFrame, + function::webhook::NetworkFunctionWebhookSpec, + market::{product::ProductSpec, sub::SubSpec, BaseModel}, + problem::VirtualProblem, + trader::NetworkTraderContext, +}; +use kubegraph_market_client::{MarketClient, MarketClientArgs}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use tracing::{info, instrument, Level}; + +#[derive(Clone)] +pub struct NetworkTrader { + client: MarketClient, + db: crate::db::NetworkTraderDB, +} + +#[async_trait] +impl NetworkComponent for NetworkTrader { + type Args = NetworkTraderArgs; + + #[instrument(level = Level::INFO, skip(signal))] + async fn try_new( + args: ::Args, + signal: &FunctionSignal, + ) -> Result { + let NetworkTraderArgs { client, db } = args; + + Ok(Self { + client: { + info!("Initializing market trader..."); + MarketClient::try_new(client, signal).await? + }, + db: crate::db::NetworkTraderDB::try_new(db, signal).await?, + }) + } } -#[cfg(test)] -mod tests { - use super::*; +#[async_trait] +impl ::kubegraph_api::trader::NetworkTrader for NetworkTrader { + #[instrument(level = Level::INFO, skip(self, problem))] + async fn is_locked(&self, problem: &VirtualProblem) -> Result { + self.db.is_locked(&problem.scope).await + } - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); + #[instrument(level = Level::INFO, skip(self, ctx))] + async fn register(&self, ctx: NetworkTraderContext) -> Result<()> { + let mut state = NetworkTraderState::default(); + match self.try_register(&mut state, ctx).await { + Ok(()) => Ok(()), + Err(error) => self + .rollback_register(state) + .await + .map_err(|error_rollback| error.context(error_rollback)), + } } } + +impl NetworkTrader { + #[instrument(level = Level::INFO, skip(self, state, ctx))] + async fn try_register( + &self, + state: &mut NetworkTraderState, + ctx: NetworkTraderContext, + ) -> Result<()> { + // Step 1. Create a problem + let prod_id = { + let spec = ProductSpec { + problem: ctx.problem.spec.clone(), + }; + self.client.find_product(&spec).await? + }; + state.prod_id.replace(prod_id); + + // Step 2. Create a webhook + let function: NetworkFunctionWebhookSpec = todo!(); + state.function.replace(function.clone()); + + // Step 3. Estimate the cost + let cost = todo!(); + + // Step 4. Create a subscriber + { + let spec = SubSpec { + cost, + count: 1, + function, + }; + self.client.insert_sub(prod_id, &spec).await? + } + + // Step 5. Store it to the DB + let session = crate::session::NetworkTraderSession { ctx }; + self.db.register(session).await + } + + #[instrument(level = Level::INFO, skip(self, state))] + async fn rollback_register(&self, state: NetworkTraderState) -> Result<()> { + let NetworkTraderState { + function, + prod_id: _, + sub_id, + } = state; + + // Step -4. Rollback creating the subscriber + todo!(); + + // Step -3. Rollback estimating the cost + // NOTE: nothing to do + + // Step -2. Rollback creating the webhook + todo!(); + + // Step -1. Rollback creating the problem + // NOTE: nothing to do + + Ok(()) + } +} + +#[derive(Default)] +struct NetworkTraderState { + function: Option, + prod_id: Option<::Id>, + sub_id: Option<::Id>, +} + +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema, Parser)] +#[clap(rename_all = "kebab-case")] +#[serde(rename_all = "camelCase")] +pub struct NetworkTraderArgs { + #[command(flatten)] + #[serde(default)] + pub client: MarketClientArgs, + + #[command(flatten)] + pub db: ::Args, +} diff --git a/crates/kubegraph/trader/src/session.rs b/crates/kubegraph/trader/src/session.rs new file mode 100644 index 00000000..3a414050 --- /dev/null +++ b/crates/kubegraph/trader/src/session.rs @@ -0,0 +1,6 @@ +use kubegraph_api::{frame::LazyFrame, trader::NetworkTraderContext}; + +#[derive(Clone)] +pub(crate) struct NetworkTraderSession { + pub(crate) ctx: NetworkTraderContext, +} diff --git a/crates/kubegraph/vm/local/Cargo.toml b/crates/kubegraph/vm/local/Cargo.toml index aeb38fe0..d19a40e3 100644 --- a/crates/kubegraph/vm/local/Cargo.toml +++ b/crates/kubegraph/vm/local/Cargo.toml @@ -27,6 +27,7 @@ full = [ "function-full", "graph-full", "solver-full", + "trader-full", "visualizer-full", ] @@ -58,6 +59,7 @@ df-polars = [ "kubegraph-dependency-solver/df-polars", "kubegraph-runner/df-polars", "kubegraph-solver-ortools?/df-polars", + "kubegraph-trader/df-polars", "kubegraph-visualizer-egui?/df-polars", ] @@ -83,12 +85,17 @@ graph-memory = ["kubegraph-graph-memory"] solver-full = ["solver-ortools"] solver-ortools = ["kubegraph-solver-ortools"] +# Configure Traders +trader-full = ["trader-default"] +trader-default = ["kubegraph-trader"] + # Configure Visualizers visualizer-auto = ["visualizer-egui"] visualizer-full = ["visualizer-egui"] visualizer-egui = ["kubegraph-visualizer-egui"] [dependencies] +ark-core = { path = "../../../ark/core", features = ["signal"] } kubegraph-api = { path = "../../api", default-features = false } kubegraph-connector-fake = { path = "../../connector/fake", optional = true, default-features = false } kubegraph-connector-local = { path = "../../connector/local", optional = true, default-features = false } @@ -98,11 +105,10 @@ kubegraph-graph-local = { path = "../../graph/local", optional = true, default-f kubegraph-graph-memory = { path = "../../graph/memory", optional = true, default-features = false } kubegraph-runner = { path = "../../runner", default-features = false } kubegraph-solver-ortools = { path = "../../solver/ortools", optional = true, default-features = false } -kubegraph-trader = { path = "../../trader", default-features = false } +kubegraph-trader = { path = "../../trader", optional = true, default-features = false } kubegraph-visualizer-egui = { path = "../../visualizer/egui", optional = true, default-features = false } anyhow = { workspace = true } -ark-core = { path = "../../../ark/core", features = ["signal"] } async-trait = { workspace = true } clap = { workspace = true } futures = { workspace = true } diff --git a/crates/kubegraph/vm/local/src/args.rs b/crates/kubegraph/vm/local/src/args.rs index 6980e21c..edfc5192 100644 --- a/crates/kubegraph/vm/local/src/args.rs +++ b/crates/kubegraph/vm/local/src/args.rs @@ -30,6 +30,10 @@ pub struct NetworkArgs { #[serde(default)] pub solver: <::Solver as NetworkComponent>::Args, + #[command(flatten)] + #[serde(default)] + pub trader: <::Trader as NetworkComponent>::Args, + #[command(flatten)] #[serde(default)] pub visualizer: <::Visualizer as NetworkComponent>::Args, diff --git a/crates/kubegraph/vm/local/src/graph.rs b/crates/kubegraph/vm/local/src/graph.rs index 28ef4215..b76be31b 100644 --- a/crates/kubegraph/vm/local/src/graph.rs +++ b/crates/kubegraph/vm/local/src/graph.rs @@ -86,7 +86,7 @@ pub enum NetworkGraphDB { impl NetworkComponent for NetworkGraphDB { type Args = NetworkGraphDBArgs; - #[instrument(level = Level::INFO)] + #[instrument(level = Level::INFO, skip(signal))] async fn try_new( args: ::Args, signal: &FunctionSignal, diff --git a/crates/kubegraph/vm/local/src/lib.rs b/crates/kubegraph/vm/local/src/lib.rs index b68b8b5d..7865086e 100644 --- a/crates/kubegraph/vm/local/src/lib.rs +++ b/crates/kubegraph/vm/local/src/lib.rs @@ -5,6 +5,7 @@ mod reloader; mod resource; mod runner; mod solver; +mod trader; mod visualizer; use std::sync::Arc; @@ -29,6 +30,7 @@ pub struct NetworkVirtualMachine { resource_worker: Arc>>, runner: self::runner::NetworkRunner, solver: self::solver::NetworkSolver, + trader: self::trader::NetworkTrader, visualizer: self::visualizer::NetworkVisualizer, vm_runner: Arc>>, } @@ -37,7 +39,7 @@ pub struct NetworkVirtualMachine { impl NetworkComponent for NetworkVirtualMachine { type Args = self::args::NetworkArgs; - #[instrument(level = Level::INFO)] + #[instrument(level = Level::INFO, skip(args, signal))] async fn try_new( args: ::Args, signal: &FunctionSignal, @@ -49,6 +51,7 @@ impl NetworkComponent for NetworkVirtualMachine { resource_db, runner, solver, + trader, visualizer, vm, } = args; @@ -64,6 +67,7 @@ impl NetworkComponent for NetworkVirtualMachine { resource_worker: Arc::new(Mutex::new(None)), runner: self::runner::NetworkRunner::try_new(runner, signal).await?, solver: self::solver::NetworkSolver::try_new(solver, signal).await?, + trader: self::trader::NetworkTrader::try_new(trader, signal).await?, visualizer: self::visualizer::NetworkVisualizer::try_new(visualizer, signal).await?, vm_runner: Arc::new(Mutex::new(None)), }; @@ -88,6 +92,7 @@ impl ::kubegraph_api::vm::NetworkVirtualMachine for NetworkVirtualMachine { type GraphDB = self::graph::NetworkGraphDB; type Runner = self::runner::NetworkRunner; type Solver = self::solver::NetworkSolver; + type Trader = self::trader::NetworkTrader; type Visualizer = self::visualizer::NetworkVisualizer; fn dependency_solver( @@ -112,6 +117,10 @@ impl ::kubegraph_api::vm::NetworkVirtualMachine for NetworkVirtualMachine { &self.solver } + fn trader(&self) -> &::Trader { + &self.trader + } + fn visualizer(&self) -> &::Visualizer { &self.visualizer } diff --git a/crates/kubegraph/vm/local/src/reloader.rs b/crates/kubegraph/vm/local/src/reloader.rs index 554e6988..9584e645 100644 --- a/crates/kubegraph/vm/local/src/reloader.rs +++ b/crates/kubegraph/vm/local/src/reloader.rs @@ -99,8 +99,8 @@ where K: 'static + Send + Clone + fmt::Debug + DeserializeOwned + Resource + NetworkResource, ::DynamicType: Default, { - let name = ::crd_name(); - info!("Starting {name} reloader..."); + let desc = ::type_name(); + info!("Starting {desc} reloader..."); let kube = resource_db.kube(); let default_namespace = kube.default_namespace().to_string(); diff --git a/crates/kubegraph/vm/local/src/solver.rs b/crates/kubegraph/vm/local/src/solver.rs index 69d06554..4e578c21 100644 --- a/crates/kubegraph/vm/local/src/solver.rs +++ b/crates/kubegraph/vm/local/src/solver.rs @@ -82,7 +82,7 @@ pub enum NetworkSolver { impl NetworkComponent for NetworkSolver { type Args = NetworkSolverArgs; - #[instrument(level = Level::INFO)] + #[instrument(level = Level::INFO, skip(signal))] async fn try_new( args: ::Args, signal: &FunctionSignal, diff --git a/crates/kubegraph/vm/local/src/trader.rs b/crates/kubegraph/vm/local/src/trader.rs new file mode 100644 index 00000000..49faf609 --- /dev/null +++ b/crates/kubegraph/vm/local/src/trader.rs @@ -0,0 +1 @@ +pub type NetworkTrader = ::kubegraph_trader::NetworkTrader; diff --git a/crates/kubegraph/vm/local/src/visualizer.rs b/crates/kubegraph/vm/local/src/visualizer.rs index aee9513c..ce9383c1 100644 --- a/crates/kubegraph/vm/local/src/visualizer.rs +++ b/crates/kubegraph/vm/local/src/visualizer.rs @@ -82,7 +82,7 @@ pub enum NetworkVisualizer { impl NetworkComponent for NetworkVisualizer { type Args = NetworkVisualizerArgs; - #[instrument(level = Level::INFO)] + #[instrument(level = Level::INFO, skip(signal))] async fn try_new( args: ::Args, signal: &FunctionSignal, diff --git a/templates/kubegraph/kubegraph-market-function-blackhole.yaml b/templates/kubegraph/kubegraph-market-function-blackhole.yaml new file mode 100644 index 00000000..f5f61b58 --- /dev/null +++ b/templates/kubegraph/kubegraph-market-function-blackhole.yaml @@ -0,0 +1,87 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: market-function-blackhole + namespace: kubegraph + labels: + name: market-function-blackhole + dashService: "true" + serviceType: internal +spec: + replicas: 1 + strategy: + rollingUpdate: + maxUnavailable: 1 + selector: + matchLabels: + name: market-function-blackhole + template: + metadata: + annotations: + instrumentation.opentelemetry.io/inject-sdk: "true" + labels: + name: market-function-blackhole + dashService: "true" + serviceType: internal + spec: + affinity: + nodeAffinity: + # KISS normal control plane nodes should be preferred + preferredDuringSchedulingIgnoredDuringExecution: + # KISS normal control plane nodes should be preferred + - weight: 1 + preference: + matchExpressions: + - key: node-role.kubernetes.io/kiss-ephemeral-control-plane + operator: DoesNotExist + # KISS gateway nodes should be more preferred + - weight: 2 + preference: + matchExpressions: + - key: node-role.kubernetes.io/kiss + operator: In + values: + - Gateway + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node-role.kubernetes.io/kiss + operator: In + values: + - ControlPlane + - Gateway + securityContext: + seccompProfile: + type: RuntimeDefault + containers: + - name: gateway + image: quay.io/ulagbulag/openark:latest + imagePullPolicy: Always + command: + - kubegraph-market-function-blackhole + env: + - name: RUST_LOG + value: INFO + resources: + requests: + cpu: 30m + memory: 20Mi + limits: + cpu: 100m + memory: 100Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: market-function-blackhole + namespace: kubegraph +spec: + type: ClusterIP + selector: + name: market-function-blackhole + ports: + - name: http + protocol: TCP + port: 80 + targetPort: 80 diff --git a/templates/kubegraph/kubegraph-market.yaml b/templates/kubegraph/kubegraph-market.yaml index 8149563f..022082ac 100644 --- a/templates/kubegraph/kubegraph-market.yaml +++ b/templates/kubegraph/kubegraph-market.yaml @@ -61,6 +61,8 @@ spec: command: - kubegraph-market-gateway env: + - name: KUBEGRAPH_MARKET_DB_ENDPOINT + value: sqlite:///tmp/market.db?mode=rwc - name: RUST_LOG value: INFO resources: diff --git a/templates/kubegraph/samples/warehouse-function.yaml b/templates/kubegraph/samples/warehouse-function.yaml new file mode 100644 index 00000000..7ce8fbc3 --- /dev/null +++ b/templates/kubegraph/samples/warehouse-function.yaml @@ -0,0 +1,25 @@ +--- +apiVersion: kubegraph.ulagbulag.io/v1alpha1 +kind: NetworkFunction +metadata: + name: warehouse-annotation + namespace: kubegraph +spec: + annotation: {} + filter: payload >= 0 + script: | + supply = payload; +--- +apiVersion: kubegraph.ulagbulag.io/v1alpha1 +kind: NetworkFunction +metadata: + name: warehouse + namespace: kubegraph +spec: + fake: {} + # webhook: + # endpoint: http://localhost:8888 + filter: src != sink and src.payload > 0 and src.payload > sink.payload + 1 + script: | + capacity = min(50, max(1, (src.payload - sink.payload) / 10)); + unit_cost = 1; diff --git a/templates/kubegraph/samples/warehouse.yaml b/templates/kubegraph/samples/warehouse.yaml index cc92027c..a1774a2f 100644 --- a/templates/kubegraph/samples/warehouse.yaml +++ b/templates/kubegraph/samples/warehouse.yaml @@ -73,28 +73,3 @@ spec: metadata: supply: payload verbose: true ---- -apiVersion: kubegraph.ulagbulag.io/v1alpha1 -kind: NetworkFunction -metadata: - name: warehouse-annotation - namespace: kubegraph -spec: - annotation: {} - filter: payload >= 0 - script: | - supply = payload; ---- -apiVersion: kubegraph.ulagbulag.io/v1alpha1 -kind: NetworkFunction -metadata: - name: warehouse - namespace: kubegraph -spec: - fake: {} - # webhook: - # endpoint: http://localhost:8888 - filter: src != sink and src.payload > 0 and src.payload > sink.payload + 1 - script: | - capacity = min(50, max(1, (src.payload - sink.payload) / 10)); - unit_cost = 1;