From b50baa256864c3a471c7907317a1a4a15aa29b69 Mon Sep 17 00:00:00 2001 From: max-lt Date: Sun, 24 Mar 2024 21:14:40 +0100 Subject: [PATCH] Handle request method & body --- Cargo.lock | 5 ++--- Cargo.toml | 8 ++++---- bin/main.rs | 45 +++++++++++++++++++++++++++++++++++++++------ src/store.rs | 1 + src/transform.rs | 8 -------- 5 files changed, 46 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65e9c62..e7ee5d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2504,7 +2504,7 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openworkers-runner" -version = "0.1.5" +version = "0.1.6" dependencies = [ "actix-web", "base64 0.22.0", @@ -2529,8 +2529,7 @@ dependencies = [ [[package]] name = "openworkers-runtime" -version = "0.1.5" -source = "git+https://github.com/openworkers/openworkers-runtime?tag=v0.1.5#50fdebef3c253ec8f8c23198f04c2ca35de4f19c" +version = "0.1.6" dependencies = [ "bytes", "deno_console", diff --git a/Cargo.toml b/Cargo.toml index 90de2ad..619cacb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "openworkers-runner" -version = "0.1.5" +version = "0.1.6" edition = "2021" default-run = "openworkers-runner" @@ -12,10 +12,10 @@ bytes = "1.6.0" log = "0.4.21" tokio = "1.36.0" env_logger = "0.11.2" -http_v02 = { package = "http", version = "0.2.9" } +http_v02 = { package = "http", version = "0.2.12" } sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres", "uuid", "bigdecimal", "rust_decimal" ] } -openworkers-runtime ={ git = "https://github.com/openworkers/openworkers-runtime", tag = "v0.1.5"} -# openworkers-runtime = { path = "../openworkers-runtime" } +# openworkers-runtime ={ git = "https://github.com/openworkers/openworkers-runtime", tag = "v0.1.5"} +openworkers-runtime = { path = "../openworkers-runtime" } nats = "0.24.1" serde_json = "1.0.114" serde = { version = "1.0.197", features = ["derive"] } diff --git a/bin/main.rs b/bin/main.rs index 05743e9..dc84bf1 100644 --- a/bin/main.rs +++ b/bin/main.rs @@ -22,7 +22,7 @@ struct AppState { db: Database, } -async fn handle_request(data: Data, req: HttpRequest) -> HttpResponse { +async fn handle_request(data: Data, req: HttpRequest, body: Bytes) -> HttpResponse { debug!( "handle_request of: {} {} in thread {:?}", req.method(), @@ -106,14 +106,47 @@ async fn handle_request(data: Data, req: HttpRequest) -> HttpResponse let start = tokio::time::Instant::now(); - let req: http_v02::Request = http_v02::Request::builder() - .uri(req.uri()) - .body(Default::default()) - .unwrap(); + // Create a new request to forward to the worker. + let request = { + let mut request: http_v02::Request = http_v02::Request::builder() + .uri(format!( + "{}://{}{}", + req.connection_info().scheme(), + req.connection_info().host(), + req.uri() + )) + .method(req.method()) + .body(body) + .unwrap(); + + // Copy headers from the incoming request to the forwarded request. + let headers = request.headers_mut(); + for (k, v) in req.headers() { + headers.insert(k, v.clone()); + } + + // If the worker id is not provided, we add it to the headers. + if req.headers().get("x-worker-id").is_none() { + headers.insert( + "x-worker-id", + http_v02::HeaderValue::from_str(&worker.id).unwrap(), + ); + } + + // If the worker name is not provided, we add it to the headers. + if req.headers().get("x-worker-name").is_none() { + headers.insert( + "x-worker-name", + http_v02::HeaderValue::from_str(&worker.name).unwrap(), + ); + } + + request + }; let (res_tx, res_rx) = channel::>(); - let handle = openworkers_runner::event_fetch::run_fetch(worker, req, res_tx); + let handle = openworkers_runner::event_fetch::run_fetch(worker, request, res_tx); // TODO: select! on res_rx, timeout and handle.join() let response = match res_rx.await { diff --git a/src/store.rs b/src/store.rs index 2c6e799..001ddea 100644 --- a/src/store.rs +++ b/src/store.rs @@ -18,6 +18,7 @@ pub enum WorkerLanguage { #[derive(Debug, FromRow)] pub struct WorkerData { pub id: String, + pub name: String, pub env: Option>>, pub script: String, pub checksum: i64, diff --git a/src/transform.rs b/src/transform.rs index 9381190..6a72680 100644 --- a/src/transform.rs +++ b/src/transform.rs @@ -20,12 +20,6 @@ pub(crate) fn parse_worker_code(worker: &WorkerData) -> FastString { return FastString::from(worker.script.clone()); } WorkerLanguage::Typescript => { - log::debug!( - "parsing typescript worker code: {} {}", - worker.id, - worker.script - ); - let cm = Lrc::new(SourceMap::new(swc_common::FilePathMapping::empty())); let c = swc::Compiler::new(cm.clone()); @@ -37,8 +31,6 @@ pub(crate) fn parse_worker_code(worker: &WorkerData) -> FastString { let js_code = GLOBALS.set(&Default::default(), || to_js(&c, fm.clone())); - log::debug!("parsed typescript worker code: {} {}", worker.id, js_code); - return FastString::from(js_code); } }