Skip to content

Commit

Permalink
impl WorkerIO in Rt
Browse files Browse the repository at this point in the history
  • Loading branch information
ssddOnTop committed May 7, 2024
1 parent e77b43e commit f8259b7
Show file tree
Hide file tree
Showing 23 changed files with 174 additions and 46 deletions.
10 changes: 5 additions & 5 deletions examples/jsonplaceholder.graphql
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
schema
@server(port: 8000, headers: {cors: {allowOrigins: ["*"], allowHeaders: ["*"], allowMethods: [POST, GET, OPTIONS]}})
@upstream(baseURL: "http://jsonplaceholder.typicode.com", httpCache: true, batch: {delay: 100}) {
@server(port: 8000)
@link(src: "scripts/echo.js", type: Script)
@upstream(baseURL: "http://jsonplaceholder.typicode.com", httpCache: true) {
query: Query
}

type Query {
posts: [Post] @http(path: "/posts")
users: [User] @http(path: "/users")
user(id: Int!): User @http(path: "/users/{{.args.id}}")
}

type User {
Expand All @@ -24,5 +23,6 @@ type Post {
userId: Int!
title: String!
body: String!
user: User @call(steps: [{query: "user", args: {id: "{{.value.userId}}"}}])
curId: Int! @js(name: "hello")
# user: User @call(steps: [{query: "user", args: {id: "{{.value.userId}}"}}])
}
5 changes: 5 additions & 0 deletions examples/scripts/echo.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
function onRequest({request}) {
return {request}
}

function hello(val) {
let json = JSON.parse(val)
return JSON.stringify(json.id)
}
4 changes: 2 additions & 2 deletions generated/.tailcallrc.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ directive @http(
) on FIELD_DEFINITION

directive @js(
script: String!
name: String!
) on FIELD_DEFINITION

"""
Expand Down Expand Up @@ -667,7 +667,7 @@ enum HttpVersion {
HTTP2
}
input JS {
script: String!
name: String!
}
input KeyValue {
key: String!
Expand Down
4 changes: 2 additions & 2 deletions generated/.tailcallrc.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -749,10 +749,10 @@
"JS": {
"type": "object",
"required": [
"script"
"name"
],
"properties": {
"script": {
"name": {
"type": "string"
}
}
Expand Down
1 change: 1 addition & 0 deletions src/blueprint/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ pub fn to_field_definition(
.and(update_http().trace(config::Http::trace_name().as_str()))
.and(update_grpc(operation_type).trace(config::Grpc::trace_name().as_str()))
.and(update_const_field().trace(config::Expr::trace_name().as_str()))
.and(update_js_field().trace(config::JS::trace_name().as_str()))
.and(update_graphql(operation_type).trace(config::GraphQL::trace_name().as_str()))
.and(update_modify().trace(config::Modify::trace_name().as_str()))
.and(update_call(operation_type, object_name).trace(config::Call::trace_name().as_str()))
Expand Down
53 changes: 53 additions & 0 deletions src/blueprint/operators/js.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::sync::Arc;

use crate::blueprint::*;
use crate::config;
use crate::config::Field;
use crate::javascript::Runtime;

Check failure on line 6 in src/blueprint/operators/js.rs

View workflow job for this annotation

GitHub Actions / Test AWS Lambda Build

unresolved import `crate::javascript`

Check failure on line 6 in src/blueprint/operators/js.rs

View workflow job for this annotation

GitHub Actions / Run Tests (WASM)

unresolved import `crate::javascript`
use crate::lambda::Expression;
use crate::try_fold::TryFold;
use crate::valid::{Valid, ValidationError, Validator};

pub struct CompileJs<'a> {
pub name: &'a str,
pub script: &'a Option<String>,
}

pub fn compile_js(inputs: CompileJs) -> Valid<Expression, String> {
let name = inputs.name;
let quickjs = rquickjs::Runtime::new().unwrap();

Check failure on line 18 in src/blueprint/operators/js.rs

View workflow job for this annotation

GitHub Actions / Test AWS Lambda Build

failed to resolve: use of undeclared crate or module `rquickjs`

Check failure on line 18 in src/blueprint/operators/js.rs

View workflow job for this annotation

GitHub Actions / Run Tests (WASM)

failed to resolve: use of undeclared crate or module `rquickjs`
let ctx = rquickjs::Context::full(&quickjs).unwrap();

Check failure on line 19 in src/blueprint/operators/js.rs

View workflow job for this annotation

GitHub Actions / Test AWS Lambda Build

failed to resolve: use of undeclared crate or module `rquickjs`

Check failure on line 19 in src/blueprint/operators/js.rs

View workflow job for this annotation

GitHub Actions / Run Tests (WASM)

failed to resolve: use of undeclared crate or module `rquickjs`

Valid::from_option(inputs.script.as_ref(), "script is required".to_string()).and_then(
|script| {
Valid::from(
ctx.with(|ctx| {
ctx.eval::<(), &str>(script)?;
Ok::<_, anyhow::Error>(())
})
.map_err(|e| ValidationError::new(e.to_string())),
)
.map(|_| {
Expression::Js(Arc::new(Runtime::new(
name.to_string(),
Script { source: script.clone(), timeout: None },
)))
})
},
)
}

Check warning on line 38 in src/blueprint/operators/js.rs

View check run for this annotation

Codecov / codecov/patch

src/blueprint/operators/js.rs#L16-L38

Added lines #L16 - L38 were not covered by tests

pub fn update_js_field<'a>(
) -> TryFold<'a, (&'a ConfigModule, &'a Field, &'a config::Type, &'a str), FieldDefinition, String>
{
TryFold::<(&ConfigModule, &Field, &config::Type, &str), FieldDefinition, String>::new(
|(module, field, _, _), b_field| {
let Some(js) = &field.script else {
return Valid::succeed(b_field);
};

compile_js(CompileJs { script: &module.extensions.script, name: &js.name })
.map(|resolver| b_field.resolver(Some(resolver)))

Check warning on line 50 in src/blueprint/operators/js.rs

View check run for this annotation

Codecov / codecov/patch

src/blueprint/operators/js.rs#L49-L50

Added lines #L49 - L50 were not covered by tests
},
)
}
2 changes: 2 additions & 0 deletions src/blueprint/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod expr;
mod graphql;
mod grpc;
mod http;
mod js;
mod modify;
mod protected;

Expand All @@ -11,5 +12,6 @@ pub use expr::*;
pub use graphql::*;
pub use grpc::*;
pub use http::*;
pub use js::*;
pub use modify::*;
pub use protected::*;
2 changes: 0 additions & 2 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
mod command;
mod error;
mod fmt;
#[cfg(feature = "js")]
pub mod javascript;
pub mod metrics;
pub mod server;
mod tc;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn init_file() -> Arc<dyn FileIO> {
fn init_hook_http(http: Arc<impl HttpIO>, script: Option<blueprint::Script>) -> Arc<dyn HttpIO> {
#[cfg(feature = "js")]
if let Some(script) = script {
return crate::cli::javascript::init_http(http, script);
return crate::javascript::init_http(http, script);

Check warning on line 26 in src/cli/runtime/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/cli/runtime/mod.rs#L26

Added line #L26 was not covered by tests
}

#[cfg(not(feature = "js"))]
Expand Down
2 changes: 1 addition & 1 deletion src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl Field {

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, schemars::JsonSchema)]
pub struct JS {
pub script: String,
pub name: String,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, schemars::JsonSchema)]
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/cli/javascript/mod.rs → src/javascript/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn init_http(
script: blueprint::Script,
) -> Arc<dyn HttpIO + Sync + Send> {
tracing::debug!("Initializing JavaScript HTTP filter: {}", script.source);
let script_io = Arc::new(Runtime::new(script));
let script_io = Arc::new(Runtime::new("onRequest".to_string(), script));
Arc::new(RequestFilter::new(http, script_io))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl RequestFilter {
async fn on_request(&self, mut request: reqwest::Request) -> anyhow::Result<Response<Bytes>> {
let js_request = JsRequest::try_from(&request)?;
let event = Event::Request(js_request);
let command = self.worker.call("onRequest".to_string(), event).await?;
let command = self.worker.call(event).await?;
match command {
Some(command) => match command {
Command::Request(js_request) => {
Expand Down Expand Up @@ -104,9 +104,9 @@ mod tests {
use hyper::body::Bytes;
use rquickjs::{Context, FromJs, IntoJs, Object, Runtime, String as JsString};

use crate::cli::javascript::request_filter::Command;
use crate::cli::javascript::{JsRequest, JsResponse};
use crate::http::Response;
use crate::javascript::request_filter::Command;
use crate::javascript::{JsRequest, JsResponse};

#[test]
fn test_command_from_invalid_object() {
Expand Down
102 changes: 82 additions & 20 deletions src/cli/javascript/runtime.rs → src/javascript/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::cell::{OnceCell, RefCell};
use std::fmt::{Debug, Formatter};
use std::thread;

use async_graphql_value::ConstValue;
use rquickjs::{Context, Ctx, FromJs, Function, IntoJs, Value};

use super::request_filter::{Command, Event};
Expand All @@ -25,7 +27,7 @@ fn qjs_print(msg: String, is_err: bool) {

fn setup_builtins(ctx: &Ctx<'_>) -> rquickjs::Result<()> {
ctx.globals().set("__qjs_print", js_qjs_print)?;
let _: Value = ctx.eval_file("src/cli/javascript/shim/console.js")?;
let _: Value = ctx.eval_file("src/javascript/shim/console.js")?;

Ok(())
}
Expand All @@ -47,18 +49,33 @@ impl LocalRuntime {

pub struct Runtime {
script: blueprint::Script,
function_name: String,
// Single threaded JS runtime, that's shared across all tokio workers.
tokio_runtime: Option<tokio::runtime::Runtime>,
}

impl Debug for Runtime {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Runtime {{ script: {:?}, function_name: {} }}",
self.script, self.function_name
)
}

Check warning on line 64 in src/javascript/runtime.rs

View check run for this annotation

Codecov / codecov/patch

src/javascript/runtime.rs#L58-L64

Added lines #L58 - L64 were not covered by tests
}

impl Runtime {
pub fn new(script: blueprint::Script) -> Self {
pub fn new(name: String, script: blueprint::Script) -> Self {
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.expect("JS runtime not initialized");

Self { script, tokio_runtime: Some(tokio_runtime) }
Self {
script,
function_name: name,
tokio_runtime: Some(tokio_runtime),
}
}
}

Expand All @@ -75,27 +92,13 @@ impl Drop for Runtime {

#[async_trait::async_trait]
impl WorkerIO<Event, Command> for Runtime {
async fn call(&self, name: String, event: Event) -> anyhow::Result<Option<Command>> {
async fn call(&self, event: Event) -> anyhow::Result<Option<Command>> {
let script = self.script.clone();
let name = self.function_name.clone();
if let Some(runtime) = &self.tokio_runtime {
runtime
.spawn(async move {
// initialize runtime if this is the first call
// exit if failed to initialize
LOCAL_RUNTIME.with(move |cell| {
if cell.borrow().get().is_none() {
LocalRuntime::try_new(script).and_then(|runtime| {
cell.borrow().set(runtime).map_err(|_| {
anyhow::anyhow!(
"trying to reinitialize an already initialized QuickJS runtime"
)
})
})
} else {
Ok(())
}
})?;

init_rt(script)?;
call(name, event)
})
.await?
Expand All @@ -105,6 +108,40 @@ impl WorkerIO<Event, Command> for Runtime {
}
}

#[async_trait::async_trait]
impl WorkerIO<Option<ConstValue>, ConstValue> for Runtime {
async fn call(&self, input: Option<ConstValue>) -> anyhow::Result<Option<ConstValue>> {
let script = self.script.clone();
let name = self.function_name.clone();
if let Some(runtime) = &self.tokio_runtime {
runtime
.spawn(async move {
init_rt(script)?;
execute_inner(name, input).map(Some)
})
.await?
} else {
anyhow::bail!("JS Runtime is stopped")
}
}

Check warning on line 126 in src/javascript/runtime.rs

View check run for this annotation

Codecov / codecov/patch

src/javascript/runtime.rs#L113-L126

Added lines #L113 - L126 were not covered by tests
}

fn init_rt(script: blueprint::Script) -> anyhow::Result<()> {
// initialize runtime if this is the first call
// exit if failed to initialize
LOCAL_RUNTIME.with(move |cell| {
if cell.borrow().get().is_none() {
LocalRuntime::try_new(script).and_then(|runtime| {
cell.borrow().set(runtime).map_err(|_| {
anyhow::anyhow!("trying to reinitialize an already initialized QuickJS runtime")

Check warning on line 136 in src/javascript/runtime.rs

View check run for this annotation

Codecov / codecov/patch

src/javascript/runtime.rs#L136

Added line #L136 was not covered by tests
})
})
} else {
Ok(())
}
})
}

fn prepare_args<'js>(ctx: &Ctx<'js>, req: JsRequest) -> rquickjs::Result<(Value<'js>,)> {
let object = rquickjs::Object::new(ctx.clone())?;
object.set("request", req.into_js(ctx)?)?;
Expand Down Expand Up @@ -137,3 +174,28 @@ fn call(name: String, event: Event) -> anyhow::Result<Option<Command>> {
})
})
}

fn execute_inner(name: String, value: Option<ConstValue>) -> anyhow::Result<ConstValue> {
let value = value
.map(|v| serde_json::to_string(&v))
.ok_or(anyhow::anyhow!("No graphql value found"))??;

Check warning on line 181 in src/javascript/runtime.rs

View check run for this annotation

Codecov / codecov/patch

src/javascript/runtime.rs#L178-L181

Added lines #L178 - L181 were not covered by tests

LOCAL_RUNTIME.with_borrow_mut(|cell| {
let runtime = cell
.get_mut()
.ok_or(anyhow::anyhow!("JS runtime not initialized"))?;
runtime.0.with(|ctx| {
let fn_as_value = ctx
.globals()
.get::<_, rquickjs::Function>(&name)
.map_err(|_| anyhow::anyhow!("globalThis not initialized"))?;

Check warning on line 191 in src/javascript/runtime.rs

View check run for this annotation

Codecov / codecov/patch

src/javascript/runtime.rs#L183-L191

Added lines #L183 - L191 were not covered by tests

let function = fn_as_value
.as_function()
.ok_or(anyhow::anyhow!("`hello` is not a function"))?;
let val: String = function.call((value, ))
.map_err(|_| anyhow::anyhow!("unable to parse value from js function: {} maybe because it's not returning a string?", name,))?;
Ok::<_, anyhow::Error>(serde_json::from_str(&val)?)
})
})
}

Check warning on line 201 in src/javascript/runtime.rs

View check run for this annotation

Codecov / codecov/patch

src/javascript/runtime.rs#L193-L201

Added lines #L193 - L201 were not covered by tests
File renamed without changes.
9 changes: 8 additions & 1 deletion src/lambda/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ use async_graphql_value::ConstValue;
use thiserror::Error;

use super::{Eval, EvaluationContext, ResolverContextLike, IO};
use crate::auth;
use crate::blueprint::DynamicValue;
use crate::javascript::Runtime;

Check failure on line 12 in src/lambda/expression.rs

View workflow job for this annotation

GitHub Actions / Test AWS Lambda Build

unresolved import `crate::javascript`

Check failure on line 12 in src/lambda/expression.rs

View workflow job for this annotation

GitHub Actions / Run Tests (WASM)

unresolved import `crate::javascript`
use crate::json::JsonLike;
use crate::lambda::cache::Cache;
use crate::serde_value_ext::ValueExt;
use crate::{auth, WorkerIO};

Check failure on line 16 in src/lambda/expression.rs

View workflow job for this annotation

GitHub Actions / Test AWS Lambda Build

unused import: `WorkerIO`

Check failure on line 16 in src/lambda/expression.rs

View workflow job for this annotation

GitHub Actions / Run Tests (WASM)

unused import: `WorkerIO`

#[derive(Clone, Debug)]
pub enum Expression {
Expand All @@ -22,6 +23,7 @@ pub enum Expression {
Cache(Cache),
Path(Box<Expression>, Vec<String>),
Protect(Box<Expression>),
Js(Arc<Runtime>),
}

impl Display for Expression {
Expand All @@ -33,6 +35,7 @@ impl Display for Expression {
Expression::Cache(_) => write!(f, "Cache"),
Expression::Path(_, _) => write!(f, "Input"),
Expression::Protect(expr) => write!(f, "Protected({expr})"),
Expression::Js(_) => write!(f, "Js"),

Check warning on line 38 in src/lambda/expression.rs

View check run for this annotation

Codecov / codecov/patch

src/lambda/expression.rs#L38

Added line #L38 was not covered by tests
}
}
}
Expand Down Expand Up @@ -190,6 +193,10 @@ impl Eval for Expression {
}
Expression::IO(operation) => operation.eval(ctx).await,
Expression::Cache(cached) => cached.eval(ctx).await,
Expression::Js(js) => {
let val = js.call(ctx.value().cloned()).await?;
Ok(val.unwrap_or(async_graphql::Value::Null))

Check warning on line 198 in src/lambda/expression.rs

View check run for this annotation

Codecov / codecov/patch

src/lambda/expression.rs#L196-L198

Added lines #L196 - L198 were not covered by tests
}
}
})
}
Expand Down
Loading

0 comments on commit f8259b7

Please sign in to comment.