Skip to content

Commit

Permalink
Fix repl get task result bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Dec 12, 2023
1 parent 4c17598 commit f126d9a
Showing 1 changed file with 30 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.linkis.engineplugin.repl.executor;

import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.common.utils.OverloadUtils;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
Expand All @@ -36,15 +39,23 @@
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
import org.apache.linkis.storage.LineMetaData;
import org.apache.linkis.storage.LineRecord;
import org.apache.linkis.storage.resultset.ResultSetFactory;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;

import org.springframework.util.CollectionUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -132,14 +143,33 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext,

threadCache.put(taskId, Thread.currentThread());

ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024);
PrintStream cacheStream = new PrintStream(outputStream);
PrintStream oldStream = System.out;
System.setOut(cacheStream);

try {
replAdapter.executorCode(realCode, classpathDir, methodName);
} catch (Exception e) {
String errorMessage = ExceptionUtils.getStackTrace(e);
logger.error("Repl engine execute failed : {}", errorMessage);
engineExecutorContext.appendStdout(LogUtils.generateERROR(errorMessage));
return new ErrorExecuteResponse(errorMessage, null);
}

String message = outputStream.toString();
System.setOut(oldStream);
System.out.println(message);
ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE);
try {
resultSetWriter.addMetaData(new LineMetaData());
resultSetWriter.addRecord(new LineRecord(message));
} catch (IOException e) {
logger.error("Failed to get the task result");
} finally {
IOUtils.closeQuietly(resultSetWriter);
}
return new SuccessExecuteResponse();
}

Expand Down

0 comments on commit f126d9a

Please sign in to comment.