Skip to content

Commit

Permalink
feat: add docker build
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Jul 18, 2024
1 parent e1a0285 commit 2f1abf6
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ Cargo.lock

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb


dist/
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
members = [
"runner/dc_runner",
"runner/dp_runner",
"runner/compute_data_runner",
]

Expand Down
26 changes: 26 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
OUTPUT = dist

$(OUTPUT):
mkdir -p $(OUTPUT)

build-cd: $(OUTPUT)
cargo build -p compute_data_runner --release
cp target/release/compute_data_runner $(OUTPUT)/compute_data_runner

build-dp: $(OUTPUT)
cargo build -p dp_runner --release
cp target/release/dp_runner $(OUTPUT)/dp_runner

build: build-cd build-dp
cargo build --release

docker_cd: build-cd
docker build -f ./script/cd.dockerfile -t jz-action/compute_data_runner:latest .

docker_dp: build-dp
docker build -f ./script/dp.dockerfile -t jz-action/dp_runner:latest .

docker: docker_cd docker_dp

clean:
rm -rf $(OUTPUT) target
26 changes: 22 additions & 4 deletions runner/compute_data_runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tonic::{transport::Server, Request, Response, Status};
use tonic::transport::Server;
use tracing::{info, Level};
use unit::DataNodeControllerServer;
use unit::UnitDataStream;
Expand All @@ -34,6 +34,12 @@ struct Args {
#[arg(short, long, default_value = "INFO")]
log_level: String,

#[arg(short, long, default_value = "/app/tmp")]
tmp_path: String,

#[arg(short, long, default_value = "")]
unix_socket_addr: String,

#[arg(long, default_value = "[::1]:25431")]
host_port: String,
}
Expand All @@ -47,15 +53,16 @@ async fn main() -> Result<()> {
.anyhow()?;

let addr = args.host_port.parse()?;
let program = BatchProgram::new(PathBuf::new());
let program = BatchProgram::new(PathBuf::from_str(args.tmp_path.as_str())?);
let program_safe = Arc::new(Mutex::new(program));


let node_controller = DataNodeControllerServer {
program: program_safe.clone(),
};

let data_stream = UnitDataStream {
program: program_safe,
program: program_safe.clone(),
};

let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<Result<()>>(1);
Expand All @@ -70,13 +77,24 @@ async fn main() -> Result<()> {
.await
.anyhow()
{
let _ = shutdown_tx_arc.send(Err(e)).await;
let _ = shutdown_tx_arc.send(Err(anyhow!("start data controller {e}"))).await;
}
});

info!("node listening on {}", addr);
}

{
//listen unix socket
let unix_socket_addr = args.unix_socket_addr.clone();
let program = program_safe.clone();
let shutdown_tx_arc = shutdown_tx.clone();
let _ = tokio::spawn(async move {
if let Err(e) = ipc::start_ipc_server(unix_socket_addr, program) {
let _ = shutdown_tx_arc.send(Err(anyhow!("start unix socket server {e}"))).await;
}
});
}
{
//catch signal
let _ = tokio::spawn(async move {
Expand Down
12 changes: 6 additions & 6 deletions runner/compute_data_runner/src/program.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::ipc::{DataResponse, SubmitResultReq};
use anyhow::{anyhow, Ok, Result};
use anyhow::{anyhow, Result};
use jz_action::network::common::Empty;
use jz_action::network::datatransfer::data_stream_client::DataStreamClient;
use jz_action::network::datatransfer::{DataBatchResponse, DataCell};
Expand Down Expand Up @@ -90,7 +90,7 @@ impl BatchProgram {
}

error!("unable read data from stream");
Ok(())
anyhow::Ok(())
});

info!("listen data from upstream {}", upstream);
Expand Down Expand Up @@ -160,11 +160,11 @@ impl BatchProgram {
let mut entry_count = 0 ;
for entry in WalkDir::new(tmp_out_path) {
match entry {
std::result::Result::Ok(entry) => {
Ok(entry) => {
if entry.file_type().is_file() {
let path = entry.path();
match fs::read(path) {
std::result::Result::Ok(content) => {
Ok(content) => {
new_batch.cells.push(DataCell{
size: content.len() as i32,
path: path.to_str().unwrap().to_string(),
Expand All @@ -174,15 +174,15 @@ impl BatchProgram {
}
Err(e) => error!("read file({:?}) fail {}", path, e),
}

println!("{}", entry.path().display());
}
}
Err(e) => error!("walk out dir({:?}) fail {}", &req.path, e),
}
}
new_batch.size = entry_count;

//write another
if new_batch.size >0 {
if let Err(e) = out_going_tx.send(new_batch) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "dc_runner"
name = "dp_runner"
version = "0.1.0"
edition = "2021"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use unit::UnitDataStream;

#[derive(Debug, Parser)]
#[command(
name = "dc_runner",
name = "dp_runner",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-action>",
about = "embed in k8s images"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
9 changes: 9 additions & 0 deletions script/cd.dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM ubuntu:22.04

RUN mkdir -p /app

WORKDIR /app

ADD dist/compute_data_runner /app/compute_data_runner

CMD ["compute_data_runner"]
9 changes: 9 additions & 0 deletions script/dp.dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM ubuntu:22.04

RUN mkdir -p /app

WORKDIR /app

ADD dist/dp_runner /app/dp_runner

CMD ["compute_data_runner"]
6 changes: 3 additions & 3 deletions src/core/cnode.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::{MachineSpec, GID};
use serde::{Deserialize, Serialize};

// Channel use to definite data transfer channel
// DataPoint use to definite data transfer channel
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Channel {
pub struct DataPoint {
pub spec: MachineSpec,
}

Expand All @@ -20,7 +20,7 @@ where

pub spec: MachineSpec,

pub channel: Option<Channel>,
pub channel: Option<DataPoint>,

#[serde(bound(deserialize = ""))]
pub(crate) dependency: Vec<ID>,
Expand Down
10 changes: 9 additions & 1 deletion src/driver/kubetpl/deployment.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"spec": {
"containers": [
{
"name": "compute-unit",
"name": "compute-data-unit",
"image": "{{{spec.image}}}",
"command": [ "sleep" ],
"args": [ "infinity" ],
Expand All @@ -35,6 +35,14 @@
]
}
]
"containers": [
{
"name": "compute-user-unit",
"image": "{{{spec.image}}}",
"command": [ "sleep" ],
"args": [ "infinity" ]
}
]
}
}
}
Expand Down

0 comments on commit 2f1abf6

Please sign in to comment.