Skip to content

Commit

Permalink
feat(driver-adapters): enable Wasm on query-core (#4445)
Browse files Browse the repository at this point in the history
* feat(quaint): allow wasm32-unknown-unknown compilation; currently fails on native

* feat(quaint): split postgres connector into native and wasm submodules

* feat(quaint): split mysql connector into native and wasm submodules

* feat(quaint): recover wasm error for mysql

* feat(quaint): split mssql connector into native and wasm submodules

* feat(quaint): split sqlite connector into native and wasm submodules

* chore(quaint): fix clippy when compiling natively

* chore(quaint): fix clippy when compiling to wasm32-unknown-unknown

* chore(quaint): update README

* chore(quaint): rename "*-connector" feature flag to "*-native"

* feat(quaint): enable pure Wasm SqliteError

* feat(query-connect): allow wasm32-unknown-unknown compilation

* feat(sql-query-connector): allow wasm32-unknown-unknown compilation

* chore(query-engine-wasm): add currently unused local crates to test wasm32-unknown-unknown compilation

* chore: update Cargo.lock

* chore: remove leftover comments

* feat(query-core): allow wasm32-unknown-unknown compilation

* chore(sql-query-connector): fix clipppy on wasm32

* chore: remove leftover comment

* WIP: refactor mysql module to flatten its structure

* feat(quaint): flatten mssql connector module

* feat(quaint): flatten postgres connector module

* feat(quaint): flatten sqlite connector module

* chore(quaint): export all public definitions in connector "url" modules

* chore(quaint): refactor tests for connectors, addressing feedback

* chore: add comment on MysqlAsyncError

* chore: add comment on ffi.rs for sqlite

* chore: replace awkward "super::super::" with "crate::..."

* chore: add comments around "query_core::executor::task"

* chore: move "task" module into its own file

* fix(driver-adapters): ci for "request-handlers"

---------

Co-authored-by: Miguel Fernandez <[email protected]>
  • Loading branch information
jkomyno and miguelff authored Nov 20, 2023
1 parent efbc865 commit ebb702b
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 14 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ once_cell = "1"
qe-setup = { path = "../qe-setup" }
request-handlers = { path = "../../request-handlers" }
tokio.workspace = true
query-core = { path = "../../core" }
query-core = { path = "../../core", features = ["metrics"] }
sql-query-connector = { path = "../../connectors/sql-query-connector" }
query-engine = { path = "../../query-engine"}
psl.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion query-engine/core-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
dissimilar = "1.0.4"
user-facing-errors = { path = "../../libs/user-facing-errors" }
request-handlers = { path = "../request-handlers" }
query-core = { path = "../core" }
query-core = { path = "../core", features = ["metrics"] }
schema = { path = "../schema" }
psl.workspace = true
serde_json.workspace = true
13 changes: 11 additions & 2 deletions query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ edition = "2021"
name = "query-core"
version = "0.1.0"

[features]
metrics = ["query-engine-metrics"]

[dependencies]
async-trait = "0.1"
bigdecimal = "0.3"
Expand All @@ -18,11 +21,11 @@ once_cell = "1"
petgraph = "0.4"
prisma-models = { path = "../prisma-models", features = ["default_generators"] }
opentelemetry = { version = "0.17.0", features = ["rt-tokio", "serialize"] }
query-engine-metrics = {path = "../metrics"}
query-engine-metrics = { path = "../metrics", optional = true }
serde.workspace = true
serde_json.workspace = true
thiserror = "1.0"
tokio.workspace = true
tokio = { version = "1.0", features = ["macros", "time"] }
tracing = { version = "0.1", features = ["attributes"] }
tracing-futures = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand All @@ -34,3 +37,9 @@ schema = { path = "../schema" }
lru = "0.7.7"
enumflags2 = "0.7"

pin-project = "1"
wasm-bindgen-futures = "0.4"

[target.'cfg(target_arch = "wasm32")'.dependencies]
pin-project = "1"
wasm-bindgen-futures = "0.4"
11 changes: 11 additions & 0 deletions query-engine/core/src/executor/execute_operation.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
#![cfg_attr(target_arch = "wasm32", allow(unused_variables))]

use super::pipeline::QueryPipeline;
use crate::{
executor::request_context, protocol::EngineProtocol, CoreError, IrSerializer, Operation, QueryGraph,
QueryGraphBuilder, QueryInterpreter, ResponseData,
};
use connector::{Connection, ConnectionLike, Connector};
use futures::future;

#[cfg(feature = "metrics")]
use query_engine_metrics::{
histogram, increment_counter, metrics, PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, PRISMA_CLIENT_QUERIES_TOTAL,
};

use schema::{QuerySchema, QuerySchemaRef};
use std::time::{Duration, Instant};
use tracing::Instrument;
Expand All @@ -24,6 +29,7 @@ pub async fn execute_single_operation(
let (graph, serializer) = build_graph(&query_schema, operation.clone())?;
let result = execute_on(conn, graph, serializer, query_schema.as_ref(), trace_id).await;

#[cfg(feature = "metrics")]
histogram!(PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, operation_timer.elapsed());

result
Expand All @@ -45,6 +51,8 @@ pub async fn execute_many_operations(
for (i, (graph, serializer)) in queries.into_iter().enumerate() {
let operation_timer = Instant::now();
let result = execute_on(conn, graph, serializer, query_schema.as_ref(), trace_id.clone()).await;

#[cfg(feature = "metrics")]
histogram!(PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, operation_timer.elapsed());

match result {
Expand Down Expand Up @@ -98,6 +106,7 @@ pub async fn execute_many_self_contained<C: Connector + Send + Sync>(

let dispatcher = crate::get_current_dispatcher();
for op in operations {
#[cfg(feature = "metrics")]
increment_counter!(PRISMA_CLIENT_QUERIES_TOTAL);

let conn_span = info_span!(
Expand Down Expand Up @@ -158,6 +167,7 @@ async fn execute_self_contained(
execute_self_contained_without_retry(conn, graph, serializer, force_transactions, &query_schema, trace_id).await
};

#[cfg(feature = "metrics")]
histogram!(PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, operation_timer.elapsed());

result
Expand Down Expand Up @@ -259,6 +269,7 @@ async fn execute_on<'a>(
query_schema: &'a QuerySchema,
trace_id: Option<String>,
) -> crate::Result<ResponseData> {
#[cfg(feature = "metrics")]
increment_counter!(PRISMA_CLIENT_QUERIES_TOTAL);

let interpreter = QueryInterpreter::new(conn);
Expand Down
1 change: 1 addition & 0 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod execute_operation;
mod interpreting_executor;
mod pipeline;
mod request_context;
pub(crate) mod task;

pub use self::{execute_operation::*, interpreting_executor::InterpretingExecutor};

Expand Down
59 changes: 59 additions & 0 deletions query-engine/core/src/executor/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! This module provides a unified interface for spawning asynchronous tasks, regardless of the target platform.
pub use arch::{spawn, JoinHandle};
use futures::Future;

// On native targets, `tokio::spawn` spawns a new asynchronous task.
#[cfg(not(target_arch = "wasm32"))]
mod arch {
use super::*;

pub type JoinHandle<T> = tokio::task::JoinHandle<T>;

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(future)
}
}

// On Wasm targets, `wasm_bindgen_futures::spawn_local` spawns a new asynchronous task.
#[cfg(target_arch = "wasm32")]
mod arch {
use super::*;
use tokio::sync::oneshot::{self};

// Wasm-compatible alternative to `tokio::task::JoinHandle<T>`.
// `pin_project` enables pin-projection and a `Pin`-compatible implementation of the `Future` trait.
pub struct JoinHandle<T>(oneshot::Receiver<T>);

impl<T> Future for JoinHandle<T> {
type Output = Result<T, oneshot::error::RecvError>;

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
// the `self.project()` method is provided by the `pin_project` macro
core::pin::Pin::new(&mut self.0).poll(cx)
}
}

impl<T> JoinHandle<T> {
pub fn abort(&mut self) {
// abort is noop on Wasm targets
}
}

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (sender, receiver) = oneshot::channel();
wasm_bindgen_futures::spawn_local(async move {
let result = future.await;
sender.send(result).ok();
});
JoinHandle(receiver)
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::executor::task::JoinHandle;
use crate::{protocol::EngineProtocol, ClosedTx, Operation, ResponseData};
use connector::Connection;
use lru::LruCache;
Expand All @@ -9,7 +10,6 @@ use tokio::{
mpsc::{channel, Sender},
RwLock,
},
task::JoinHandle,
time::Duration,
};

Expand Down
15 changes: 10 additions & 5 deletions query-engine/core/src/interactive_transactions/actors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse};
use crate::executor::task::{spawn, JoinHandle};
use crate::{
execute_many_operations, execute_single_operation, protocol::EngineProtocol,
telemetry::helpers::set_span_link_from_traceparent, ClosedTx, Operation, ResponseData, TxId,
execute_many_operations, execute_single_operation, protocol::EngineProtocol, ClosedTx, Operation, ResponseData,
TxId,
};
use connector::Connection;
use schema::QuerySchemaRef;
Expand All @@ -11,13 +12,15 @@ use tokio::{
mpsc::{channel, Receiver, Sender},
oneshot, RwLock,
},
task::JoinHandle,
time::{self, Duration, Instant},
};
use tracing::Span;
use tracing_futures::Instrument;
use tracing_futures::WithSubscriber;

#[cfg(feature = "metrics")]
use crate::telemetry::helpers::set_span_link_from_traceparent;

#[derive(PartialEq)]
enum RunState {
Continue,
Expand Down Expand Up @@ -81,6 +84,8 @@ impl<'a> ITXServer<'a> {
traceparent: Option<String>,
) -> crate::Result<ResponseData> {
let span = info_span!("prisma:engine:itx_query_builder", user_facing = true);

#[cfg(feature = "metrics")]
set_span_link_from_traceparent(&span, traceparent.clone());

let conn = self.cached_tx.as_open()?;
Expand Down Expand Up @@ -267,7 +272,7 @@ pub(crate) async fn spawn_itx_actor(
};
let (open_transaction_send, open_transaction_rcv) = oneshot::channel();

tokio::task::spawn(
spawn(
crate::executor::with_request_context(engine_protocol, async move {
// We match on the result in order to send the error to the parent task and abort this
// task, on error. This is a separate task (actor), not a function where we can just bubble up the
Expand Down Expand Up @@ -380,7 +385,7 @@ pub(crate) fn spawn_client_list_clear_actor(
closed_txs: Arc<RwLock<lru::LruCache<TxId, Option<ClosedTx>>>>,
mut rx: Receiver<(TxId, Option<ClosedTx>)>,
) -> JoinHandle<()> {
tokio::task::spawn(async move {
spawn(async move {
loop {
if let Some((id, closed_tx)) = rx.recv().await {
trace!("removing {} from client list", id);
Expand Down
7 changes: 6 additions & 1 deletion query-engine/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,20 @@ pub mod protocol;
pub mod query_document;
pub mod query_graph_builder;
pub mod response_ir;

#[cfg(feature = "metrics")]
pub mod telemetry;

pub use self::{
error::{CoreError, FieldConversionError},
executor::{QueryExecutor, TransactionOptions},
interactive_transactions::{ExtendedTransactionUserFacingError, TransactionError, TxId},
query_document::*,
telemetry::*,
};

#[cfg(feature = "metrics")]
pub use self::telemetry::*;

pub use connector::{
error::{ConnectorError, ErrorKind as ConnectorErrorKind},
Connector,
Expand Down
2 changes: 1 addition & 1 deletion query-engine/query-engine-node-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ driver-adapters = ["request-handlers/driver-adapters", "sql-connector/driver-ada
[dependencies]
anyhow = "1"
async-trait = "0.1"
query-core = { path = "../core" }
query-core = { path = "../core", features = ["metrics"] }
request-handlers = { path = "../request-handlers" }
query-connector = { path = "../connectors/query-connector" }
user-facing-errors = { path = "../../libs/user-facing-errors" }
Expand Down
1 change: 1 addition & 0 deletions query-engine/query-engine-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ prisma-models = { path = "../prisma-models" }
quaint = { path = "../../quaint" }
connector = { path = "../connectors/query-connector", package = "query-connector" }
sql-query-connector = { path = "../connectors/sql-query-connector" }
query-core = { path = "../core" }

thiserror = "1"
connection-string.workspace = true
Expand Down
Loading

0 comments on commit ebb702b

Please sign in to comment.