Skip to content

Commit

Permalink
feat: add basic driver structure
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Jun 15, 2024
1 parent 25a9727 commit 7991827
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ uuid = {version="1.8.0", features = ["v4","serde"]}
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
kube = { version = "0.91.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.22.0", features = ["latest"] }
handlebars = "5.1.2"

[dev-dependencies]
arrayvec = {version="0.7.4", features= ["serde"]}
50 changes: 23 additions & 27 deletions src/core/cnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,30 @@ mod tests {

#[test]
fn test_from_str() {
xxx();
}

fn xxx() -> ComputeUnit<Uuid> {
let json_str = r#"
{
"id": "5c42b900-a87f-45e3-ba06-c40d94ad5ba2",
"name": "ComputeUnit1",
"dependency": [
],
"spec": {
"cmd": [
"ls"
],
"image": ""
},
"channel": {
"spec": {
"cmd": [
"ls"
],
"image": ""
}
}
}
"#
{
"id": "5c42b900-a87f-45e3-ba06-c40d94ad5ba2",
"name": "ComputeUnit1",
"dependency": [
],
"spec": {
"cmd": [
"ls"
],
"image": ""
},
"channel": {
"spec": {
"cmd": [
"ls"
],
"image": ""
}
}
}
"#
.to_owned();
serde_json::from_str::<ComputeUnit<Uuid>>(&json_str).unwrap()
serde_json::from_str::<ComputeUnit<Uuid>>(&json_str).unwrap();
}
}
4 changes: 3 additions & 1 deletion src/core/id.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize};
use std::default::Default;
use std::hash::Hash;
use std::marker::Send;

pub trait GID = Eq + Clone + Copy + Hash + Send + Sync + Serialize + for<'de> Deserialize<'de>;
pub trait GID =
Eq + Clone + Default + Copy + Hash + Send + Sync + Serialize + for<'de> Deserialize<'de>;
1 change: 1 addition & 0 deletions src/core/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MachineSpec {
pub image: String,
pub replicas: u32,
pub cmd: Vec<String>,
}
9 changes: 9 additions & 0 deletions src/dag/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct Dag<ID>
where
ID: GID,
{
id: ID,
name: String,
nodes: HashMap<ID, ComputeUnit<ID>>,
/// Store dependency relations.
Expand All @@ -17,6 +18,7 @@ where
impl<ID: GID> Dag<ID> {
pub fn new() -> Self {
Dag {
id: ID::default(),
name: String::new(),
nodes: HashMap::new(),
rely_graph: Graph::new(),
Expand Down Expand Up @@ -49,6 +51,12 @@ impl<ID: GID> Dag<ID> {
// from_json build graph from json string
pub fn from_json<'a>(json: &'a str) -> Result<Self> {
let value: serde_json::Value = serde_json::from_str(json)?;

let id: ID = value
.get("id")
.anyhow("id must exit")
.map(|v| serde_json::from_value::<ID>(v.clone()))??;

let dag_name: &str = value
.get("name")
.anyhow("name must exit")
Expand All @@ -72,6 +80,7 @@ impl<ID: GID> Dag<ID> {
let nodes_map: HashMap<ID, ComputeUnit<ID>> =
nodes.into_iter().map(|node| (node.id, node)).collect();
Ok(Dag {
id: id,
name: dag_name.to_string(),
nodes: nodes_map,
rely_graph: rely_graph,
Expand Down
1 change: 0 additions & 1 deletion src/dag/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ mod tests {

use super::*;
use arrayvec::ArrayString;
use serde::{Deserialize, Serialize};

type StringID = ArrayString<5>;

Expand Down
51 changes: 46 additions & 5 deletions src/driver/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::{
future::Future,
sync::{Arc, Mutex},
};
pub trait NHandler {
pub trait UnitHandler<ID>
where
ID: GID,
{
//pause graph running for now
fn pause(&mut self) -> impl Future<Output = Result<()>> + Send;

Expand All @@ -14,10 +17,40 @@ pub trait NHandler {

//stop resource about this graph
fn stop(&mut self) -> impl Future<Output = Result<()>> + Send;

//return a channel handler
fn channel_handler(
&self,
) -> impl Future<Output = Result<Option<Arc<Mutex<impl ChannelHandler<ID>>>>>> + Send;
}

pub trait PipelineController {
fn get_node(&self, name: &str) -> impl Future<Output = Result<impl NHandler>> + Send;
pub trait ChannelHandler<ID>
where
ID: GID,
{
//pause graph running for now
fn pause(&mut self) -> impl Future<Output = Result<()>> + Send;

//restart paused graph
fn restart(&mut self) -> impl Future<Output = Result<()>> + Send;

//stop resource about this graph
fn stop(&mut self) -> impl Future<Output = Result<()>> + Send;
}

pub trait PipelineController<ID>
where
ID: GID,
{
fn get_node<'a>(
&'a self,
id: &'a ID,
) -> impl std::future::Future<Output = Result<&'a impl UnitHandler<ID>>> + Send;

fn get_node_mut<'a>(
&'a mut self,
id: &'a ID,
) -> impl std::future::Future<Output = Result<&'a mut impl UnitHandler<ID>>> + Send;
}

pub trait Driver<ID>
Expand All @@ -27,9 +60,17 @@ where
//deploy graph to cluster
fn deploy(
&self,
namespace: &str,
graph: &Dag<ID>,
) -> impl Future<Output = Result<impl PipelineController<ID>>> + Send;

//attach cluster in cloud with graph
fn attach(
&self,
namespace: &str,
graph: &Dag<ID>,
) -> impl Future<Output = Result<Arc<Mutex<impl PipelineController>>>> + Send;
) -> impl Future<Output = Result<impl PipelineController<ID>>> + Send;

//clean all resource about this graph
fn clean(&self) -> impl Future<Output = Result<()>> + Send;
fn clean(&self, namespace: &str) -> impl Future<Output = Result<()>> + Send;
}
Loading

0 comments on commit 7991827

Please sign in to comment.