diff --git a/crates/compute_unit_runner/src/bin/main.rs b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs similarity index 98% rename from crates/compute_unit_runner/src/bin/main.rs rename to crates/compute_unit_runner/src/bin/compute_unit_runner.rs index a965cad..67e11e3 100644 --- a/crates/compute_unit_runner/src/bin/main.rs +++ b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs @@ -22,7 +22,7 @@ use unit::UnitDataStream; name = "compute_unit_runner", version = "0.0.1", author = "Author Name ", - about = "embed in k8s images" + about = "embed in k8s images. work for process data input and output" )] struct Args { #[arg(short, long, default_value = "INFO")] diff --git a/crates/dp_runner/src/main.rs b/crates/dp_runner/src/main.rs index 70738a4..8168e4c 100644 --- a/crates/dp_runner/src/main.rs +++ b/crates/dp_runner/src/main.rs @@ -24,7 +24,7 @@ use unit::UnitDataStream; name = "dp_runner", version = "0.0.1", author = "Author Name ", - about = "embed in k8s images" + about = "embed in k8s images. make process data input output" )] struct Args { #[arg(short, long, default_value = "INFO")] diff --git a/makefile b/makefile index 3033644..bbe6c56 100644 --- a/makefile +++ b/makefile @@ -5,7 +5,7 @@ $(OUTPUT): ################### build crates build-cd: $(OUTPUT) - cargo build -p compute_unit_runner --release + cargo build -p compute_unit_runner --release --bin compute_unit_runner cp target/release/compute_unit_runner $(OUTPUT)/compute_unit_runner build-dp: $(OUTPUT) diff --git a/nodes/jz_reader/src/main.rs b/nodes/jz_reader/src/main.rs index 4267505..831f830 100644 --- a/nodes/jz_reader/src/main.rs +++ b/nodes/jz_reader/src/main.rs @@ -18,7 +18,7 @@ use tracing::{info, Level}; name = "jz_reader", version = "0.0.1", author = "Author Name ", - about = "embed in k8s images" + about = "embed in k8s images. " )] struct Args { diff --git a/src/core/models.rs b/src/core/models.rs index a50ce45..fe74b4c 100644 --- a/src/core/models.rs +++ b/src/core/models.rs @@ -43,3 +43,9 @@ pub trait NodeRepo { async fn insert_node(&self, state: Node) -> Result<()>; async fn get_node_by_name(&self, name: &str) -> Result; } + +pub trait DbRepo = GraphRepo + NodeRepo + Send + Sync + 'static; + +pub trait DBConfig { + fn connection_string(&self) -> &str; +} diff --git a/src/dag/dag.rs b/src/dag/dag.rs index 08c53f0..61c1687 100644 --- a/src/dag/dag.rs +++ b/src/dag/dag.rs @@ -5,7 +5,7 @@ use anyhow::{anyhow, Ok, Result}; use std::collections::HashMap; pub struct Dag { - name: String, + pub name: String, nodes: HashMap, /// Store dependency relations. rely_graph: Graph, diff --git a/src/dbrepo/mongo.rs b/src/dbrepo/mongo.rs index 891fa10..19531c7 100644 --- a/src/dbrepo/mongo.rs +++ b/src/dbrepo/mongo.rs @@ -1,23 +1,38 @@ use crate::{ - core::models::{Graph, GraphRepo, Node, NodeRepo}, + core::models::{DBConfig, Graph, GraphRepo, Node, NodeRepo}, utils::StdIntoAnyhowResult, }; use anyhow::{anyhow, Result}; use mongodb::{bson::doc, options::IndexOptions, Client, Collection, IndexModel}; -use std::{ops::Deref, sync::Arc}; +use serde::Serialize; +use std::{marker::PhantomData, ops::Deref, sync::Arc}; use tracing::error; const GRAPH_COL_NAME: &'static str = "graph"; const NODE_COL_NAME: &'static str = "node"; +#[derive(Clone)] pub struct MongoRepo { graph_col: Collection, node_col: Collection, } +#[derive(Clone, Serialize)] +pub struct MongoConfig { + pub mongo_url: String, +} + +impl DBConfig for MongoConfig { + fn connection_string(&self) -> &str { + return &self.mongo_url; + } +} + impl MongoRepo { - pub async fn new(mongo_url: &str, db_name: &str) -> Result { - let client = Client::with_uri_str(mongo_url).await?; + pub async fn new(config: DBC, db_name: &str) -> Result + where DBC: DBConfig + { + let client = Client::with_uri_str(config.connection_string()).await?; let database = client.database(db_name); let graph_col: Collection = database.collection(&GRAPH_COL_NAME); let node_col: Collection = database.collection(&NODE_COL_NAME); @@ -41,7 +56,8 @@ impl MongoRepo { } } -impl GraphRepo for MongoRepo { +impl GraphRepo for MongoRepo +{ async fn insert_global_state(&self, state: Graph) -> Result<()> { self.graph_col.insert_one(state).await.map(|_| ()).anyhow() } @@ -55,7 +71,8 @@ impl GraphRepo for MongoRepo { } } -impl NodeRepo for MongoRepo { +impl NodeRepo for MongoRepo +{ async fn insert_node(&self, state: Node) -> Result<()> { self.node_col.insert_one(state).await.map(|_| ()).anyhow() } diff --git a/src/driver/kube.rs b/src/driver/kube.rs index 0ff39cc..aae0f16 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -1,5 +1,8 @@ use super::{ChannelHandler, Driver, PipelineController, UnitHandler}; +use crate::core::models::DBConfig; +use crate::core::{models::DbRepo, ComputeUnit}; use crate::dag::Dag; +use crate::dbrepo::mongo::{MongoConfig, MongoRepo}; use crate::utils::IntoAnyhowResult; use anyhow::{anyhow, Result}; use handlebars::Handlebars; @@ -10,27 +13,40 @@ use kube::{Api, Client}; use serde::Serialize; use std::collections::HashMap; use std::default::Default; +use std::marker::PhantomData; +use std::ptr::NonNull; use std::sync::{Arc, Mutex}; use tokio_retry::strategy::ExponentialBackoff; use tokio_retry::Retry; use tracing::debug; -pub struct KubeChannelHander { +pub struct KubeChannelHander +where + R: DbRepo, +{ pub deployment: Deployment, pub claim: PersistentVolumeClaim, pub service: Service, + pub db_repo: R, } -impl Default for KubeChannelHander { - fn default() -> Self { +impl KubeChannelHander +where + R: DbRepo, +{ + fn new(db_repo: R) -> Self { Self { deployment: Default::default(), claim: PersistentVolumeClaim::default(), service: Default::default(), + db_repo:db_repo } } } -impl ChannelHandler for KubeChannelHander { +impl ChannelHandler for KubeChannelHander +where + R: DbRepo, +{ async fn pause(&mut self) -> Result<()> { todo!() } @@ -44,25 +60,36 @@ impl ChannelHandler for KubeChannelHander { } } -pub struct KubeHandler { +pub struct KubeHandler +where + R: DbRepo, +{ pub deployment: Deployment, pub claim: PersistentVolumeClaim, pub service: Service, - pub channel: Option, + pub db_repo: R, + pub channel: Option>, } -impl Default for KubeHandler { - fn default() -> Self { +impl KubeHandler +where + R: DbRepo, +{ + fn new(repo: R) -> Self { Self { deployment: Default::default(), claim: PersistentVolumeClaim::default(), service: Default::default(), channel: None, + db_repo: repo, } } } -impl UnitHandler for KubeHandler { +impl UnitHandler for KubeHandler +where + R: DbRepo, +{ async fn pause(&mut self) -> Result<()> { todo!() } @@ -76,62 +103,61 @@ impl UnitHandler for KubeHandler { } #[allow(refining_impl_trait)] - async fn channel_handler(&self) -> Result>>> { + async fn channel_handler(&self) -> Result>>>> { todo!() } } -pub struct KubePipelineController { - handlers: HashMap, +pub struct KubePipelineController< R> +where + R: DbRepo, +{ + pub db_repo: R, + handlers: HashMap>, } -impl<'a> Default for KubePipelineController { - fn default() -> Self { +impl KubePipelineController< R> +where + R: DbRepo, +{ + fn new(repo: R) -> Self { Self { + db_repo: repo, handlers: Default::default(), } } } -impl PipelineController for KubePipelineController { - async fn get_node<'a>(&'a self, id: &'a String) -> Result<&'a impl UnitHandler> { +impl PipelineController for KubePipelineController< R> +where + R: DbRepo, +{ + async fn get_node<'b>(&'b self, id: &'b String) -> Result<&'b impl UnitHandler> { self.handlers.get(id).anyhow("id not found") } - async fn get_node_mut<'a>(&'a mut self, id: &'a String) -> Result<&'a mut impl UnitHandler> { + async fn get_node_mut<'b>(&'b mut self, id: &'b String) -> Result<&'b mut impl UnitHandler> { self.handlers.get_mut(id).anyhow("id not found") } } -pub struct KubeDriver<'reg> { +pub struct KubeDriver<'reg, R, DBC> +where + R: DbRepo, + DBC: Clone + Serialize + Send + Sync + DBConfig+'static, +{ reg: Handlebars<'reg>, client: Client, + db_config: DBC, + _phantomData: PhantomData } -impl<'reg> KubeDriver<'reg> { - pub async fn default() -> Result> { - let mut reg = Handlebars::new(); - reg.register_template_string("claim", include_str!("kubetpl/claim.tpl"))?; - - reg.register_template_string("deployment", include_str!("kubetpl/deployment.tpl"))?; - reg.register_template_string("service", include_str!("kubetpl/service.tpl"))?; - reg.register_template_string( - "channel_deployment", - include_str!("kubetpl/channel_deployment.tpl"), - )?; - reg.register_template_string( - "channel_service", - include_str!("kubetpl/channel_service.tpl"), - )?; - - let client = Client::try_default().await?; - Ok(KubeDriver { - reg: reg, - client: client, - }) - } - - pub async fn from_k8s_client(client: Client) -> Result> { +impl<'reg, R, DBC> KubeDriver<'reg, R, DBC> +where + R: DbRepo, + DBC: Clone + Serialize + Send + Sync + DBConfig, +{ + pub async fn new(client: Client, db_config: DBC) -> Result> { let mut reg = Handlebars::new(); reg.register_template_string("claim", include_str!("kubetpl/claim.tpl"))?; @@ -146,8 +172,10 @@ impl<'reg> KubeDriver<'reg> { include_str!("kubetpl/channel_service.tpl"), )?; Ok(KubeDriver { - reg: reg, - client: client, + reg, + client, + db_config, + _phantomData: PhantomData, }) } @@ -196,16 +224,33 @@ struct ClaimRenderParams { name: String, } -impl Driver for KubeDriver<'_> { +#[derive(Serialize)] +struct DataUnitDeploymentRenderParams<'a, DBC> +where + DBC: Sized + Serialize + Send + Sync + DBConfig +'static, +{ + node: &'a ComputeUnit, + log_level: &'a str, + db: DBC, +} + +impl Driver for KubeDriver<'_, R, DBC> +where + R: DbRepo, + DBC: Clone + Serialize + Send + Sync + DBConfig+'static, +{ #[allow(refining_impl_trait)] - async fn deploy(&self, ns: &str, graph: &Dag) -> Result { - Self::ensure_namespace_exit_and_clean(&self.client, ns).await?; + async fn deploy(&self, run_id: &str, graph: &Dag) -> Result> + { + Self::ensure_namespace_exit_and_clean(&self.client, run_id).await?; - let deployment_api: Api = Api::namespaced(self.client.clone(), ns); - let claim_api: Api = Api::namespaced(self.client.clone(), ns); - let service_api: Api = Api::namespaced(self.client.clone(), ns); + let repo = MongoRepo::new(self.db_config.clone(), run_id).await?; + let deployment_api: Api = Api::namespaced(self.client.clone(), run_id); + let claim_api: Api = Api::namespaced(self.client.clone(), run_id); + let service_api: Api = Api::namespaced(self.client.clone(), run_id); - let mut pipeline_ctl = KubePipelineController::default(); + //db = name + hash + retry_number + let mut pipeline_ctl= KubePipelineController::new(repo.clone()); for node in graph.iter() { let claim_string = self.reg.render( "claim", @@ -217,7 +262,13 @@ impl Driver for KubeDriver<'_> { let claim: PersistentVolumeClaim = serde_json::from_str(&claim_string)?; let claim_deployment = claim_api.create(&PostParams::default(), &claim).await?; - let deployment_string = self.reg.render("deployment", node)?; + let data_unit_render_args = DataUnitDeploymentRenderParams { + node, + db: self.db_config.clone(), + log_level: "debug", + }; + + let deployment_string = self.reg.render("deployment", &data_unit_render_args)?; debug!("rendered unit clam string {}", deployment_string); let unit_deployment: Deployment = serde_json::from_str(&deployment_string)?; @@ -244,7 +295,9 @@ impl Driver for KubeDriver<'_> { let claim: PersistentVolumeClaim = serde_json::from_str(&claim_string)?; let claim_deployment = claim_api.create(&PostParams::default(), &claim).await?; - let channel_deployment_string = self.reg.render("channel_deployment", node)?; + let channel_deployment_string = self + .reg + .render("channel_deployment", &data_unit_render_args)?; debug!( "rendered channel deployment string {}", channel_deployment_string @@ -267,6 +320,7 @@ impl Driver for KubeDriver<'_> { claim: claim_deployment, deployment: channel_deployment, service: channel_service, + db_repo: repo.clone(), }) } else { None @@ -277,6 +331,7 @@ impl Driver for KubeDriver<'_> { deployment: unit_deployment, service: unit_service, channel: channel_handler, + db_repo: repo.clone(), }; pipeline_ctl.handlers.insert(node.name.clone(), handler); @@ -285,7 +340,7 @@ impl Driver for KubeDriver<'_> { } #[allow(refining_impl_trait)] - async fn attach(&self, _namespace: &str, _graph: &Dag) -> Result { + async fn attach(&self, _namespace: &str, _graph: &Dag) -> Result> { todo!() } @@ -308,6 +363,8 @@ mod tests { use std::env; use super::*; + use crate::dbrepo::mongo::{MongoConfig, MongoRepo}; + use kube::client; use tracing_subscriber; #[tokio::test] @@ -361,7 +418,13 @@ mod tests { } "#; let dag = Dag::from_json(json_str).unwrap(); - let kube_driver = KubeDriver::default().await.unwrap(); + + let mongo_cfg = MongoConfig { + mongo_url: "".to_string(), + }; + let client = Client::try_default().await.unwrap(); + let mut kube_driver = KubeDriver::new(client, mongo_cfg).await.unwrap(); + kube_driver.deploy("ntest", &dag).await.unwrap(); // kube_driver.clean("ntest").await.unwrap(); } diff --git a/src/driver/kubetpl/channel_deployment.tpl b/src/driver/kubetpl/channel_deployment.tpl index ac73963..1a21593 100644 --- a/src/driver/kubetpl/channel_deployment.tpl +++ b/src/driver/kubetpl/channel_deployment.tpl @@ -30,6 +30,12 @@ "command": [ "/dp_runner" ], + "args":[ + "--node-name={{{.node.name}}}", + "--log-level={{{.log_level}}}", + "--mongo-url={{{.mongo_url}}}", + "--database={{{.database}}}", + ], "ports": [ { "containerPort": 80 diff --git a/src/driver/kubetpl/claim.tpl b/src/driver/kubetpl/claim.tpl index 3f4f141..17de1a0 100644 --- a/src/driver/kubetpl/claim.tpl +++ b/src/driver/kubetpl/claim.tpl @@ -2,7 +2,7 @@ "kind": "PersistentVolumeClaim", "apiVersion": "v1", "metadata": { - "name": "{{name}}" + "name": "{{{name}}}" }, "spec": { "storageClassName": "jz-action-fs", diff --git a/src/driver/kubetpl/deployment.tpl b/src/driver/kubetpl/deployment.tpl index 5c4c77a..12b3705 100644 --- a/src/driver/kubetpl/deployment.tpl +++ b/src/driver/kubetpl/deployment.tpl @@ -2,23 +2,23 @@ "apiVersion": "apps/v1", "kind": "Deployment", "metadata": { - "name": "{{{name}}}-deployment", + "name": "{{{node.name}}}-deployment", "id": "{{{id}}}", "labels": { "exec-type": "compute-unit" } }, "spec": { - "replicas": {{spec.replicas}}, + "replicas": {{{node.spec.replicas}}}, "selector": { "matchLabels": { - "app": "{{{name}}}-pod" + "app": "{{{node.name}}}-pod" } }, "template": { "metadata": { "labels": { - "app": "{{{name}}}-pod" + "app": "{{{node.name}}}-pod" } }, "spec": { @@ -27,7 +27,13 @@ "name": "compute-data-unit", "image": "jz-action/compute_unit_runner:latest", "command": [ - "/compute_unit_runner" + "/compute_unit_runner", + ], + "args":[ + "--node-name={{{.node.name}}}", + "--log-level={{{.log_level}}}", + "--mongo-url={{{.mongo_url}}}", + "--database={{{.database}}}" ], "imagePullPolicy":"IfNotPresent", "ports": [ @@ -48,7 +54,7 @@ }, { "name": "compute-user-unit", - "image": "{{{spec.image}}}", + "image": "{{{node.spec.image}}}", "command": [ "sleep" ], @@ -75,7 +81,7 @@ { "name": "tmpstore", "persistentVolumeClaim": { - "claimName":"{{name}}-node-claim" + "claimName":"{{{name}}}-node-claim" } } ]