Skip to content

Commit

Permalink
chores: rename node name
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 10, 2024
1 parent 135308a commit 7e209c0
Show file tree
Hide file tree
Showing 12 changed files with 44 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
members = [
"crates/dp_runner",
"crates/compute_unit_runner",
"crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer", "nodes/dummy_in", "nodes/dummy_out"
"crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer", "nodes/make_article", "nodes/list_files"
, "nodes/copy_in_place", "crates/nodes_sdk"]

[workspace.package]
Expand Down
12 changes: 6 additions & 6 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@ build-nodes: $(OUTPUT)
cargo build -p jz_writer --release
cp target/release/jz_writer $(OUTPUT)/jz_writer

cargo build -p dummy_in --release
cp target/release/dummy_in $(OUTPUT)/dummy_in
cargo build -p make_article --release
cp target/release/make_article $(OUTPUT)/make_article

cargo build -p dummy_out --release
cp target/release/dummy_out $(OUTPUT)/dummy_out
cargo build -p list_files --release
cp target/release/list_files $(OUTPUT)/list_files

cargo build -p copy_in_place --release
cp target/release/copy_in_place $(OUTPUT)/copy_in_place

docker_nodes: build-nodes
docker build -f ./nodes/jz_reader/dockerfile -t gitdatateam/jz_reader:latest .
docker build -f ./nodes/jz_writer/dockerfile -t gitdatateam/jz_writer:latest .
docker build -f ./nodes/dummy_in/dockerfile -t gitdatateam/dummy_in:latest .
docker build -f ./nodes/dummy_out/dockerfile -t gitdatateam/dummy_out:latest .
docker build -f ./nodes/make_article/dockerfile -t gitdatateam/make_article:latest .
docker build -f ./nodes/list_files/dockerfile -t gitdatateam/list_files:latest .
docker build -f ./nodes/copy_in_place/dockerfile -t gitdatateam/copy_in_place:latest .

################## minikube
Expand Down
2 changes: 1 addition & 1 deletion nodes/dummy_out/Cargo.toml → nodes/list_files/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "dummy_out"
name = "list_files"
version = "0.1.0"
repository.workspace = true
license.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion nodes/dummy_out/dockerfile → nodes/list_files/dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ WORKDIR /app

RUN mkdir -p /app

ADD dist/dummy_out /dummy_out
ADD dist/list_files /list_files
6 changes: 3 additions & 3 deletions nodes/dummy_out/src/main.rs → nodes/list_files/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use walkdir::WalkDir;

#[derive(Debug, Parser)]
#[command(
name = "dummy_out",
name = "list_files",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-action>",
about = "embed in k8s images"
Expand Down Expand Up @@ -60,7 +60,7 @@ async fn main() -> Result<()> {

{
let token = token.clone();
join_set.spawn(async move { write_dummy(token, args).await });
join_set.spawn(async move { print_files(token, args).await });
}

{
Expand All @@ -79,7 +79,7 @@ async fn main() -> Result<()> {
nodes_sdk::monitor_tasks(&mut join_set).await
}

async fn write_dummy(token: CancellationToken, args: Args) -> Result<()> {
async fn print_files(token: CancellationToken, args: Args) -> Result<()> {
let client = ipc::IPCClientImpl::new(args.unix_socket_addr);
let tmp_path = Path::new(&args.tmp_path);
loop {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "dummy_in"
name = "make_article"
version = "0.1.0"
repository.workspace = true
license.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ WORKDIR /app

RUN mkdir -p /app

ADD dist/dummy_in /dummy_in
ADD dist/make_article /make_article
4 changes: 2 additions & 2 deletions nodes/dummy_in/src/main.rs → nodes/make_article/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async fn main() -> Result<()> {

{
let token = token.clone();
join_set.spawn(async move { dummy_in(token, args).await });
join_set.spawn(async move { make_article(token, args).await });
}

{
Expand All @@ -84,7 +84,7 @@ async fn main() -> Result<()> {
nodes_sdk::monitor_tasks(&mut join_set).await
}

async fn dummy_in(token: CancellationToken, args: Args) -> Result<()> {
async fn make_article(token: CancellationToken, args: Args) -> Result<()> {
let client = ipc::IPCClientImpl::new(args.unix_socket_addr);
let tmp_path = Path::new(&args.tmp_path);
let count_file = tmp_path.join("number.txt");
Expand Down
14 changes: 7 additions & 7 deletions script/example_dag.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
"version": "v1",
"dag": [
{
"name": "dummy-in",
"name": "make-article",
"spec": {
"image": "gitdatateam/dummy_in:latest",
"command": "/dummy_in",
"image": "gitdatateam/make_article:latest",
"command": "/make_article",
"args": [
"--log-level=debug",
"--total-count=100"
Expand All @@ -17,7 +17,7 @@
"name": "copy-in-place",
"node_type": "ComputeUnit",
"dependency": [
"dummy-in"
"make-article"
],
"spec": {
"image": "gitdatateam/copy_in_place:latest",
Expand All @@ -32,14 +32,14 @@
}
},
{
"name": "dummy-out",
"name": "list-files",
"node_type": "ComputeUnit",
"dependency": [
"copy-in-place"
],
"spec": {
"image": "gitdatateam/dummy_out:latest",
"command": "/dummy_out",
"image": "gitdatateam/list_files:latest",
"command": "/list_files",
"replicas": 3,
"args": [
"--log-level=debug"
Expand Down
1 change: 1 addition & 0 deletions src/core/job_db_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub trait GraphRepo {

pub trait NodeRepo {
fn insert_node(&self, state: &Node) -> impl std::future::Future<Output = Result<()>> + Send;

fn get_node_by_name(
&self,
name: &str,
Expand Down
1 change: 0 additions & 1 deletion src/dbrepo/job_db_mongo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

use crate::{
core::db::{
DataRecord,
Expand Down
38 changes: 20 additions & 18 deletions src/driver/kube.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ use handlebars::{
};
use k8s_metrics::v1beta1 as metricsv1;
use k8s_openapi::api::{
apps::v1::StatefulSet,
core::v1::{
Namespace,
PersistentVolumeClaim,
Pod,
Service,
},
};
apps::v1::StatefulSet,
core::v1::{
Namespace,
PersistentVolumeClaim,
Pod,
Service,
},
};
use kube::{
api::{
DeleteParams,
Expand Down Expand Up @@ -364,7 +364,7 @@ impl<R> KubePipelineController<R>
where
R: JobDbRepo,
{
fn new(repo: R, client: Client,topo_sort_nodes: Vec<String>) -> Self {
fn new(repo: R, client: Client, topo_sort_nodes: Vec<String>) -> Self {
Self {
_db_repo: repo,
topo_sort_nodes,
Expand Down Expand Up @@ -544,7 +544,8 @@ where
};
repo.insert_global_state(&graph_record).await?;
let topo_sort_nodes = graph.topo_sort_nodes();
let mut pipeline_ctl = KubePipelineController::new(repo.clone(), self.client.clone(),topo_sort_nodes);
let mut pipeline_ctl =
KubePipelineController::new(repo.clone(), self.client.clone(), topo_sort_nodes);
for node in graph.iter() {
if node.spec.command.is_empty() {
return Err(anyhow!("{} dont have command", &node.name));
Expand Down Expand Up @@ -761,7 +762,8 @@ where
let service_api: Api<Service> = Api::namespaced(self.client.clone(), run_id);

let topo_sort_nodes = graph.topo_sort_nodes();
let mut pipeline_ctl = KubePipelineController::new(repo.clone(), self.client.clone(),topo_sort_nodes);
let mut pipeline_ctl =
KubePipelineController::new(repo.clone(), self.client.clone(), topo_sort_nodes);
for node in graph.iter() {
let up_nodes = graph.get_incomming_nodes(&node.name);
// query channel
Expand Down Expand Up @@ -869,17 +871,17 @@ mod tests {
"version": "v1",
"dag": [
{
"name": "dummy-in",
"name": "make-article",
"spec": {
"image": "gitdatateam/dummy_in:latest",
"command":"/dummy_in",
"image": "gitdatateam/make_article:latest",
"command":"/make_article",
"args": ["--log-level=debug", "--total-count=100"]
}
}, {
"name": "copy-in-place",
"node_type": "ComputeUnit",
"dependency": [
"dummy-in"
"make-article"
],
"spec": {
"image": "gitdatateam/copy_in_place:latest",
Expand All @@ -892,14 +894,14 @@ mod tests {
}
},
{
"name": "dummy-out",
"name": "list-files",
"node_type": "ComputeUnit",
"dependency": [
"copy-in-place"
],
"spec": {
"image": "gitdatateam/dummy_out:latest",
"command":"/dummy_out",
"image": "gitdatateam/list_files:latest",
"command":"/list_files",
"replicas": 3,
"args": ["--log-level=debug"]
},
Expand Down

0 comments on commit 7e209c0

Please sign in to comment.