Skip to content

Commit

Permalink
Allow multiple tasks per instance
Browse files Browse the repository at this point in the history
  • Loading branch information
max-lt committed Feb 26, 2024
1 parent 8843b8c commit 311a4d3
Show file tree
Hide file tree
Showing 15 changed files with 461 additions and 316 deletions.
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,3 @@ tokio = { version = "1.36.0", features = ["full"] }

[dev-dependencies]
actix-web = { version = "4.5.1", features = ["macros"] }

[[example]]
name = "serve"
path = "examples/serve.rs"
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# OpenWorkers runtime

OpenWorkers is a runtime for running javascript code in a serverless environment.
It is designed to be used with the [OpenWorkers CLI](https://github.com/openworkers/openworkers-cli).

This repository contains the runtime library.

## Usage

### Build all examples
```bash
cargo build --release --examples
```

### Snapshot the runtime
```bash
cargo run --example snapshot
```

### Run the demo server
#### With a new runtime instance for each request
```bash
cargo run --example serve-new -- examples/serve.js
```

#### With the same runtime instance for each request
```bash
cargo run --example serve-same -- examples/serve.js
```

### Execute a scheduled task
```bash
export RUST_LOG=openworkers_runtime=debug,serve=debug # Optional

cargo run --example scheduled -- examples/scheduled.js
```
6 changes: 4 additions & 2 deletions examples/scheduled.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ async function handleSchedule(scheduledDate) {

const res = await fetch("https://echo.workers.rocks/data.json");

console.log("Done waiting!", res.status, await res.json());
let data = await res.json();

console.log("Done waiting!", res.status, { agent: data["user-agent"] });

return "Called deploy hook!";
}

setTimeout(() => console.log("Hello from timeout"), 2000);
setTimeout(() => console.log("Hello from timeout"), 200);
51 changes: 36 additions & 15 deletions examples/scheduled.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use log::debug;
use log::error;
use openworkers_runtime::module_url;
use openworkers_runtime::AnyError;
use openworkers_runtime::ScheduledInit;
use openworkers_runtime::Task;
use openworkers_runtime::Worker;
Expand Down Expand Up @@ -33,8 +32,8 @@ async fn main() -> Result<(), ()> {
path
};

let (shutdown_tx, shutdown_rx) = oneshot::channel::<Option<AnyError>>();
let (done_tx, done_rx) = oneshot::channel::<()>();
let (res_tx, res_rx) = oneshot::channel::<()>();
let (end_tx, end_rx) = oneshot::channel::<()>();

let url = module_url(file_path.as_str());

Expand All @@ -43,24 +42,46 @@ async fn main() -> Result<(), ()> {
.expect("Time went backwards")
.as_secs();

std::thread::spawn(move || Worker::new(url, shutdown_tx).exec(Task::Scheduled(Some(ScheduledInit::new(done_tx, time)))));
let handle = std::thread::spawn(move || {
let local = tokio::task::LocalSet::new();

debug!("js worker for {:?} started", file_path);
local.spawn_local(async move {
let mut worker = Worker::new(url).await.unwrap();

match worker
.exec(Task::Scheduled(Some(ScheduledInit::new(res_tx, time))))
.await
{
Ok(()) => debug!("exec completed"),
Err(err) => error!("exec did not complete: {err}"),
}
});

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

match local.block_on(&rt, async { end_rx.await }) {
Ok(()) => {},
Err(err) => error!("failed to wait for end: {err}"),
}
});

debug!("worker started");

tokio::select! {
_ = tokio::signal::ctrl_c() => debug!("ctrl-c received"),
// wait for completion signal
done = done_rx => match done {
Ok(()) => debug!("js task for {file_path} completed"),
Err(err) => error!("js task for {file_path} did not complete: {err}"),
},
// wait for shutdown signal
end = shutdown_rx => match end {
Ok(None) => error!("js worker for {file_path} stopped before replying"),
Ok(Some(err)) => error!("js worker for {file_path} error: {err}"),
Err(err) => error!("js worker for {file_path} error: {err}"),
// wait for task completion signal
done = res_rx => match done {
Ok(()) => debug!("task completed"),
Err(err) => error!("task did not complete: {err}"),
}
}

end_tx.send(()).unwrap();

handle.join().unwrap();

Ok(())
}
135 changes: 135 additions & 0 deletions examples/serve-new.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use bytes::Bytes;

use log::debug;
use log::error;
use openworkers_runtime::FetchInit;
use openworkers_runtime::Task;
use openworkers_runtime::Url;
use openworkers_runtime::Worker;

use tokio::sync::oneshot::channel;

use actix_web::{App, HttpServer};

use actix_web::web;
use actix_web::web::Data;
use actix_web::HttpRequest;
use actix_web::HttpResponse;

struct AppState {
url: Url,
}

async fn handle_request(data: Data<AppState>, req: HttpRequest) -> HttpResponse {
debug!(
"handle_request of {}: {} {} in thread {:?}",
data.url.path().split('/').last().unwrap(),
req.method(),
req.uri(),
std::thread::current().id()
);

let url = data.url.clone();
let start = tokio::time::Instant::now();

let req = http_v02::Request::builder()
.uri(req.uri())
.body(Default::default())
.unwrap();

let url_clone = url.clone();

let (res_tx, res_rx) = channel::<http_v02::Response<Bytes>>();
let task = Task::Fetch(Some(FetchInit::new(req, res_tx)));

let handle = std::thread::spawn(move || {
let local = tokio::task::LocalSet::new();

let tasks = local.spawn_local(async move {
debug!("create worker");
let mut worker = Worker::new(url_clone).await.unwrap();

debug!("exec fetch task");
match worker.exec(task).await {
Ok(()) => debug!("exec completed"),
Err(err) => error!("exec did not complete: {err}"),
}
});

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

match local.block_on(&rt, tasks) {
Ok(()) => {}
Err(err) => error!("failed to wait for end: {err}"),
}
});

let response = match res_rx.await {
Ok(res) => {
let mut rb = HttpResponse::build(res.status());

for (k, v) in res.headers() {
rb.append_header((k, v));
}

rb.body(res.body().clone())
}
Err(err) => {
error!("worker fetch error: {}, ensure the worker registered a listener for the 'fetch' event", err);
HttpResponse::InternalServerError().body(err.to_string())
}
};

debug!("handle_request done t={}", start.elapsed().as_millis());

debug!("waiting for js worker to finish t={}", start.elapsed().as_millis());
handle.join().unwrap();

response
}

fn get_path() -> String {
std::env::args()
.nth(1)
.unwrap_or_else(|| String::from("examples/serve.js"))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
if !std::env::var("RUST_LOG").is_ok() {
std::env::set_var("RUST_LOG", "info");
}

env_logger::init();

debug!("start main");

// Check that the path is correct
{
let path = get_path();
if !std::path::Path::new(&path).is_file() {
eprintln!("file not found: {}", path);
std::process::exit(1);
}
}

println!("Listening on http://localhost:8080");

HttpServer::new(|| {
App::new()
.app_data(Data::new({
let path = get_path();
let url: Url = openworkers_runtime::module_url(path.as_str());

AppState { url }
}))
.default_service(web::to(handle_request))
})
.bind(("127.0.0.1", 8080))?
.workers(4)
.run()
.await
}
Loading

0 comments on commit 311a4d3

Please sign in to comment.