diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/mod.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/mod.rs index 0714015efd06..252ba620b5ff 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/mod.rs +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/mod.rs @@ -27,3 +27,4 @@ mod prisma_7010; mod prisma_7072; mod prisma_7434; mod prisma_8265; +mod prisma_engines_4286; diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_engines_4286.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_engines_4286.rs new file mode 100644 index 000000000000..313a29cdacf4 --- /dev/null +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_engines_4286.rs @@ -0,0 +1,24 @@ +use query_engine_tests::*; + +#[test_suite(schema(generic), only(Sqlite("libsql.js")))] +mod sqlite { + #[connector_test] + async fn close_tx_on_error(runner: Runner) -> TestResult<()> { + // Try to open a transaction with unsupported isolation error in SQLite. + let result = runner.start_tx(2000, 5000, Some("ReadUncommitted".to_owned())).await; + assert!(result.is_err()); + + // Without the changes from https://github.com/prisma/prisma-engines/pull/4286 or + // https://github.com/prisma/prisma-engines/pull/4489 this second `start_tx` call will + // either hang infinitely with libSQL driver adapter, or fail with a "cannot start a + // transaction within a transaction" error. + // A more future proof way to check this would be to make both transactions EXCLUSIVE or + // IMMEDIATE if we had control over SQLite transaction type here, as that would not rely on + // both transactions using the same connection if we were to pool multiple SQLite + // connections in the future. + let tx = runner.start_tx(2000, 5000, None).await?; + runner.rollback_tx(tx).await?.unwrap(); + + Ok(()) + } +} diff --git a/query-engine/driver-adapters/src/async_js_function.rs b/query-engine/driver-adapters/src/async_js_function.rs index 5f535334ffb9..4926534f58b1 100644 --- a/query-engine/driver-adapters/src/async_js_function.rs +++ b/query-engine/driver-adapters/src/async_js_function.rs @@ -55,6 +55,10 @@ where .map_err(into_quaint_error)?; js_result.into() } + + pub(crate) fn as_raw(&self) -> &ThreadsafeFunction<ArgType, ErrorStrategy::Fatal> { + &self.threadsafe_fn + } } impl<ArgType, ReturnType> FromNapiValue for AsyncJsFunction<ArgType, ReturnType> diff --git a/query-engine/driver-adapters/src/proxy.rs b/query-engine/driver-adapters/src/proxy.rs index 642c2491757a..19693453988e 100644 --- a/query-engine/driver-adapters/src/proxy.rs +++ b/query-engine/driver-adapters/src/proxy.rs @@ -1,12 +1,12 @@ use std::borrow::Cow; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use crate::async_js_function::AsyncJsFunction; use crate::conversion::JSArg; use crate::transaction::JsTransaction; use metrics::increment_gauge; use napi::bindgen_prelude::{FromNapiValue, ToNapiValue}; -use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction}; use napi::{JsObject, JsString}; use napi_derive::napi; use quaint::connector::ResultSet as QuaintResultSet; @@ -52,9 +52,8 @@ pub(crate) struct TransactionProxy { /// rollback transaction rollback: AsyncJsFunction<(), ()>, - /// dispose transaction, cleanup logic executed at the end of the transaction lifecycle - /// on drop. - dispose: ThreadsafeFunction<(), ErrorStrategy::Fatal>, + /// whether the transaction has already been committed or rolled back + closed: AtomicBool, } /// This result set is more convenient to be manipulated from both Rust and NodeJS. @@ -581,14 +580,13 @@ impl TransactionProxy { pub fn new(js_transaction: &JsObject) -> napi::Result<Self> { let commit = js_transaction.get_named_property("commit")?; let rollback = js_transaction.get_named_property("rollback")?; - let dispose = js_transaction.get_named_property("dispose")?; let options = js_transaction.get_named_property("options")?; Ok(Self { commit, rollback, - dispose, options, + closed: AtomicBool::new(false), }) } @@ -596,19 +594,56 @@ impl TransactionProxy { &self.options } + /// Commits the transaction via the driver adapter. + /// + /// ## Cancellation safety + /// + /// The future is cancellation-safe as long as the underlying Node-API call + /// is cancellation-safe and no new await points are introduced between storing true in + /// [`TransactionProxy::closed`] and calling the underlying JS function. + /// + /// - If `commit` is called but never polled or awaited, it's a no-op, the transaction won't be + /// committed and [`TransactionProxy::closed`] will not be changed. + /// + /// - If it is polled at least once, `true` will be stored in [`TransactionProxy::closed`] and + /// the underlying FFI call will be delivered to JavaScript side in lockstep, so the destructor + /// will not attempt rolling the transaction back even if the `commit` future was dropped while + /// waiting on the JavaScript call to complete and deliver response. pub async fn commit(&self) -> quaint::Result<()> { + self.closed.store(true, Ordering::Relaxed); self.commit.call(()).await } + /// Rolls back the transaction via the driver adapter. + /// + /// ## Cancellation safety + /// + /// The future is cancellation-safe as long as the underlying Node-API call + /// is cancellation-safe and no new await points are introduced between storing true in + /// [`TransactionProxy::closed`] and calling the underlying JS function. + /// + /// - If `rollback` is called but never polled or awaited, it's a no-op, the transaction won't be + /// rolled back yet and [`TransactionProxy::closed`] will not be changed. + /// + /// - If it is polled at least once, `true` will be stored in [`TransactionProxy::closed`] and + /// the underlying FFI call will be delivered to JavaScript side in lockstep, so the destructor + /// will not attempt rolling back again even if the `rollback` future was dropped while waiting + /// on the JavaScript call to complete and deliver response. pub async fn rollback(&self) -> quaint::Result<()> { + self.closed.store(true, Ordering::Relaxed); self.rollback.call(()).await } } impl Drop for TransactionProxy { fn drop(&mut self) { + if self.closed.swap(true, Ordering::Relaxed) { + return; + } + _ = self - .dispose + .rollback + .as_raw() .call((), napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking); } }