diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml
index def795ebd8..c9c610bcbd 100644
--- a/linkis-commons/linkis-storage/pom.xml
+++ b/linkis-commons/linkis-storage/pom.xml
@@ -99,6 +99,36 @@
       <artifactId>aws-java-sdk-s3</artifactId>
       <version>1.12.261</version>
     </dependency>
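+    <!-- parquet-avro lets result sets be read/written as parquet via Avro records;
+         hadoop-mapreduce-client-core supplies Hadoop classes parquet-avro needs at runtime.
+         The log4j/reload4j bindings are excluded to avoid pulling in a second slf4j backend. -->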
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>1.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>ch.qos.reload4j</groupId>
+          <artifactId>reload4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-reload4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java
index 74950c15fe..916f9ff469 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java
@@ -25,6 +25,12 @@
public class LinkisStorageConf {
private static final Object CONF_LOCK = new Object();
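+
+  // Result set storage formats: "dolphin" is the native Linkis binary format,
+  // "parquet" is the columnar alternative added by this change.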
+ public static final String DOLPHIN = "dolphin";
+
+ public static final String PARQUET = "parquet";
+
+ public static final String PARQUET_FILE_SUFFIX = ".parquet";
+
public static final String HDFS_FILE_SYSTEM_REST_ERRS =
CommonVars.apply(
"wds.linkis.hdfs.rest.errs",
@@ -34,12 +40,17 @@ public class LinkisStorageConf {
public static final String ROW_BYTE_MAX_LEN_STR =
CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue();
+ public static final String ENGINE_RESULT_TYPE =
+ CommonVars.apply("wds.linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue();
+
public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR);
public static final String FILE_TYPE =
CommonVars.apply(
"wds.linkis.storage.file.type",
- "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql")
+ "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql"
+ + ","
+ + PARQUET)
.getValue();
private static volatile String[] fileTypeArr = null;
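With the new ENGINE_RESULT_TYPE key, the result set format becomes a deployment-level switch; dolphin stays the default. A minimal sketch of enabling parquet output, assuming the usual linkis.properties loading of CommonVars (key name and values are the constants from this hunk):

    # linkis.properties
    wds.linkis.engine.resultSet.type=parquet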
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java
index db78afac29..9ac4c02cc7 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java
@@ -23,6 +23,7 @@
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.storage.FSFactory;
+import org.apache.linkis.storage.conf.LinkisStorageConf;
import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary;
import org.apache.linkis.storage.exception.StorageWarnException;
@@ -134,15 +135,22 @@ public String[] getResultSetType() {
@Override
  public ResultSet<? extends MetaData, ? extends Record> getResultSetByPath(FsPath fsPath, Fs fs) {
+ ResultSet resultSet = null;
try (InputStream inputStream = fs.read(fsPath)) {
- String resultSetType = Dolphin.getType(inputStream);
- if (StringUtils.isEmpty(resultSetType)) {
- throw new StorageWarnException(
- THE_FILE_IS_EMPTY.getErrorCode(),
- MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath()));
+ String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
+ if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
+ String resultSetType = Dolphin.getType(inputStream);
+ if (StringUtils.isEmpty(resultSetType)) {
+ throw new StorageWarnException(
+ THE_FILE_IS_EMPTY.getErrorCode(),
+ MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath()));
+ }
+ // Utils.tryQuietly(fs::close);
+ resultSet = getResultSetByType(resultSetType);
+ } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
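+        // Parquet files carry no dolphin magic header, so the type cannot be read
+        // from the stream; table results are the only parquet-backed type.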
+ resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE);
}
- // Utils.tryQuietly(fs::close);
- return getResultSetByType(resultSetType);
+ return resultSet;
} catch (IOException e) {
throw new RuntimeException(e);
}
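Taken together with the config change above, path-based result set resolution now keys off the configured format instead of always sniffing the dolphin header. A minimal caller sketch, assuming an Fs obtained and initialized through FSFactory for the target path (constructing DefaultResultSetFactory directly is illustrative only, and the init argument is an assumption):

    FsPath fsPath = new FsPath("file:///tmp/result/_0.parquet"); // hypothetical result file
    Fs fs = FSFactory.getFs(fsPath);
    fs.init(new HashMap<>()); // assumption: no extra properties needed for this read
    ResultSet<? extends MetaData, ? extends Record> rs =
        new DefaultResultSetFactory().getResultSetByPath(fsPath, fs);
    // dolphin: type is read from the file header; parquet: always TABLE_TYPE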
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java
new file mode 100644
index 0000000000..07c6c4cd7a
--- /dev/null
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.resultset;
+
+import org.apache.linkis.common.io.FsPath;
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSet;
+import org.apache.linkis.storage.domain.Column;
+import org.apache.linkis.storage.domain.DataType;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+import org.apache.linkis.storage.resultset.table.TableRecord;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParquetResultSetReader<K extends MetaData, V extends Record>
+    extends StorageResultSetReader<K, V> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetReader.class);
+
+ private FsPath fsPath;
+
+  private ParquetReader<GenericRecord> parquetReader;
+
+ private GenericRecord record;
+
+  public ParquetResultSetReader(ResultSet<K, V> resultSet, InputStream inputStream, FsPath fsPath)
+ throws IOException {
+ super(resultSet, inputStream);
+ this.fsPath = fsPath;
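+    // Open the parquet file eagerly and read the first record so the Avro schema
+    // is available before the first getMetaData()/hasNext() call.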
+    this.parquetReader =
+        AvroParquetReader.<GenericRecord>builder(new Path(fsPath.getPath())).build();
+ this.record = parquetReader.read();
+ }
+
+ @Override
+ public MetaData getMetaData() {
+ if (metaData == null) {
+ try {
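+        // Build table metadata from the first record's Avro schema; parquet stores no
+        // per-column comment, hence the empty string passed as the Column comment.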
+        List<Schema.Field> fields = record.getSchema().getFields();
+        List<Column> columnList =
+ fields.stream()
+ .map(
+ field ->
+ new Column(
+ field.name(),
+ DataType.toDataType(field.schema().getType().getName()),
+ ""))
+ .collect(Collectors.toList());
+
+ metaData = new TableMetaData(columnList.toArray(new Column[0]));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to read parquet schema", e);
+ }
+ }
+ return metaData;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (metaData == null) getMetaData();
+ if (record == null) return false;
+ ArrayList