From ca101ae73542f0744d5fdace1f249fafa491ac4e Mon Sep 17 00:00:00 2001 From: peacewong Date: Tue, 28 Nov 2023 22:40:19 +0800 Subject: [PATCH] Spark task error should print task log --- .../storage/exception/StorageErrorCode.java | 4 +- .../resultset/ResultSetReaderFactory.java | 21 ++--- .../resultset/ResultSetWriterFactory.java | 19 ++--- .../resultset/StorageResultSetWriter.java | 8 +- .../storage/source/ResultsetFileSource.java | 2 +- .../linkis/storage/utils/FileSystemUtils.java | 8 +- .../executor/ImpalaEngineConnExecutor.scala | 57 ++++++++++---- .../executor/IoEngineConnExecutorTest.java | 77 +++++++++++++++++++ .../io/executor/IoEngineConnExecutorTest.java | 73 ------------------ 9 files changed, 146 insertions(+), 123 deletions(-) create mode 100644 linkis-engineconn-plugins/io_file/src/test/java/executor/IoEngineConnExecutorTest.java delete mode 100644 linkis-engineconn-plugins/io_file/src/test/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutorTest.java diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java index fad0d83a12..308e548f27 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java @@ -23,7 +23,9 @@ public enum StorageErrorCode { FS_NOT_INIT(53001, "please init first"), INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"), - FS_OOM(53002, "OOM occurred while reading the file"); + FS_OOM(53002, "OOM occurred while reading the file"), + + FS_ERROR(53003, "Failed to operation fs"); StorageErrorCode(int errorCode, String message) { this.code = errorCode; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 3047b715a0..5e56b099d7 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -49,7 +49,7 @@ public static ResultSetReader getResultSe return new StorageResultSetReader<>(resultSet, value); } - public static ResultSetReader getResultSetReader(String res) { + public static ResultSetReader getResultSetReader(String res) throws IOException { ResultSetFactory rsFactory = ResultSetFactory.getInstance(); if (rsFactory.isResultSet(res)) { ResultSet resultSet = rsFactory.getResultSet(res); @@ -58,21 +58,12 @@ public static ResultSetReader getResultSetReader(String res) { FsPath resPath = new FsPath(res); ResultSet resultSet = rsFactory.getResultSetByPath(resPath); - try { - FSFactory.getFs(resPath).init(null); - } catch (IOException e) { - logger.warn("ResultSetReaderFactory fs init failed", e); - } - ResultSetReader reader = null; - try { - reader = - ResultSetReaderFactory.getResultSetReader( - resultSet, FSFactory.getFs(resPath).read(resPath)); - } catch (IOException e) { - logger.warn("ResultSetReaderFactory fs read failed", e); - } + Fs fs = FSFactory.getFs(resPath); + fs.init(null); + ResultSetReader reader = + ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath)); if (reader instanceof StorageResultSetReader) { - ((StorageResultSetReader) reader).setFs(FSFactory.getFs(resPath)); + ((StorageResultSetReader) reader).setFs(fs); } return (StorageResultSetReader) reader; } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java index 1abeaf0937..d70319c9bd 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java @@ -51,28 +51,25 @@ org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( public static Record[] getRecordByWriter( org.apache.linkis.common.io.resultset.ResultSetWriter writer, - long limit) { + long limit) + throws IOException { String res = writer.toString(); return getRecordByRes(res, limit); } - public static Record[] getRecordByRes(String res, long limit) { + public static Record[] getRecordByRes(String res, long limit) throws IOException { ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res); int count = 0; List records = new ArrayList<>(); - try { - reader.getMetaData(); - while (reader.hasNext() && count < limit) { - records.add(reader.getRecord()); - count++; - } - } catch (IOException e) { - logger.warn("ResultSetWriter getRecordByRes failed", e); + reader.getMetaData(); + while (reader.hasNext() && count < limit) { + records.add(reader.getRecord()); + count++; } return records.toArray(new Record[0]); } - public static Record getLastRecordByRes(String res) { + public static Record getLastRecordByRes(String res) throws IOException { ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res); Record record = null; try { diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java index 5109ed44df..ea513664bd 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java @@ -24,6 +24,7 @@ import org.apache.linkis.storage.*; import org.apache.linkis.storage.conf.*; import org.apache.linkis.storage.domain.*; +import org.apache.linkis.storage.exception.StorageErrorException; import org.apache.linkis.storage.utils.*; import org.apache.commons.io.IOUtils; @@ -37,6 +38,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.linkis.storage.exception.StorageErrorCode.FS_ERROR; + public class StorageResultSetWriter extends ResultSetWriter { private static final Logger logger = LoggerFactory.getLogger(StorageResultSetWriter.class); @@ -98,8 +101,9 @@ public void createNewFile() { fs.init(null); FileSystemUtils.createNewFile(storePath, proxyUser, true); outputStream = fs.write(storePath, true); - } catch (IOException e) { - logger.warn("StorageResultSetWriter createNewFile failed", e); + } catch (Exception e) { + throw new StorageErrorException( + FS_ERROR.getCode(), "StorageResultSetWriter createNewFile failed", e); } logger.info("Succeed to create a new file:{}", storePath); fileCreated = true; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java index fb064a8f4f..54fd64daad 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java @@ -47,7 +47,7 @@ record -> { if (emptyValue.equals(Dolphin.LINKIS_NULL)) { return ""; } else { - return nullValue; + return emptyValue; } } else if (r instanceof Double) { return StorageUtils.doubleToString((Double) r); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/FileSystemUtils.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/FileSystemUtils.java index 2809c83eec..4c50479637 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/FileSystemUtils.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/FileSystemUtils.java @@ -61,16 +61,12 @@ public static void createNewFile(FsPath filePath, boolean createParentWhenNotExi createNewFile(filePath, StorageUtils.getJvmUser(), createParentWhenNotExists); } - public static void createNewFile( - FsPath filePath, String user, boolean createParentWhenNotExists) { + public static void createNewFile(FsPath filePath, String user, boolean createParentWhenNotExists) + throws Exception { FileSystem fileSystem = (FileSystem) FSFactory.getFsByProxyUser(filePath, user); try { fileSystem.init(null); createNewFileWithFileSystem(fileSystem, filePath, user, createParentWhenNotExists); - } catch (IOException e) { - logger.warn("FileSystemUtils createNewFile failed", e); - } catch (Exception e) { - logger.warn("FileSystemUtils createNewFile failed", e); } finally { IOUtils.closeQuietly(fileSystem); } diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala index 23cd1a0e6f..97613f3f94 100644 --- a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala @@ -17,43 +17,72 @@ package org.apache.linkis.engineplugin.impala.executor -import org.apache.commons.collections.MapUtils -import org.apache.commons.io.IOUtils -import org.apache.commons.lang3.StringUtils -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{OverloadUtils, Utils} -import org.apache.linkis.engineconn.common.password.{CommandPasswordCallback, StaticPasswordCallback} -import org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor, EngineExecutionContext} +import org.apache.linkis.engineconn.common.password.{ + CommandPasswordCallback, + StaticPasswordCallback +} +import org.apache.linkis.engineconn.computation.executor.execute.{ + ConcurrentComputationExecutor, + EngineExecutionContext +} import org.apache.linkis.engineconn.core.EngineConnObject +import org.apache.linkis.engineplugin.impala.client.{ + ExecutionListener, + ImpalaClient, + ImpalaResultSet +} import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row -import org.apache.linkis.engineplugin.impala.client.exception.{ImpalaEngineException, ImpalaErrorCodeSummary} +import org.apache.linkis.engineplugin.impala.client.exception.{ + ImpalaEngineException, + ImpalaErrorCodeSummary +} import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress, ExecStatus} -import org.apache.linkis.engineplugin.impala.client.thrift.{ImpalaThriftClient, ImpalaThriftSessionFactory} -import org.apache.linkis.engineplugin.impala.client.{ExecutionListener, ImpalaClient, ImpalaResultSet} +import org.apache.linkis.engineplugin.impala.client.thrift.{ + ImpalaThriftClient, + ImpalaThriftSessionFactory +} import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._ import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig import org.apache.linkis.governance.common.paser.SQLCodeParser -import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadResource, NodeResource} +import org.apache.linkis.manager.common.entity.resource.{ + CommonNodeResource, + LoadResource, + NodeResource +} import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel} import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.rpc.Sender -import org.apache.linkis.scheduler.executer.{CompletedExecuteResponse, ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse} +import org.apache.linkis.scheduler.executer.{ + CompletedExecuteResponse, + ErrorExecuteResponse, + ExecuteResponse, + SuccessExecuteResponse +} import org.apache.linkis.storage.domain.Column import org.apache.linkis.storage.resultset.ResultSetFactory import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord} + +import org.apache.commons.collections.MapUtils +import org.apache.commons.io.IOUtils +import org.apache.commons.lang3.StringUtils +import org.apache.commons.lang3.exception.ExceptionUtils + import org.springframework.util.CollectionUtils +import javax.net.SocketFactory +import javax.net.ssl._ +import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback} + import java.io.FileInputStream import java.security.KeyStore import java.util import java.util.concurrent.ConcurrentHashMap import java.util.function.Consumer -import javax.net.SocketFactory -import javax.net.ssl._ -import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback} + import scala.collection.JavaConverters._ class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) diff --git a/linkis-engineconn-plugins/io_file/src/test/java/executor/IoEngineConnExecutorTest.java b/linkis-engineconn-plugins/io_file/src/test/java/executor/IoEngineConnExecutorTest.java new file mode 100644 index 0000000000..252ec95ab5 --- /dev/null +++ b/linkis-engineconn-plugins/io_file/src/test/java/executor/IoEngineConnExecutorTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package executor; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext; +import org.apache.linkis.manager.engineplugin.io.executor.IoEngineConnExecutor; +import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse; +import org.apache.linkis.scheduler.executer.ExecuteResponse; +import org.apache.linkis.storage.domain.MethodEntity; +import org.apache.linkis.storage.domain.MethodEntitySerializer; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +class IoEngineConnExecutorTest { + + @Test + public void testExecuteLine() { + // test init + IoEngineConnExecutor ioEngineConnExecutor = new IoEngineConnExecutor(1, Integer.MAX_VALUE); + EngineExecutionContext engineExecutionContext = + new EngineExecutionContext(ioEngineConnExecutor, "hadoop"); + engineExecutionContext.setJobId("jobId-1"); + Object[] objects = new Object[10]; + MethodEntity methodEntity = + new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "init", objects); + AliasOutputExecuteResponse executeResponse = + (AliasOutputExecuteResponse) + ioEngineConnExecutor.executeLine( + engineExecutionContext, MethodEntitySerializer.serializer(methodEntity)); + Assertions.assertThat(executeResponse).isNotNull(); + Assertions.assertThat(executeResponse.alias()).isEqualTo("0"); + + // test write + String filePath = this.getClass().getResource("/testIoResult.dolphin").getFile().toString(); + FsPath fsPath = new FsPath(filePath); + String fsPathStr = MethodEntitySerializer.serializerJavaObject(fsPath); + objects = new Object[3]; + objects[0] = fsPathStr; + objects[1] = true; + objects[2] = "dolphin000000000300000000040,110000000016aGVsbG8gd29ybGQ="; + methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "write", objects); + ExecuteResponse writeResponse = + ioEngineConnExecutor.executeLine( + engineExecutionContext, MethodEntitySerializer.serializer(methodEntity)); + System.out.println(writeResponse); + Assertions.assertThat(executeResponse).isNotNull(); + + // test read + objects = new Object[1]; + objects[0] = fsPathStr; + methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "read", objects); + AliasOutputExecuteResponse readResponse = + (AliasOutputExecuteResponse) + ioEngineConnExecutor.executeLine( + engineExecutionContext, MethodEntitySerializer.serializer(methodEntity)); + Assertions.assertThat(readResponse).isNotNull(); + Assertions.assertThat(readResponse.output()) + .isEqualTo("dolphin000000000300000000040,110000000016aGVsbG8gd29ybGQ="); + } +} diff --git a/linkis-engineconn-plugins/io_file/src/test/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutorTest.java b/linkis-engineconn-plugins/io_file/src/test/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutorTest.java deleted file mode 100644 index 5dcc883114..0000000000 --- a/linkis-engineconn-plugins/io_file/src/test/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutorTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.manager.engineplugin.io.executor; - -import org.apache.linkis.common.io.FsPath; -import org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor; -import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext; -import org.apache.linkis.manager.engineplugin.io.conf.IOEngineConnConfiguration; -import org.apache.linkis.manager.engineplugin.io.factory.IoEngineConnFactory; -import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse; -import org.apache.linkis.scheduler.executer.ExecuteResponse; -import org.apache.linkis.storage.domain.MethodEntity; -import org.apache.linkis.storage.domain.MethodEntitySerializer; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.Test; - - - -class IoEngineConnExecutorTest { - - @Test - public void testExecuteLine() { - //test init - IoEngineConnExecutor ioEngineConnExecutor = new IoEngineConnExecutor(1, Integer.MAX_VALUE); - EngineExecutionContext engineExecutionContext = new EngineExecutionContext(ioEngineConnExecutor, "hadoop"); - engineExecutionContext.setJobId("jobId-1"); - Object[] objects = new Object[10]; - MethodEntity methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", - "localhost", "init", objects); - AliasOutputExecuteResponse executeResponse = (AliasOutputExecuteResponse)ioEngineConnExecutor.executeLine(engineExecutionContext, MethodEntitySerializer.serializer(methodEntity)); - Assertions.assertThat(executeResponse).isNotNull(); - Assertions.assertThat(executeResponse.alias()).isEqualTo("0"); - - //test write - String filePath = this.getClass().getResource("/testIoResult.dolphin").getFile().toString(); - FsPath fsPath = new FsPath(filePath); - String fsPathStr = MethodEntitySerializer.serializerJavaObject(fsPath); - objects = new Object[3]; - objects[0] = fsPathStr; - objects[1] = true; - objects[2] = "dolphin000000000300000000040,110000000016aGVsbG8gd29ybGQ="; - methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", - "localhost", "write", objects); - ExecuteResponse writeResponse = ioEngineConnExecutor.executeLine(engineExecutionContext, MethodEntitySerializer.serializer(methodEntity)); - System.out.println(writeResponse); - Assertions.assertThat(executeResponse).isNotNull(); - - //test read - objects = new Object[1]; - objects[0] = fsPathStr; - methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", - "localhost", "read", objects); - AliasOutputExecuteResponse readResponse = (AliasOutputExecuteResponse)ioEngineConnExecutor.executeLine(engineExecutionContext, MethodEntitySerializer.serializer(methodEntity)); - Assertions.assertThat(readResponse).isNotNull(); - Assertions.assertThat(readResponse.output()).isEqualTo("dolphin000000000300000000040,110000000016aGVsbG8gd29ybGQ="); - } - -} \ No newline at end of file