diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index d2d976677bd6b..19a43c8d2fa41 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -83,6 +83,40 @@ void ThrowIfError(const arrow::Status& status) { } } +class JNIEnvGuard { + public: + explicit JNIEnvGuard(JavaVM* vm) : vm_(vm), env_(nullptr), should_detach_(false) { + JNIEnv* env; + jint code = vm->GetEnv(reinterpret_cast(&env), JNI_VERSION); + if (code == JNI_EDETACHED) { + JavaVMAttachArgs args; + args.version = JNI_VERSION; + args.name = NULL; + args.group = NULL; + code = vm->AttachCurrentThread(reinterpret_cast(&env), &args); + should_detach_ = (code == JNI_OK); + } + if (code != JNI_OK) { + ThrowPendingException("Failed to attach the current thread to a Java VM"); + } + env_ = env; + } + + JNIEnv* env() { return env_; } + + ~JNIEnvGuard() { + if (should_detach_) { + vm_->DetachCurrentThread(); + should_detach_ = false; + } + } + + private: + JavaVM* vm_; + JNIEnv* env_; + bool should_detach_; +}; + template T JniGetOrThrow(arrow::Result result) { const arrow::Status& status = result.status(); @@ -126,23 +160,27 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener { : vm_(vm), java_reservation_listener_(java_reservation_listener) {} arrow::Status OnReservation(int64_t size) override { - JNIEnv* env; - if (vm_->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { - return arrow::Status::Invalid("JNIEnv was not attached to current thread"); + try { + JNIEnvGuard guard(vm_); + JNIEnv* env = guard.env(); + env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size); + RETURN_NOT_OK(arrow::dataset::jni::CheckException(env)); + return arrow::Status::OK(); + } catch (const JniPendingException& e) { + return arrow::Status::Invalid(e.what()); } - env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size); - RETURN_NOT_OK(arrow::dataset::jni::CheckException(env)); - return arrow::Status::OK(); } arrow::Status OnRelease(int64_t size) override { - JNIEnv* env; - if (vm_->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { - return arrow::Status::Invalid("JNIEnv was not attached to current thread"); + try { + JNIEnvGuard guard(vm_); + JNIEnv* env = guard.env(); + env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size); + RETURN_NOT_OK(arrow::dataset::jni::CheckException(env)); + return arrow::Status::OK(); + } catch (const JniPendingException& e) { + return arrow::Status::Invalid(e.what()); } - env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size); - RETURN_NOT_OK(arrow::dataset::jni::CheckException(env)); - return arrow::Status::OK(); } jobject GetJavaReservationListener() { return java_reservation_listener_; }