Skip to content

Commit

Permalink
apacheGH-20379: [Java] Dataset Failed to update reservation while fre…
Browse files Browse the repository at this point in the history
…eing bytes (apache#40101)

### Rationale for this change
Better controls JNI Thread management in java dataset module to fix apache#20379 
Re-use the same code found in the java arrow-c-data module : https://github.com/apache/arrow/blob/main/java/c/src/main/cpp/jni_wrapper.cc#L107

May JNIEnvGuard class code can be put in a common place for both libraries ...

### What changes are included in this PR?
N/A

### Are these changes tested?
These changes has been tested with :  https://gist.github.com/fb64/71880cde297bc5234b02b68b785670fd 
on Linux X86_64 architecture

### Are there any user-facing changes?
N/A

* Closes: apache#20379

Authored-by: Florian Bernard <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
fb64 authored and thisisnic committed Mar 8, 2024
1 parent 199001e commit 952d87a
Showing 1 changed file with 50 additions and 12 deletions.
62 changes: 50 additions & 12 deletions java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,40 @@ void ThrowIfError(const arrow::Status& status) {
}
}

class JNIEnvGuard {
public:
explicit JNIEnvGuard(JavaVM* vm) : vm_(vm), env_(nullptr), should_detach_(false) {
JNIEnv* env;
jint code = vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION);
if (code == JNI_EDETACHED) {
JavaVMAttachArgs args;
args.version = JNI_VERSION;
args.name = NULL;
args.group = NULL;
code = vm->AttachCurrentThread(reinterpret_cast<void**>(&env), &args);
should_detach_ = (code == JNI_OK);
}
if (code != JNI_OK) {
ThrowPendingException("Failed to attach the current thread to a Java VM");
}
env_ = env;
}

JNIEnv* env() { return env_; }

~JNIEnvGuard() {
if (should_detach_) {
vm_->DetachCurrentThread();
should_detach_ = false;
}
}

private:
JavaVM* vm_;
JNIEnv* env_;
bool should_detach_;
};

template <typename T>
T JniGetOrThrow(arrow::Result<T> result) {
const arrow::Status& status = result.status();
Expand Down Expand Up @@ -126,23 +160,27 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
: vm_(vm), java_reservation_listener_(java_reservation_listener) {}

arrow::Status OnReservation(int64_t size) override {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
return arrow::Status::Invalid("JNIEnv was not attached to current thread");
try {
JNIEnvGuard guard(vm_);
JNIEnv* env = guard.env();
env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size);
RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
return arrow::Status::OK();
} catch (const JniPendingException& e) {
return arrow::Status::Invalid(e.what());
}
env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size);
RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
return arrow::Status::OK();
}

arrow::Status OnRelease(int64_t size) override {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
return arrow::Status::Invalid("JNIEnv was not attached to current thread");
try {
JNIEnvGuard guard(vm_);
JNIEnv* env = guard.env();
env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size);
RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
return arrow::Status::OK();
} catch (const JniPendingException& e) {
return arrow::Status::Invalid(e.what());
}
env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size);
RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
return arrow::Status::OK();
}

jobject GetJavaReservationListener() { return java_reservation_listener_; }
Expand Down

0 comments on commit 952d87a

Please sign in to comment.