Merge pull request #4996 from WeDataSphere/master-ec-conf
[Bug][1.5.0] Spark Scala task errors should print the task log
casionone authored Nov 29, 2023
2 parents ce0bf43 + a1bf43f commit 03540ea
Showing 11 changed files with 152 additions and 129 deletions.
@@ -23,7 +23,9 @@ public enum StorageErrorCode {
   FS_NOT_INIT(53001, "please init first"),

   INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"),
-  FS_OOM(53002, "OOM occurred while reading the file");
+  FS_OOM(53002, "OOM occurred while reading the file"),
+
+  FS_ERROR(53003, "Failed to operation fs");

   StorageErrorCode(int errorCode, String message) {
     this.code = errorCode;
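Note: the new FS_ERROR code is paired with StorageErrorException, as the StorageResultSetWriter hunk further down shows. A minimal sketch of that pairing (the FsErrors helper is illustrative, not part of this commit):

import org.apache.linkis.storage.exception.StorageErrorException;

import static org.apache.linkis.storage.exception.StorageErrorCode.FS_ERROR;

// Illustrative helper: wrap a low-level fs failure under the new 53003 code
// so the original cause travels with the error into the task log.
final class FsErrors {
  static StorageErrorException wrap(String operation, Exception cause) {
    return new StorageErrorException(
        FS_ERROR.getCode(), "Failed to operation fs: " + operation, cause);
  }
}

A caller would then throw FsErrors.wrap("read", e) instead of logging the failure and continuing.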
@@ -49,7 +49,7 @@ public static <K extends MetaData, V extends Record> ResultSetReader getResultSetReader(
     return new StorageResultSetReader<>(resultSet, value);
   }

-  public static ResultSetReader getResultSetReader(String res) {
+  public static ResultSetReader getResultSetReader(String res) throws IOException {
     ResultSetFactory rsFactory = ResultSetFactory.getInstance();
     if (rsFactory.isResultSet(res)) {
       ResultSet<? extends MetaData, ? extends Record> resultSet = rsFactory.getResultSet(res);
@@ -58,21 +58,12 @@ public static ResultSetReader getResultSetReader(String res) {
       FsPath resPath = new FsPath(res);
       ResultSet<? extends MetaData, ? extends Record> resultSet =
           rsFactory.getResultSetByPath(resPath);
-      try {
-        FSFactory.getFs(resPath).init(null);
-      } catch (IOException e) {
-        logger.warn("ResultSetReaderFactory fs init failed", e);
-      }
-      ResultSetReader reader = null;
-      try {
-        reader =
-            ResultSetReaderFactory.getResultSetReader(
-                resultSet, FSFactory.getFs(resPath).read(resPath));
-      } catch (IOException e) {
-        logger.warn("ResultSetReaderFactory fs read failed", e);
-      }
+      Fs fs = FSFactory.getFs(resPath);
+      fs.init(null);
+      ResultSetReader reader =
+          ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath));
       if (reader instanceof StorageResultSetReader) {
-        ((StorageResultSetReader<?, ?>) reader).setFs(FSFactory.getFs(resPath));
+        ((StorageResultSetReader<?, ?>) reader).setFs(fs);
       }
       return (StorageResultSetReader<?, ?>) reader;
     }
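With the signature now declaring throws IOException, fs.init and fs.read failures propagate instead of being logged and swallowed, which previously left reader null and surfaced later as an opaque NullPointerException. A hedged caller sketch (package paths assumed from the Linkis storage module layout):

import java.io.IOException;

import org.apache.linkis.common.io.resultset.ResultSetReader;
import org.apache.linkis.storage.resultset.ResultSetReaderFactory;

class ReadResultDemo {
  // IO errors now surface here with their real cause attached.
  static ResultSetReader open(String res) throws IOException {
    ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
    reader.getMetaData();
    return reader;
  }
}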
@@ -51,28 +51,25 @@ org.apache.linkis.common.io.resultset.ResultSetWriter<K, V> getResultSetWriter(
   public static Record[] getRecordByWriter(
       org.apache.linkis.common.io.resultset.ResultSetWriter<? extends MetaData, ? extends Record>
           writer,
-      long limit) {
+      long limit)
+      throws IOException {
     String res = writer.toString();
     return getRecordByRes(res, limit);
   }

-  public static Record[] getRecordByRes(String res, long limit) {
+  public static Record[] getRecordByRes(String res, long limit) throws IOException {
     ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
     int count = 0;
     List<Record> records = new ArrayList<>();
-    try {
-      reader.getMetaData();
-      while (reader.hasNext() && count < limit) {
-        records.add(reader.getRecord());
-        count++;
-      }
-    } catch (IOException e) {
-      logger.warn("ResultSetWriter getRecordByRes failed", e);
+    reader.getMetaData();
+    while (reader.hasNext() && count < limit) {
+      records.add(reader.getRecord());
+      count++;
     }
     return records.toArray(new Record[0]);
   }

-  public static Record getLastRecordByRes(String res) {
+  public static Record getLastRecordByRes(String res) throws IOException {
     ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
     Record record = null;
     try {
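The record helpers follow suit: getRecordByRes used to catch the IOException and return whatever records had been collected before the failure, so callers saw a silently truncated result. A short sketch, assuming the enclosing class is ResultSetWriterFactory:

import java.io.IOException;

import org.apache.linkis.common.io.Record;
import org.apache.linkis.storage.resultset.ResultSetWriterFactory;

class RecordsDemo {
  // An unreadable result now raises IOException instead of returning
  // a silently truncated array.
  static Record[] firstHundred(String res) throws IOException {
    return ResultSetWriterFactory.getRecordByRes(res, 100);
  }
}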
@@ -24,6 +24,7 @@
 import org.apache.linkis.storage.*;
 import org.apache.linkis.storage.conf.*;
 import org.apache.linkis.storage.domain.*;
+import org.apache.linkis.storage.exception.StorageErrorException;
 import org.apache.linkis.storage.utils.*;

 import org.apache.commons.io.IOUtils;
@@ -37,6 +38,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import static org.apache.linkis.storage.exception.StorageErrorCode.FS_ERROR;
+
 public class StorageResultSetWriter<K extends MetaData, V extends Record>
     extends ResultSetWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(StorageResultSetWriter.class);
@@ -98,8 +101,9 @@ public void createNewFile() {
       fs.init(null);
       FileSystemUtils.createNewFile(storePath, proxyUser, true);
       outputStream = fs.write(storePath, true);
-    } catch (IOException e) {
-      logger.warn("StorageResultSetWriter createNewFile failed", e);
+    } catch (Exception e) {
+      throw new StorageErrorException(
+          FS_ERROR.getCode(), "StorageResultSetWriter createNewFile failed", e);
     }
     logger.info("Succeed to create a new file:{}", storePath);
     fileCreated = true;
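This hunk is the heart of the fix named in the PR title: previously a failed file creation was only logged, fileCreated was still set to true, and the Spark Scala task died later on a null outputStream with no useful log. A self-contained sketch of the old vs. new behavior (names and messages are illustrative, not Linkis API):

import java.io.IOException;

class FailFastDemo {
  static boolean fileCreated = false;

  // Old pattern: warn and continue; the flag lies and the crash comes later.
  static void createNewFileOld() {
    try {
      throw new IOException("disk quota exceeded");
    } catch (IOException e) {
      System.err.println("warn only: " + e.getMessage());
    }
    fileCreated = true; // runs even though no file exists
  }

  // New pattern: rethrow immediately with an error code, so the task log
  // shows the root cause at the point of failure.
  static void createNewFileNew() {
    try {
      throw new IOException("disk quota exceeded");
    } catch (Exception e) {
      throw new RuntimeException("errCode 53003: createNewFile failed", e);
    }
  }

  public static void main(String[] args) {
    createNewFileOld();
    System.out.println("old behavior: fileCreated=" + fileCreated);
    createNewFileNew(); // fails fast, cause attached
  }
}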
@@ -47,7 +47,7 @@ record -> {
           if (emptyValue.equals(Dolphin.LINKIS_NULL)) {
             return "";
           } else {
-            return nullValue;
+            return emptyValue;
           }
         } else if (r instanceof Double) {
           return StorageUtils.doubleToString((Double) r);
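A one-word bug fix: the branch tests emptyValue but returned the unrelated nullValue. Condensed, the corrected mapping looks like this (the outer null test and package paths are assumptions from the storage module, not shown in the hunk):

import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.utils.StorageUtils;

class NullDisplay {
  // r is the raw field value; emptyValue is the configured null placeholder.
  static String display(Object r, String emptyValue) {
    if (r == null || Dolphin.LINKIS_NULL.equals(r)) {
      // Render "" only when the placeholder itself is LINKIS_NULL,
      // otherwise render the configured emptyValue (was: nullValue).
      return Dolphin.LINKIS_NULL.equals(emptyValue) ? "" : emptyValue;
    } else if (r instanceof Double) {
      return StorageUtils.doubleToString((Double) r);
    }
    return String.valueOf(r);
  }
}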
@@ -61,16 +61,12 @@ public static void createNewFile(FsPath filePath, boolean createParentWhenNotExists
     createNewFile(filePath, StorageUtils.getJvmUser(), createParentWhenNotExists);
   }

-  public static void createNewFile(
-      FsPath filePath, String user, boolean createParentWhenNotExists) {
+  public static void createNewFile(FsPath filePath, String user, boolean createParentWhenNotExists)
+      throws Exception {
     FileSystem fileSystem = (FileSystem) FSFactory.getFsByProxyUser(filePath, user);
     try {
       fileSystem.init(null);
       createNewFileWithFileSystem(fileSystem, filePath, user, createParentWhenNotExists);
-    } catch (IOException e) {
-      logger.warn("FileSystemUtils createNewFile failed", e);
-    } catch (Exception e) {
-      logger.warn("FileSystemUtils createNewFile failed", e);
     } finally {
       IOUtils.closeQuietly(fileSystem);
     }
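createNewFile now declares throws Exception, moving handling to callers; the StorageResultSetWriter hunk above converts the failure into the 53003 storage error. A minimal caller sketch mirroring that pattern (package paths assumed):

import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.storage.exception.StorageErrorException;
import org.apache.linkis.storage.utils.FileSystemUtils;

import static org.apache.linkis.storage.exception.StorageErrorCode.FS_ERROR;

class CreateFileDemo {
  static void create(FsPath storePath, String proxyUser) throws StorageErrorException {
    try {
      FileSystemUtils.createNewFile(storePath, proxyUser, true);
    } catch (Exception e) {
      // Fail fast with the fs cause instead of logging and continuing.
      throw new StorageErrorException(
          FS_ERROR.getCode(), "createNewFile failed: " + storePath, e);
    }
  }
}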
4 changes: 2 additions & 2 deletions linkis-dist/bin/checkEnv.sh
@@ -13,13 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

source ~/.bash_profile
shellDir=`dirname $0`
workDir=`cd ${shellDir}/..;pwd`
source ${workDir}/bin/common.sh
source ${workDir}/deploy-config/linkis-env.sh
source ${workDir}/deploy-config/db.sh

source
say() {
printf 'check command fail \n %s\n' "$1"
}
@@ -17,43 +17,72 @@

package org.apache.linkis.engineplugin.impala.executor

import org.apache.commons.collections.MapUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
import org.apache.linkis.engineconn.common.password.{CommandPasswordCallback, StaticPasswordCallback}
import org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor, EngineExecutionContext}
import org.apache.linkis.engineconn.common.password.{
CommandPasswordCallback,
StaticPasswordCallback
}
import org.apache.linkis.engineconn.computation.executor.execute.{
ConcurrentComputationExecutor,
EngineExecutionContext
}
import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineplugin.impala.client.{
ExecutionListener,
ImpalaClient,
ImpalaResultSet
}
import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row
import org.apache.linkis.engineplugin.impala.client.exception.{ImpalaEngineException, ImpalaErrorCodeSummary}
import org.apache.linkis.engineplugin.impala.client.exception.{
ImpalaEngineException,
ImpalaErrorCodeSummary
}
import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress, ExecStatus}
import org.apache.linkis.engineplugin.impala.client.thrift.{ImpalaThriftClient, ImpalaThriftSessionFactory}
import org.apache.linkis.engineplugin.impala.client.{ExecutionListener, ImpalaClient, ImpalaResultSet}
import org.apache.linkis.engineplugin.impala.client.thrift.{
ImpalaThriftClient,
ImpalaThriftSessionFactory
}
import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._
import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadResource, NodeResource}
import org.apache.linkis.manager.common.entity.resource.{
CommonNodeResource,
LoadResource,
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.executer.{CompletedExecuteResponse, ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
import org.apache.linkis.scheduler.executer.{
CompletedExecuteResponse,
ErrorExecuteResponse,
ExecuteResponse,
SuccessExecuteResponse
}
import org.apache.linkis.storage.domain.Column
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}

import org.apache.commons.collections.MapUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils

import org.springframework.util.CollectionUtils

import javax.net.SocketFactory
import javax.net.ssl._
import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}

import java.io.FileInputStream
import java.security.KeyStore
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
import javax.net.SocketFactory
import javax.net.ssl._
import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}

import scala.collection.JavaConverters._

class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package executor;

import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.manager.engineplugin.io.executor.IoEngineConnExecutor;
import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.storage.domain.MethodEntity;
import org.apache.linkis.storage.domain.MethodEntitySerializer;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class IoEngineConnExecutorTest {

  @Test
  public void testExecuteLine() {
    // test init
    IoEngineConnExecutor ioEngineConnExecutor = new IoEngineConnExecutor(1, Integer.MAX_VALUE);
    EngineExecutionContext engineExecutionContext =
        new EngineExecutionContext(ioEngineConnExecutor, "hadoop");
    engineExecutionContext.setJobId("jobId-1");
    Object[] objects = new Object[10];
    MethodEntity methodEntity =
        new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "init", objects);
    AliasOutputExecuteResponse executeResponse =
        (AliasOutputExecuteResponse)
            ioEngineConnExecutor.executeLine(
                engineExecutionContext, MethodEntitySerializer.serializer(methodEntity));
    Assertions.assertThat(executeResponse).isNotNull();
    Assertions.assertThat(executeResponse.alias()).isEqualTo("0");

    // test write
    String filePath = this.getClass().getResource("/testIoResult.dolphin").getFile().toString();
    FsPath fsPath = new FsPath(filePath);
    String fsPathStr = MethodEntitySerializer.serializerJavaObject(fsPath);
    objects = new Object[3];
    objects[0] = fsPathStr;
    objects[1] = true;
    objects[2] = "dolphin000000000300000000040,110000000016aGVsbG8gd29ybGQ=";
    methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "write", objects);
    ExecuteResponse writeResponse =
        ioEngineConnExecutor.executeLine(
            engineExecutionContext, MethodEntitySerializer.serializer(methodEntity));
    System.out.println(writeResponse);
    Assertions.assertThat(executeResponse).isNotNull();

    // test read
    objects = new Object[1];
    objects[0] = fsPathStr;
    methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "read", objects);
    AliasOutputExecuteResponse readResponse =
        (AliasOutputExecuteResponse)
            ioEngineConnExecutor.executeLine(
                engineExecutionContext, MethodEntitySerializer.serializer(methodEntity));
    Assertions.assertThat(readResponse).isNotNull();
    Assertions.assertThat(readResponse.output())
        .isEqualTo("dolphin000000000300000000040,110000000016aGVsbG8gd29ybGQ=");
  }
}
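To run just the new test with standard Maven Surefire filtering (the module path is an assumption about where the io_file engine plugin lives in the repository):

mvn test -pl linkis-engineconn-plugins/io_file -Dtest=IoEngineConnExecutorTest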

This file was deleted.

