Skip to content

Commit

Permalink
Spark task error should print task log
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Nov 28, 2023
1 parent 9e35c2b commit ca101ae
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public enum StorageErrorCode {
FS_NOT_INIT(53001, "please init first"),

INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"),
FS_OOM(53002, "OOM occurred while reading the file");
FS_OOM(53002, "OOM occurred while reading the file"),

FS_ERROR(53003, "Failed to operation fs");

StorageErrorCode(int errorCode, String message) {
this.code = errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static <K extends MetaData, V extends Record> ResultSetReader getResultSe
return new StorageResultSetReader<>(resultSet, value);
}

public static ResultSetReader getResultSetReader(String res) {
public static ResultSetReader getResultSetReader(String res) throws IOException {
ResultSetFactory rsFactory = ResultSetFactory.getInstance();
if (rsFactory.isResultSet(res)) {
ResultSet<? extends MetaData, ? extends Record> resultSet = rsFactory.getResultSet(res);
Expand All @@ -58,21 +58,12 @@ public static ResultSetReader getResultSetReader(String res) {
FsPath resPath = new FsPath(res);
ResultSet<? extends MetaData, ? extends Record> resultSet =
rsFactory.getResultSetByPath(resPath);
try {
FSFactory.getFs(resPath).init(null);
} catch (IOException e) {
logger.warn("ResultSetReaderFactory fs init failed", e);
}
ResultSetReader reader = null;
try {
reader =
ResultSetReaderFactory.getResultSetReader(
resultSet, FSFactory.getFs(resPath).read(resPath));
} catch (IOException e) {
logger.warn("ResultSetReaderFactory fs read failed", e);
}
Fs fs = FSFactory.getFs(resPath);
fs.init(null);
ResultSetReader reader =
ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath));
if (reader instanceof StorageResultSetReader) {
((StorageResultSetReader<?, ?>) reader).setFs(FSFactory.getFs(resPath));
((StorageResultSetReader<?, ?>) reader).setFs(fs);
}
return (StorageResultSetReader<?, ?>) reader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,25 @@ org.apache.linkis.common.io.resultset.ResultSetWriter<K, V> getResultSetWriter(
public static Record[] getRecordByWriter(
org.apache.linkis.common.io.resultset.ResultSetWriter<? extends MetaData, ? extends Record>
writer,
long limit) {
long limit)
throws IOException {
String res = writer.toString();
return getRecordByRes(res, limit);
}

public static Record[] getRecordByRes(String res, long limit) {
public static Record[] getRecordByRes(String res, long limit) throws IOException {
ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
int count = 0;
List<Record> records = new ArrayList<>();
try {
reader.getMetaData();
while (reader.hasNext() && count < limit) {
records.add(reader.getRecord());
count++;
}
} catch (IOException e) {
logger.warn("ResultSetWriter getRecordByRes failed", e);
reader.getMetaData();
while (reader.hasNext() && count < limit) {
records.add(reader.getRecord());
count++;
}
return records.toArray(new Record[0]);
}

public static Record getLastRecordByRes(String res) {
public static Record getLastRecordByRes(String res) throws IOException {
ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
Record record = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.linkis.storage.*;
import org.apache.linkis.storage.conf.*;
import org.apache.linkis.storage.domain.*;
import org.apache.linkis.storage.exception.StorageErrorException;
import org.apache.linkis.storage.utils.*;

import org.apache.commons.io.IOUtils;
Expand All @@ -37,6 +38,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.linkis.storage.exception.StorageErrorCode.FS_ERROR;

public class StorageResultSetWriter<K extends MetaData, V extends Record>
extends ResultSetWriter<K, V> {
private static final Logger logger = LoggerFactory.getLogger(StorageResultSetWriter.class);
Expand Down Expand Up @@ -98,8 +101,9 @@ public void createNewFile() {
fs.init(null);
FileSystemUtils.createNewFile(storePath, proxyUser, true);
outputStream = fs.write(storePath, true);
} catch (IOException e) {
logger.warn("StorageResultSetWriter createNewFile failed", e);
} catch (Exception e) {
throw new StorageErrorException(
FS_ERROR.getCode(), "StorageResultSetWriter createNewFile failed", e);
}
logger.info("Succeed to create a new file:{}", storePath);
fileCreated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ record -> {
if (emptyValue.equals(Dolphin.LINKIS_NULL)) {
return "";
} else {
return nullValue;
return emptyValue;
}
} else if (r instanceof Double) {
return StorageUtils.doubleToString((Double) r);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,12 @@ public static void createNewFile(FsPath filePath, boolean createParentWhenNotExi
createNewFile(filePath, StorageUtils.getJvmUser(), createParentWhenNotExists);
}

public static void createNewFile(
FsPath filePath, String user, boolean createParentWhenNotExists) {
public static void createNewFile(FsPath filePath, String user, boolean createParentWhenNotExists)
throws Exception {
FileSystem fileSystem = (FileSystem) FSFactory.getFsByProxyUser(filePath, user);
try {
fileSystem.init(null);
createNewFileWithFileSystem(fileSystem, filePath, user, createParentWhenNotExists);
} catch (IOException e) {
logger.warn("FileSystemUtils createNewFile failed", e);
} catch (Exception e) {
logger.warn("FileSystemUtils createNewFile failed", e);
} finally {
IOUtils.closeQuietly(fileSystem);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,72 @@

package org.apache.linkis.engineplugin.impala.executor

import org.apache.commons.collections.MapUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
import org.apache.linkis.engineconn.common.password.{CommandPasswordCallback, StaticPasswordCallback}
import org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor, EngineExecutionContext}
import org.apache.linkis.engineconn.common.password.{
CommandPasswordCallback,
StaticPasswordCallback
}
import org.apache.linkis.engineconn.computation.executor.execute.{
ConcurrentComputationExecutor,
EngineExecutionContext
}
import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineplugin.impala.client.{
ExecutionListener,
ImpalaClient,
ImpalaResultSet
}
import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row
import org.apache.linkis.engineplugin.impala.client.exception.{ImpalaEngineException, ImpalaErrorCodeSummary}
import org.apache.linkis.engineplugin.impala.client.exception.{
ImpalaEngineException,
ImpalaErrorCodeSummary
}
import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress, ExecStatus}
import org.apache.linkis.engineplugin.impala.client.thrift.{ImpalaThriftClient, ImpalaThriftSessionFactory}
import org.apache.linkis.engineplugin.impala.client.{ExecutionListener, ImpalaClient, ImpalaResultSet}
import org.apache.linkis.engineplugin.impala.client.thrift.{
ImpalaThriftClient,
ImpalaThriftSessionFactory
}
import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._
import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadResource, NodeResource}
import org.apache.linkis.manager.common.entity.resource.{
CommonNodeResource,
LoadResource,
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.executer.{CompletedExecuteResponse, ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
import org.apache.linkis.scheduler.executer.{
CompletedExecuteResponse,
ErrorExecuteResponse,
ExecuteResponse,
SuccessExecuteResponse
}
import org.apache.linkis.storage.domain.Column
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}

import org.apache.commons.collections.MapUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils

import org.springframework.util.CollectionUtils

import javax.net.SocketFactory
import javax.net.ssl._
import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}

import java.io.FileInputStream
import java.security.KeyStore
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
import javax.net.SocketFactory
import javax.net.ssl._
import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}

import scala.collection.JavaConverters._

class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package executor;

import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.manager.engineplugin.io.executor.IoEngineConnExecutor;
import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.storage.domain.MethodEntity;
import org.apache.linkis.storage.domain.MethodEntitySerializer;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class IoEngineConnExecutorTest {

@Test
public void testExecuteLine() {
// test init
IoEngineConnExecutor ioEngineConnExecutor = new IoEngineConnExecutor(1, Integer.MAX_VALUE);
EngineExecutionContext engineExecutionContext =
new EngineExecutionContext(ioEngineConnExecutor, "hadoop");
engineExecutionContext.setJobId("jobId-1");
Object[] objects = new Object[10];
MethodEntity methodEntity =
new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "init", objects);
AliasOutputExecuteResponse executeResponse =
(AliasOutputExecuteResponse)
ioEngineConnExecutor.executeLine(
engineExecutionContext, MethodEntitySerializer.serializer(methodEntity));
Assertions.assertThat(executeResponse).isNotNull();
Assertions.assertThat(executeResponse.alias()).isEqualTo("0");

// test write
String filePath = this.getClass().getResource("/testIoResult.dolphin").getFile().toString();
FsPath fsPath = new FsPath(filePath);
String fsPathStr = MethodEntitySerializer.serializerJavaObject(fsPath);
objects = new Object[3];
objects[0] = fsPathStr;
objects[1] = true;
objects[2] = "dolphin000000000300000000040,110000000016aGVsbG8gd29ybGQ=";
methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "write", objects);
ExecuteResponse writeResponse =
ioEngineConnExecutor.executeLine(
engineExecutionContext, MethodEntitySerializer.serializer(methodEntity));
System.out.println(writeResponse);
Assertions.assertThat(executeResponse).isNotNull();

// test read
objects = new Object[1];
objects[0] = fsPathStr;
methodEntity = new MethodEntity(0L, "file", "hadoop", "hadoop", "localhost", "read", objects);
AliasOutputExecuteResponse readResponse =
(AliasOutputExecuteResponse)
ioEngineConnExecutor.executeLine(
engineExecutionContext, MethodEntitySerializer.serializer(methodEntity));
Assertions.assertThat(readResponse).isNotNull();
Assertions.assertThat(readResponse.output())
.isEqualTo("dolphin000000000300000000040,110000000016aGVsbG8gd29ybGQ=");
}
}

This file was deleted.

0 comments on commit ca101ae

Please sign in to comment.