From 2f1abf6006e503ab8f739d7721e5712e56adb5fd Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Thu, 18 Jul 2024 19:36:54 +0800 Subject: [PATCH] feat: add docker build --- .gitignore | 3 +++ Cargo.toml | 2 +- makefile | 26 +++++++++++++++++++ runner/compute_data_runner/src/main.rs | 26 ++++++++++++++++--- runner/compute_data_runner/src/program.rs | 12 ++++----- runner/{dc_runner => dp_runner}/Cargo.toml | 2 +- runner/{dc_runner => dp_runner}/src/main.rs | 2 +- runner/{dc_runner => dp_runner}/src/mprc.rs | 0 .../{dc_runner => dp_runner}/src/program.rs | 0 runner/{dc_runner => dp_runner}/src/unit.rs | 0 script/cd.dockerfile | 9 +++++++ script/dp.dockerfile | 9 +++++++ src/core/cnode.rs | 6 ++--- src/driver/kubetpl/deployment.tpl | 10 ++++++- 14 files changed, 90 insertions(+), 17 deletions(-) create mode 100644 makefile rename runner/{dc_runner => dp_runner}/Cargo.toml (95%) rename runner/{dc_runner => dp_runner}/src/main.rs (99%) rename runner/{dc_runner => dp_runner}/src/mprc.rs (100%) rename runner/{dc_runner => dp_runner}/src/program.rs (100%) rename runner/{dc_runner => dp_runner}/src/unit.rs (100%) create mode 100644 script/cd.dockerfile create mode 100644 script/dp.dockerfile diff --git a/.gitignore b/.gitignore index 6985cf1..3efd79d 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,6 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + + +dist/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 6e1e5f3..3f5d107 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] members = [ - "runner/dc_runner", + "runner/dp_runner", "runner/compute_data_runner", ] diff --git a/makefile b/makefile new file mode 100644 index 0000000..efb11d7 --- /dev/null +++ b/makefile @@ -0,0 +1,26 @@ +OUTPUT = dist + +$(OUTPUT): + mkdir -p $(OUTPUT) + +build-cd: $(OUTPUT) + cargo build -p compute_data_runner --release + cp target/release/compute_data_runner $(OUTPUT)/compute_data_runner + +build-dp: $(OUTPUT) + cargo build -p dp_runner --release + cp target/release/dp_runner $(OUTPUT)/dp_runner + +build: build-cd build-dp + cargo build --release + +docker_cd: build-cd + docker build -f ./script/cd.dockerfile -t jz-action/compute_data_runner:latest . + +docker_dp: build-dp + docker build -f ./script/dp.dockerfile -t jz-action/dp_runner:latest . + +docker: docker_cd docker_dp + +clean: + rm -rf $(OUTPUT) target diff --git a/runner/compute_data_runner/src/main.rs b/runner/compute_data_runner/src/main.rs index 8e6a86e..5572b95 100644 --- a/runner/compute_data_runner/src/main.rs +++ b/runner/compute_data_runner/src/main.rs @@ -18,7 +18,7 @@ use tokio::select; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::mpsc; use tokio::sync::Mutex; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::transport::Server; use tracing::{info, Level}; use unit::DataNodeControllerServer; use unit::UnitDataStream; @@ -34,6 +34,12 @@ struct Args { #[arg(short, long, default_value = "INFO")] log_level: String, + #[arg(short, long, default_value = "/app/tmp")] + tmp_path: String, + + #[arg(short, long, default_value = "")] + unix_socket_addr: String, + #[arg(long, default_value = "[::1]:25431")] host_port: String, } @@ -47,15 +53,16 @@ async fn main() -> Result<()> { .anyhow()?; let addr = args.host_port.parse()?; - let program = BatchProgram::new(PathBuf::new()); + let program = BatchProgram::new(PathBuf::from_str(args.tmp_path.as_str())?); let program_safe = Arc::new(Mutex::new(program)); + let node_controller = DataNodeControllerServer { program: program_safe.clone(), }; let data_stream = UnitDataStream { - program: program_safe, + program: program_safe.clone(), }; let (shutdown_tx, mut shutdown_rx) = mpsc::channel::>(1); @@ -70,13 +77,24 @@ async fn main() -> Result<()> { .await .anyhow() { - let _ = shutdown_tx_arc.send(Err(e)).await; + let _ = shutdown_tx_arc.send(Err(anyhow!("start data controller {e}"))).await; } }); info!("node listening on {}", addr); } + { + //listen unix socket + let unix_socket_addr = args.unix_socket_addr.clone(); + let program = program_safe.clone(); + let shutdown_tx_arc = shutdown_tx.clone(); + let _ = tokio::spawn(async move { + if let Err(e) = ipc::start_ipc_server(unix_socket_addr, program) { + let _ = shutdown_tx_arc.send(Err(anyhow!("start unix socket server {e}"))).await; + } + }); + } { //catch signal let _ = tokio::spawn(async move { diff --git a/runner/compute_data_runner/src/program.rs b/runner/compute_data_runner/src/program.rs index 5406d8b..5ed881e 100644 --- a/runner/compute_data_runner/src/program.rs +++ b/runner/compute_data_runner/src/program.rs @@ -1,5 +1,5 @@ use crate::ipc::{DataResponse, SubmitResultReq}; -use anyhow::{anyhow, Ok, Result}; +use anyhow::{anyhow, Result}; use jz_action::network::common::Empty; use jz_action::network::datatransfer::data_stream_client::DataStreamClient; use jz_action::network::datatransfer::{DataBatchResponse, DataCell}; @@ -90,7 +90,7 @@ impl BatchProgram { } error!("unable read data from stream"); - Ok(()) + anyhow::Ok(()) }); info!("listen data from upstream {}", upstream); @@ -160,11 +160,11 @@ impl BatchProgram { let mut entry_count = 0 ; for entry in WalkDir::new(tmp_out_path) { match entry { - std::result::Result::Ok(entry) => { + Ok(entry) => { if entry.file_type().is_file() { let path = entry.path(); match fs::read(path) { - std::result::Result::Ok(content) => { + Ok(content) => { new_batch.cells.push(DataCell{ size: content.len() as i32, path: path.to_str().unwrap().to_string(), @@ -174,7 +174,7 @@ impl BatchProgram { } Err(e) => error!("read file({:?}) fail {}", path, e), } - + println!("{}", entry.path().display()); } } @@ -182,7 +182,7 @@ impl BatchProgram { } } new_batch.size = entry_count; - + //write another if new_batch.size >0 { if let Err(e) = out_going_tx.send(new_batch) { diff --git a/runner/dc_runner/Cargo.toml b/runner/dp_runner/Cargo.toml similarity index 95% rename from runner/dc_runner/Cargo.toml rename to runner/dp_runner/Cargo.toml index bc8196f..05ba58f 100644 --- a/runner/dc_runner/Cargo.toml +++ b/runner/dp_runner/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "dc_runner" +name = "dp_runner" version = "0.1.0" edition = "2021" diff --git a/runner/dc_runner/src/main.rs b/runner/dp_runner/src/main.rs similarity index 99% rename from runner/dc_runner/src/main.rs rename to runner/dp_runner/src/main.rs index d479a2c..57c88f6 100644 --- a/runner/dc_runner/src/main.rs +++ b/runner/dp_runner/src/main.rs @@ -23,7 +23,7 @@ use unit::UnitDataStream; #[derive(Debug, Parser)] #[command( - name = "dc_runner", + name = "dp_runner", version = "0.0.1", author = "Author Name ", about = "embed in k8s images" diff --git a/runner/dc_runner/src/mprc.rs b/runner/dp_runner/src/mprc.rs similarity index 100% rename from runner/dc_runner/src/mprc.rs rename to runner/dp_runner/src/mprc.rs diff --git a/runner/dc_runner/src/program.rs b/runner/dp_runner/src/program.rs similarity index 100% rename from runner/dc_runner/src/program.rs rename to runner/dp_runner/src/program.rs diff --git a/runner/dc_runner/src/unit.rs b/runner/dp_runner/src/unit.rs similarity index 100% rename from runner/dc_runner/src/unit.rs rename to runner/dp_runner/src/unit.rs diff --git a/script/cd.dockerfile b/script/cd.dockerfile new file mode 100644 index 0000000..899a3a7 --- /dev/null +++ b/script/cd.dockerfile @@ -0,0 +1,9 @@ +FROM ubuntu:22.04 + +RUN mkdir -p /app + +WORKDIR /app + +ADD dist/compute_data_runner /app/compute_data_runner + +CMD ["compute_data_runner"] \ No newline at end of file diff --git a/script/dp.dockerfile b/script/dp.dockerfile new file mode 100644 index 0000000..b7dcd9d --- /dev/null +++ b/script/dp.dockerfile @@ -0,0 +1,9 @@ +FROM ubuntu:22.04 + +RUN mkdir -p /app + +WORKDIR /app + +ADD dist/dp_runner /app/dp_runner + +CMD ["compute_data_runner"] \ No newline at end of file diff --git a/src/core/cnode.rs b/src/core/cnode.rs index 70baf44..a948814 100644 --- a/src/core/cnode.rs +++ b/src/core/cnode.rs @@ -1,9 +1,9 @@ use super::{MachineSpec, GID}; use serde::{Deserialize, Serialize}; -// Channel use to definite data transfer channel +// DataPoint use to definite data transfer channel #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Channel { +pub struct DataPoint { pub spec: MachineSpec, } @@ -20,7 +20,7 @@ where pub spec: MachineSpec, - pub channel: Option, + pub channel: Option, #[serde(bound(deserialize = ""))] pub(crate) dependency: Vec, diff --git a/src/driver/kubetpl/deployment.tpl b/src/driver/kubetpl/deployment.tpl index 0404ef1..c9e8f70 100644 --- a/src/driver/kubetpl/deployment.tpl +++ b/src/driver/kubetpl/deployment.tpl @@ -24,7 +24,7 @@ "spec": { "containers": [ { - "name": "compute-unit", + "name": "compute-data-unit", "image": "{{{spec.image}}}", "command": [ "sleep" ], "args": [ "infinity" ], @@ -35,6 +35,14 @@ ] } ] + "containers": [ + { + "name": "compute-user-unit", + "image": "{{{spec.image}}}", + "command": [ "sleep" ], + "args": [ "infinity" ] + } + ] } } }