Skip to content

Commit

Permalink
Catch R exceptions in Online
Browse files Browse the repository at this point in the history
  • Loading branch information
Kopilov committed Aug 12, 2021
1 parent 56bd7b6 commit 6b884a1
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 20 deletions.
8 changes: 4 additions & 4 deletions RCaller/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ description = 'RCaller is a software library which simplifies performing data an

dependencies {
implementation 'org.apache.commons:commons-lang3:3.12.0'
compileOnly 'org.apache.arrow:arrow-vector:[3.0.0,)'
compileOnly 'org.apache.arrow:arrow-vector:[5.0.0,)'

testImplementation 'junit:junit:4.13.1'
testImplementation 'org.apache.arrow:arrow-vector:[3.0.0,)'
testImplementation 'org.apache.arrow:arrow-memory-netty:[3.0.0,)'
testImplementation 'junit:junit:4.13.2'
testImplementation 'org.apache.arrow:arrow-vector:[5.0.0,)'
testImplementation 'org.apache.arrow:arrow-memory-netty:[5.0.0,)'
}

java.sourceCompatibility = JavaVersion.VERSION_11
Expand Down
6 changes: 3 additions & 3 deletions RCaller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -75,13 +75,13 @@
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>[3.0.0,)</version>
<version>[5.0.0,)</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>[3.0.0,)</version>
<version>[5.0.0,)</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ public long[] getAsLongArray(String name) {
}
}

@Override
public double[][] getAsDoubleMatrix(String name) {
var vector = findVector(name);
if (vector instanceof FixedSizeListVector || vector instanceof ListVector) {
Expand Down
22 changes: 17 additions & 5 deletions RCaller/src/main/java/com/github/rcaller/rstuff/RCaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.io.*;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.logging.Level;
Expand Down Expand Up @@ -288,7 +290,7 @@ public void runAndReturnResultOnline(String var) throws ExecutionException {
logger.log(Level.INFO, "Retrying online R execution");
}

File outputFile, resultReadyControlFile;
File outputFile, errorFile, resultReadyControlFile;

if (rCallerOptions.getrExecutable() == null) {
if (handleRFailure("RExecutable is not defined.Please set this" + " variable to full path of R executable binary file.")) {
Expand All @@ -298,6 +300,7 @@ public void runAndReturnResultOnline(String var) throws ExecutionException {

try {
outputFile = tempFileService.createOutputFile();
errorFile = tempFileService.createControlFile();
resultReadyControlFile = tempFileService.createControlFile();
} catch (Exception e) {
if (handleRFailure("Can not create a temporary file for storing the R results: " + e.getMessage())) {
Expand All @@ -308,7 +311,6 @@ public void runAndReturnResultOnline(String var) throws ExecutionException {
}

rCode.appendStandardCodeToAppend(outputFile, var);
rCode.appendEndSignalCode(resultReadyControlFile);
if (rInput == null || rOutput == null || rError == null || process == null) {
try {
startOnlineProcess();
Expand All @@ -321,7 +323,8 @@ public void runAndReturnResultOnline(String var) throws ExecutionException {
}

try {
rInput.write(rCode.toString().getBytes(Globals.standardCharset));
var script = rCode.toTryCatchScript(errorFile) + rCode.createEndSignalCode(resultReadyControlFile);
rInput.write(script.getBytes(Globals.standardCharset));
rInput.flush();
} catch (IOException e) {
rInput = null;
Expand All @@ -337,12 +340,21 @@ public void runAndReturnResultOnline(String var) throws ExecutionException {
}

// an error might occur before any output is written
if (!isProcessAlive() && errorMessageSaver.getMessage().length() > 0) {
if (handleRFailure("R stderr: " + errorMessageSaver.getMessage())) {
if (!isProcessAlive()) {
if (handleRFailure("R process died, stderr: " + errorMessageSaver.getMessage())) {
continue;
}
}

if (errorFile.length() > 0) {
parser.setIPCResource(errorFile.toURI());
parser.parse();
String errorMessage = parser.getAsStringArray("exception")[0];
String stack = String.join("\n----\n",List.of(parser.getAsStringArray("stacktrace")));

throw new ExecutionException("R code throw an error:\n" + errorMessage + "\nDetailed stack:\n----\n" + stack);
}

parser.setIPCResource(outputFile.toURI());

try {
Expand Down
38 changes: 36 additions & 2 deletions RCaller/src/main/java/com/github/rcaller/rstuff/RCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,21 @@ public final void clear() {
addRCode(RCodeIO.getInterprocessDependencies(rCallerOptions));
}

/**
* Adding R code that exports toplevel var to outputFile for reading by Java. Internal use only.
* @param outputFile file to be created
* @param var name of variable to be exported
*/
public void appendStandardCodeToAppend(File outputFile, String var) {
addRCode(RCodeIO.getVariableExporting(rCallerOptions, var, URI.create(outputFile.getAbsolutePath())));
}

static String createEndSignalCode(File outputFile) {
return ("cat(1, file=\"" + outputFile.getPath().replace("\\", "/") + "\")\n");
}

public void appendEndSignalCode(File outputFile) {
addRCode("cat(1, file=\"" + outputFile.getPath().replace("\\", "/") + "\")\n");
addRCode(createEndSignalCode(outputFile));
}

public void clearOnline(){
Expand Down Expand Up @@ -245,7 +254,7 @@ public void R_source(String sourceFile) {
}

public void deleteTempFiles(){
if(tempFileService != null){
if (tempFileService != null){
tempFileService.deleteRCallerTempFiles();
}
}
Expand All @@ -254,4 +263,29 @@ public void deleteTempFiles(){
public String toString() {
return this.code.toString();
}

/**
* Wrap current code to standard tryCatch function.
* Error handler saves details to errorOutputFile if the error occurs.
* @param errorOutputFile
* @return
*/
String toTryCatchScript(File errorOutputFile) {
//Using code snippet "An improved “error handler”" with withCallingHandlers nested in tryCatch
//from https://cran.r-project.org/web/packages/tryCatchLog/vignettes/tryCatchLog-intro.html

var script = new StringBuilder("tryCatch( withCallingHandlers({\n");
script.append(this.code);

script.append("}, error=function(e) {\n" +
" stacktrace <- as.character(sys.calls())\n" +
" exception <- as.character(e)\n" +
" caught <- list()\n" +
" caught[[1]] <- exception\n" +
" caught[[2]] <- stacktrace[6:(length(stacktrace)-2)]\n" + //remove wrapping steps
" names(caught) <- c(\"exception\", \"stacktrace\")\n");
script.append(RCodeIO.getVariableExporting(rCallerOptions, "caught", URI.create(errorOutputFile.getAbsolutePath()))).append("\n");
script.append("}), error = function(e) { })\n");
return script.toString();
}
}
10 changes: 5 additions & 5 deletions RCaller/src/main/resources/arrow_bridge.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ require("arrow")
send_element_by_arrow <- function(obj, name, stream) {
if (is.data.frame(obj)) {
#Export by Arrow as is
write_ipc_stream(obj, stream)
arrow::write_ipc_stream(obj, stream)
return()
}
if (is.array(obj)) {
Expand All @@ -22,7 +22,7 @@ send_element_by_arrow <- function(obj, name, stream) {
schema_fixed_size <- schema(matrix_column=fixed_size_list_of(type = batch_typedetect$schema$fields[[1]]$type$value_type, columns))
batch <- record_batch(matrix_column=matrix_jagged_fixedsize, schema=schema_fixed_size)
names(batch) <- name
write_ipc_stream(batch, stream)
arrow::write_ipc_stream(batch, stream)
return()
}
if (length(dim(obj)) > 2) {
Expand All @@ -40,7 +40,7 @@ send_element_by_arrow <- function(obj, name, stream) {
#Union typed and nested lists might not work
batch <- record_batch(list_column=obj)
names(batch) <- name
write_ipc_stream(batch, stream)
arrow::write_ipc_stream(batch, stream)
return()
} else if (length(names(obj)) > 0) {
#Export each field separatly
Expand Down Expand Up @@ -68,15 +68,15 @@ send_element_by_arrow <- function(obj, name, stream) {
#export filled vector with auto type detect
batch <- record_batch(vector_column=obj)
names(batch) <- name
write_ipc_stream(batch, stream)
arrow::write_ipc_stream(batch, stream)
return()
} else if (length(obj) == 0) {
#export empty element
obj <- c(1)
type_example_batch <- record_batch(empty_column=obj)
length(obj) <- 0
empty_batch <- record_batch(empty_column=obj, schema=type_example_batch$schema)
write_ipc_stream(empty_batch, stream)
arrow::write_ipc_stream(empty_batch, stream)
return()
# } else {
# stop("Probably unsupported output")
Expand Down
10 changes: 9 additions & 1 deletion RCaller/src/test/java/com/github/rcaller/RunOnlineTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ public void stopAsyncTest() {
}
}

@Test(expected = ExecutionException.class)
public void exceptionCatchTest() {
RCaller rcaller = RCaller.create();
RCode code = rcaller.getRCode();
code.addRCode("a <- log(\"not a number\")");
rcaller.runAndReturnResultOnline("a");
}

@Test
public void rHaltedTest() {
System.out.println("R HALTED TEST");
Expand All @@ -226,7 +234,7 @@ public void rHaltedTest() {
rcaller.runAndReturnResultOnline("a");
} catch (ExecutionException ex) {
Logger.getLogger(RunOnlineTest.class.getName()).log(Level.SEVERE, ex.getMessage());
if (ex.getMessage().contains("R stderr:")) {
if (ex.getMessage().contains("R code throw an error:")) {
exceptionThrown = true;
}
rcaller.stopRCallerOnline();
Expand Down

0 comments on commit 6b884a1

Please sign in to comment.