Skip to content

Commit

Permalink
Support for storing result sets in Parquet and ORC formats (#5024)
Browse files Browse the repository at this point in the history
* Support for storing result sets in Parquet format

* Modify known-dependencies.txt

* Fix bugs

* Fix bugs

* Support for storing result sets in orc format

* Formatted code

* Modify known-dependencies.txt

* Modify known-dependencies.txt

* Fix bugs

* Change comments to English

* Code optimization

* maven packaging optimization

* maven packaging optimization

* maven packaging optimization

* Override skip method

* maven packaging optimization
  • Loading branch information
ChengJie1053 authored Jan 24, 2024
1 parent cdf80e1 commit 20c2654
Show file tree
Hide file tree
Showing 19 changed files with 1,059 additions and 42 deletions.
46 changes: 46 additions & 0 deletions linkis-commons/linkis-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,52 @@
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.261</version>
</dependency>

<!-- Parquet support for storing engine result sets. The scope is driven by the
     storage.parquet.scope property so the dependency can be switched between
     compile and provided per packaging profile. -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet-avro.version}</version>
<scope>${storage.parquet.scope}</scope>
</dependency>
<!-- Hadoop MapReduce client classes, packaged under the same
     storage.parquet.scope as parquet-avro — presumably required by
     parquet-avro's Hadoop-based readers/writers (TODO confirm). -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<scope>${storage.parquet.scope}</scope>
<exclusions>
<!-- Drop log4j 1.x and its slf4j binding so only the project's own
     logging backend ends up on the classpath. -->
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<!-- for hadoop 3.3.3 -->
<!-- Hadoop 3.3.3 ships reload4j instead of log4j 1.x; exclude it and its
     slf4j binding for the same reason as above. -->
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- ORC support for storing engine result sets. The "nohive" classifier
     selects the ORC build without the Hive dependency, so hive-storage-api
     is excluded here — NOTE(review): confirm the nohive artifact shades its
     own copy of the storage API. -->
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>${orc-core.version}</version>
<classifier>nohive</classifier>
<scope>${storage.orc.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@
public class LinkisStorageConf {
private static final Object CONF_LOCK = new Object();

// Identifier for Linkis' native "dolphin" result-set format (the default).
public static final String DOLPHIN = "dolphin";

// Identifier for the Apache Parquet result-set format.
public static final String PARQUET = "parquet";

// File-name suffix used when a result set is persisted as Parquet.
public static final String PARQUET_FILE_SUFFIX = ".parquet";

// Identifier for the Apache ORC result-set format.
public static final String ORC = "orc";

// File-name suffix used when a result set is persisted as ORC.
public static final String ORC_FILE_SUFFIX = ".orc";

public static final String HDFS_FILE_SYSTEM_REST_ERRS =
CommonVars.apply(
"wds.linkis.hdfs.rest.errs",
Expand All @@ -34,12 +44,19 @@ public class LinkisStorageConf {
// Maximum serialized size of a single result-set row, as a human-readable
// byte string (e.g. "2m"); parsed into ROW_BYTE_MAX_LEN below.
public static final String ROW_BYTE_MAX_LEN_STR =
CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue();

// Storage format used when engines persist result sets. Defaults to the
// native dolphin format; may be configured to "parquet" or "orc".
public static final String ENGINE_RESULT_TYPE =
CommonVars.apply("linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue();

// ROW_BYTE_MAX_LEN_STR converted to an absolute byte count.
public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR);

// Comma-separated whitelist of recognised file types; the parquet and orc
// result-set formats are appended to the legacy default list.
public static final String FILE_TYPE =
CommonVars.apply(
"wds.linkis.storage.file.type",
// NOTE(review): the next two string-literal lines look like the pre-change
// and post-change variants of the same line left over from a diff view —
// only the second (without the closing parenthesis) belongs in the source;
// confirm against the repository.
"dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql")
"dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql"
+ ","
+ PARQUET
+ ","
+ ORC)
.getValue();

// Lazily-built array form of the FILE_TYPE list; volatile so readers observe
// a fully initialized array. Presumably populated under CONF_LOCK by an
// accessor outside this view — TODO confirm.
private static volatile String[] fileTypeArr = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ public enum StorageErrorCode {
// Row read back does not match the expected row length.
INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"),
// Out-of-memory condition hit while reading a file.
FS_OOM(53002, "OOM occurred while reading the file"),

// NOTE(review): the next two FS_ERROR lines appear to be the pre-change
// (terminated with ';') and post-change (terminated with ',') variants of
// the same enum constant, left over from a diff view — only the second
// belongs in the source; confirm against the repository.
FS_ERROR(53003, "Failed to operation fs");
// Generic filesystem operation failure.
FS_ERROR(53003, "Failed to operation fs"),

// Reading a result set stored in Parquet format failed.
READ_PARQUET_FAILED(53004, "Failed to read parquet file"),

// Reading a result set stored in ORC format failed.
READ_ORC_FAILED(53005, "Failed to read orc file");

StorageErrorCode(int errorCode, String message) {
this.code = errorCode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.storage.exception;

import org.apache.linkis.common.exception.ErrorException;

/**
 * Exception raised when reading stored result-set data (e.g. Parquet or ORC
 * files) fails. Thin wrapper over {@link ErrorException} that carries the
 * storage error code and, optionally, the underlying cause or the remote
 * service coordinates.
 */
public class StorageReadException extends ErrorException {

  /**
   * Creates a read exception with an error code and description.
   *
   * @param errorCode storage error code (see StorageErrorCode)
   * @param description human-readable failure description
   */
  public StorageReadException(int errorCode, String description) {
    super(errorCode, description);
  }

  /**
   * Creates a read exception wrapping the underlying failure.
   *
   * @param errorCode storage error code
   * @param description human-readable failure description
   * @param cause original exception that triggered this one
   */
  public StorageReadException(int errorCode, String description, Throwable cause) {
    super(errorCode, description);
    this.initCause(cause);
  }

  /**
   * Creates a read exception annotated with the remote service coordinates.
   *
   * @param errorCode storage error code
   * @param description human-readable failure description
   * @param ip address of the service where the failure occurred
   * @param port port of that service
   * @param serviceKind kind of service reporting the error
   */
  public StorageReadException(
      int errorCode, String description, String ip, int port, String serviceKind) {
    super(errorCode, description, ip, port, serviceKind);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.storage.FSFactory;
import org.apache.linkis.storage.conf.LinkisStorageConf;
import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary;
import org.apache.linkis.storage.exception.StorageWarnException;
Expand Down Expand Up @@ -108,7 +109,9 @@ public boolean exists(String resultSetType) {

@Override
// Returns true when the path carries any recognised result-set suffix:
// native dolphin, or the parquet/orc formats added by this change.
public boolean isResultSetPath(String path) {
// NOTE(review): the next line is the pre-change body (dolphin-only check)
// left over from a diff view; only the multi-suffix return below belongs
// in the source — confirm against the repository.
return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX);
return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)
|| path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)
|| path.endsWith(LinkisStorageConf.ORC_FILE_SUFFIX);
}

@Override
Expand All @@ -134,15 +137,22 @@ public String[] getResultSetType() {

@Override
public ResultSet<? extends MetaData, ? extends Record> getResultSetByPath(FsPath fsPath, Fs fs) {
ResultSet resultSet = null;
try (InputStream inputStream = fs.read(fsPath)) {
String resultSetType = Dolphin.getType(inputStream);
if (StringUtils.isEmpty(resultSetType)) {
throw new StorageWarnException(
THE_FILE_IS_EMPTY.getErrorCode(),
MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath()));
if (fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) {
String resultSetType = Dolphin.getType(inputStream);
if (StringUtils.isEmpty(resultSetType)) {
throw new StorageWarnException(
THE_FILE_IS_EMPTY.getErrorCode(),
MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath()));
}
// Utils.tryQuietly(fs::close);
resultSet = getResultSetByType(resultSetType);
} else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)
|| fsPath.getPath().endsWith(LinkisStorageConf.ORC_FILE_SUFFIX)) {
resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE);
}
// Utils.tryQuietly(fs::close);
return getResultSetByType(resultSetType);
return resultSet;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit 20c2654

Please sign in to comment.