From 197e9a3c948e10265187f1505fd6271b807caaac Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Wed, 6 Dec 2023 17:13:46 +0800 Subject: [PATCH 01/16] Support for storing result sets in Parquet format --- linkis-commons/linkis-storage/pom.xml | 30 +++++ .../storage/conf/LinkisStorageConf.java | 13 +- .../resultset/DefaultResultSetFactory.java | 22 +++- .../resultset/ParquetResultSetReader.java | 122 ++++++++++++++++++ .../resultset/ParquetResultSetWriter.java | 95 ++++++++++++++ .../resultset/ResultSetReaderFactory.java | 20 ++- .../resultset/ResultSetWriterFactory.java | 19 ++- .../storage/resultset/StorageResultSet.java | 12 +- .../resultset/StorageResultSetReader.java | 8 +- .../resultset/StorageResultSetWriter.java | 24 ++-- .../linkis/storage/source/FileSource.java | 13 +- .../linkis/storage/utils/StorageHelper.java | 5 +- .../filesystem/restful/api/FsRestfulApi.java | 4 + 13 files changed, 351 insertions(+), 36 deletions(-) create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index def795ebd8..c9c610bcbd 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -99,6 +99,36 @@ aws-java-sdk-s3 1.12.261 + + + org.apache.parquet + parquet-avro + 1.12.0 + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + ch.qos.reload4j + reload4j + + + org.slf4j + slf4j-reload4j + + + diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java index 74950c15fe..916f9ff469 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -25,6 +25,12 @@ public class LinkisStorageConf { private static final Object CONF_LOCK = new Object(); + public static final String DOLPHIN = "dolphin"; + + public static final String PARQUET = "parquet"; + + public static final String PARQUET_FILE_SUFFIX = ".parquet"; + public static final String HDFS_FILE_SYSTEM_REST_ERRS = CommonVars.apply( "wds.linkis.hdfs.rest.errs", @@ -34,12 +40,17 @@ public class LinkisStorageConf { public static final String ROW_BYTE_MAX_LEN_STR = CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue(); + public static final String ENGINE_RESULT_TYPE = + CommonVars.apply("wds.linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); + public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR); public static final String FILE_TYPE = CommonVars.apply( "wds.linkis.storage.file.type", - "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql") + "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql" + + "," + + PARQUET) .getValue(); private static volatile String[] fileTypeArr = null; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java index db78afac29..9ac4c02cc7 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java @@ -23,6 +23,7 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.storage.FSFactory; +import org.apache.linkis.storage.conf.LinkisStorageConf; import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary; import org.apache.linkis.storage.exception.StorageWarnException; @@ -134,15 +135,22 @@ public String[] getResultSetType() { @Override public ResultSet getResultSetByPath(FsPath fsPath, Fs fs) { + ResultSet resultSet = null; try (InputStream inputStream = fs.read(fsPath)) { - String resultSetType = Dolphin.getType(inputStream); - if (StringUtils.isEmpty(resultSetType)) { - throw new StorageWarnException( - THE_FILE_IS_EMPTY.getErrorCode(), - MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath())); + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + String resultSetType = Dolphin.getType(inputStream); + if (StringUtils.isEmpty(resultSetType)) { + throw new StorageWarnException( + THE_FILE_IS_EMPTY.getErrorCode(), + MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath())); + } + // Utils.tryQuietly(fs::close); + resultSet = getResultSetByType(resultSetType); + } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE); } - // Utils.tryQuietly(fs::close); - return getResultSetByType(resultSetType); + return resultSet; } catch (IOException e) { throw new RuntimeException(e); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java new file mode 100644 index 0000000000..07c6c4cd7a --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.resultset; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.domain.DataType; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ParquetResultSetReader + extends StorageResultSetReader { + + private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetReader.class); + + private FsPath fsPath; + + private ParquetReader parquetReader; + + private GenericRecord record; + + public ParquetResultSetReader(ResultSet resultSet, InputStream inputStream, FsPath fsPath) + throws IOException { + super(resultSet, inputStream); + this.fsPath = fsPath; + this.parquetReader = + AvroParquetReader.builder(new Path(fsPath.getPath())).build(); + this.record = parquetReader.read(); + } + + @Override + public MetaData getMetaData() { + if (metaData == null) { + try { + List fields = record.getSchema().getFields(); + List columnList = + fields.stream() + .map( + field -> + new Column( + field.name(), + DataType.toDataType(field.schema().getType().getName()), + "")) + .collect(Collectors.toList()); + + metaData = new TableMetaData(columnList.toArray(new Column[0])); + } catch (Exception e) { + throw new RuntimeException("Failed to read parquet schema", e); + } + } + return metaData; + } + + @Override + public boolean hasNext() throws IOException { + if (metaData == null) getMetaData(); + if (record == null) return false; + ArrayList resultList = new ArrayList<>(); + TableMetaData tableMetaData = (TableMetaData) metaData; + int length = tableMetaData.getColumns().length; + for (int i = 0; i < length; i++) { + resultList.add(record.get(i)); + } + row = new TableRecord(resultList.toArray(new Object[0])); + if (row == null) return false; + return record != null; + } + + @Override + public Record getRecord() { + if (metaData == null) throw new RuntimeException("Must read metadata first(必须先读取metadata)"); + if (row == null) { + throw new RuntimeException( + "Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)"); + } + try { + this.record = parquetReader.read(); + } catch (IOException e) { + throw new RuntimeException("Failed to read parquet record", e); + } + return row; + } + + @Override + public void close() throws IOException { + super.close(); + parquetReader.close(); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java new file mode 100644 index 0000000000..31dd6c7a5c --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.resultset; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ParquetResultSetWriter + extends StorageResultSetWriter { + private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetWriter.class); + + private Schema schema = null; + + public ParquetResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) { + super(resultSet, maxCacheSize, storePath); + } + + @Override + public void addMetaData(MetaData metaData) throws IOException { + if (!moveToWriteRow) { + rMetaData = metaData; + SchemaBuilder.FieldAssembler fieldAssembler = SchemaBuilder.record("linkis").fields(); + TableMetaData tableMetaData = (TableMetaData) rMetaData; + for (Column column : tableMetaData.columns) { + fieldAssembler + .name(column.getColumnName().replaceAll("\\.", "_").replaceAll("[^a-zA-Z0-9_]", "")) + .doc(column.getComment()) + .type(column.getDataType().getTypeName().toLowerCase()) + .noDefault(); + } + schema = fieldAssembler.endRecord(); + moveToWriteRow = true; + } + } + + @Override + public void addRecord(Record record) { + if (moveToWriteRow) { + try { + TableRecord tableRecord = (TableRecord) record; + Object[] row = tableRecord.row; + try (ParquetWriter writer = + AvroParquetWriter.builder(new Path(storePath.getPath())) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build()) { + + GenericRecord genericRecord = new GenericData.Record(schema); + for (int i = 0; i < row.length; i++) { + genericRecord.put(schema.getFields().get(i).name(), row[i]); + } + writer.write(genericRecord); + } + } catch (Exception e) { + logger.warn("addMetaDataAndRecordString failed", e); + } + } + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 5e56b099d7..03434abdbe 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -24,6 +24,7 @@ import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.common.io.resultset.ResultSetReader; import org.apache.linkis.storage.FSFactory; +import org.apache.linkis.storage.conf.LinkisStorageConf; import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.resultset.table.TableMetaData; @@ -40,8 +41,19 @@ public class ResultSetReaderFactory { private static final Logger logger = LoggerFactory.getLogger(ResultSetReaderFactory.class); public static ResultSetReader getResultSetReader( - ResultSet resultSet, InputStream inputStream) { - return new StorageResultSetReader<>(resultSet, inputStream); + ResultSet resultSet, InputStream inputStream, FsPath fsPath) { + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + StorageResultSetReader resultSetReader = null; + if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + resultSetReader = new StorageResultSetReader<>(resultSet, inputStream); + } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + try { + resultSetReader = new ParquetResultSetReader<>(resultSet, inputStream, fsPath); + } catch (IOException e) { + throw new RuntimeException("Failed to read parquet", e); + } + } + return resultSetReader; } public static ResultSetReader getResultSetReader( @@ -61,7 +73,7 @@ public static ResultSetReader getResultSetReader(String res) throws IOException Fs fs = FSFactory.getFs(resPath); fs.init(null); ResultSetReader reader = - ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath)); + ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath), resPath); if (reader instanceof StorageResultSetReader) { ((StorageResultSetReader) reader).setFs(fs); } @@ -96,7 +108,7 @@ public static ResultSetReader getTableResultReader(String res) { InputStream read = fs.read(resPath); return ResultSetReaderFactory.getResultSetReader( - (TableResultSet) resultSet, read); + (TableResultSet) resultSet, read, resPath); } catch (IOException e) { throw new StorageWarnException( LinkisStorageErrorCodeSummary.TABLE_ARE_NOT_SUPPORTED.getErrorCode(), diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java index d70319c9bd..3e50692186 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java @@ -22,6 +22,7 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.common.io.resultset.ResultSetReader; +import org.apache.linkis.storage.conf.LinkisStorageConf; import java.io.IOException; import java.util.ArrayList; @@ -36,14 +37,26 @@ public class ResultSetWriterFactory { public static org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( ResultSet resultSet, long maxCacheSize, FsPath storePath) { - return new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + StorageResultSetWriter writer = null; + if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } + return writer; } public static org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( ResultSet resultSet, long maxCacheSize, FsPath storePath, String proxyUser) { - StorageResultSetWriter writer = - new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + StorageResultSetWriter writer = null; + if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } writer.setProxyUser(proxyUser); return writer; } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java index c83661de2e..9ec8f27cb3 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.MetaData; import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.storage.conf.LinkisStorageConf; import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.utils.StorageConfiguration; @@ -49,10 +50,17 @@ public String charset() { @Override public FsPath getResultSetPath(FsPath parentDir, String fileName) { + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + String fileSuffix = null; + if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + fileSuffix = Dolphin.DOLPHIN_FILE_SUFFIX; + } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + fileSuffix = LinkisStorageConf.PARQUET_FILE_SUFFIX; + } final String path = parentDir.getPath().endsWith("/") - ? parentDir.getUriString() + fileName + Dolphin.DOLPHIN_FILE_SUFFIX - : parentDir.getUriString() + "/" + fileName + Dolphin.DOLPHIN_FILE_SUFFIX; + ? parentDir.getUriString() + fileName + fileSuffix + : parentDir.getUriString() + "/" + fileName + fileSuffix; logger.info("Get result set path: {}", path); return new FsPath(path); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java index c0222cc848..595ebda983 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java @@ -43,10 +43,10 @@ public class StorageResultSetReader private static final Logger logger = LoggerFactory.getLogger(StorageResultSetReader.class); private final ResultSet resultSet; - private final InputStream inputStream; - private final ResultDeserializer deserializer; - private K metaData; - private Record row; + public final InputStream inputStream; + public final ResultDeserializer deserializer; + public K metaData; + public Record row; private int colCount = 0; private int rowCount = 0; private Fs fs; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java index ea513664bd..bdea8ee4e7 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java @@ -17,15 +17,19 @@ package org.apache.linkis.storage.resultset; -import org.apache.linkis.common.io.*; -import org.apache.linkis.common.io.resultset.*; +import org.apache.linkis.common.io.Fs; +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSerializer; +import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.common.io.resultset.ResultSetWriter; -import org.apache.linkis.common.utils.*; -import org.apache.linkis.storage.*; -import org.apache.linkis.storage.conf.*; -import org.apache.linkis.storage.domain.*; +import org.apache.linkis.storage.FSFactory; +import org.apache.linkis.storage.conf.LinkisStorageConf; +import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.exception.StorageErrorException; -import org.apache.linkis.storage.utils.*; +import org.apache.linkis.storage.utils.FileSystemUtils; +import org.apache.linkis.storage.utils.StorageUtils; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -46,15 +50,15 @@ public class StorageResultSetWriter private final ResultSet resultSet; private final long maxCacheSize; - private final FsPath storePath; + public final FsPath storePath; private final ResultSerializer serializer; - private boolean moveToWriteRow = false; + public boolean moveToWriteRow = false; private OutputStream outputStream = null; private int rowCount = 0; private final List buffer = new ArrayList(); private Fs fs = null; - private MetaData rMetaData = null; + public MetaData rMetaData = null; private String proxyUser = StorageUtils.getJvmUser(); private boolean fileCreated = false; private boolean closed = false; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index 0ed650186d..9a71e64420 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -73,7 +73,12 @@ public interface FileSource extends Closeable { (path, suffix) -> path.endsWith("." + suffix); static boolean isResultSet(String path) { - return suffixPredicate.apply(path, fileType[0]); + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + String type = fileType[0]; + if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + type = LinkisStorageConf.PARQUET; + } + return suffixPredicate.apply(path, type); } static boolean isResultSet(FsPath fsPath) { @@ -137,7 +142,8 @@ static FileSource create(FsPath fsPath, InputStream is) { static FileSplit createResultSetFileSplit(FsPath fsPath, InputStream is) { logger.info("try create result set file split with path:{}", fsPath.getPath()); ResultSet resultset = ResultSetFactory.getInstance().getResultSetByPath(fsPath); - ResultSetReader resultsetReader = ResultSetReaderFactory.getResultSetReader(resultset, is); + ResultSetReader resultsetReader = + ResultSetReaderFactory.getResultSetReader(resultset, is, fsPath); return new FileSplit(resultsetReader, resultset.resultSetType()); } @@ -145,7 +151,8 @@ static FileSplit createResultSetFileSplit(FsPath fsPath, Fs fs) { ResultSet resultset = ResultSetFactory.getInstance().getResultSetByPath(fsPath, fs); ResultSetReader resultsetReader = null; try { - resultsetReader = ResultSetReaderFactory.getResultSetReader(resultset, fs.read(fsPath)); + resultsetReader = + ResultSetReaderFactory.getResultSetReader(resultset, fs.read(fsPath), fsPath); } catch (IOException e) { logger.warn("FileSource createResultSetFileSplit failed", e); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageHelper.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageHelper.java index e1dee151ca..491c3d7af4 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageHelper.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageHelper.java @@ -80,7 +80,8 @@ public static void getTableResLines(String[] args) { resultSetFactory.getResultSetByType(ResultSetFactory.TABLE_TYPE); Fs fs = FSFactory.getFs(resPath); fs.init(null); - resultSetReader = ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath)); + resultSetReader = + ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath), resPath); TableMetaData metaData = (TableMetaData) resultSetReader.getMetaData(); Arrays.stream(metaData.getColumns()).forEach(column -> logger.info(column.toString())); int num = 0; @@ -116,7 +117,7 @@ public static void getTableRes(String[] args) { fs.init(null); ResultSetReader reader = - ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath)); + ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath), resPath); MetaData rmetaData = reader.getMetaData(); Arrays.stream(((TableMetaData) rmetaData).getColumns()) .forEach(column -> logger.info(column.toString())); diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java index 189ab711fd..2af3caaaba 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java @@ -404,6 +404,10 @@ public Message getDirFileTrees( FsPathListWithError fsPathListWithError = fileSystem.listPathWithError(fsPath); if (fsPathListWithError != null) { for (FsPath children : fsPathListWithError.getFsPaths()) { + // 兼容parquet,跳过parquet.crc文件 + if (children.getPath().endsWith(".parquet.crc")) { + continue; + } DirFileTree dirFileTreeChildren = new DirFileTree(); dirFileTreeChildren.setName(new File(children.getPath()).getName()); dirFileTreeChildren.setPath(children.getSchemaPath()); From 34433b3056053ef8b05c97c2e216be4c317aa23b Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 7 Dec 2023 11:46:59 +0800 Subject: [PATCH 02/16] Modify known-dependencies.txt --- tool/dependencies/known-dependencies.txt | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index 73f1abede8..d4487ac37a 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -34,6 +34,8 @@ automaton-1.11-8.jar avatica-1.11.0.jar avro-1.7.7.jar avro-1.8.2.jar +avro-1.10.1.jar +avro-1.10.2.jar aws-java-sdk-core-1.11.792.jar aws-java-sdk-kms-1.11.792.jar aws-java-sdk-s3-1.11.792.jar @@ -514,6 +516,18 @@ pagehelper-5.3.1.jar paranamer-2.3.jar paranamer-2.8.jar parquet-hadoop-bundle-1.10.0.jar +parquet-avro-1.12.0.jar +parquet-column-1.12.0.jar +parquet-column-1.12.2.jar +parquet-common-1.12.0.jar +parquet-common-1.12.2.jar +parquet-encoding-1.12.0.jar +parquet-encoding-1.12.2.jar +parquet-format-structures-1.12.0.jar +parquet-hadoop-1.12.0.jar +parquet-hadoop-1.12.2.jar +parquet-jackson-1.12.0.jar +parquet-jackson-1.12.2.jar poi-5.2.3.jar poi-ooxml-5.2.3.jar poi-ooxml-lite-5.2.3.jar @@ -578,6 +592,7 @@ snakeyaml-1.33.jar snappy-java-1.1.4.jar snappy-java-1.1.7.7.jar snappy-java-1.1.8.2.jar +snappy-java-1.1.8.4.jar spark-redis_2.12-2.6.0.jar spring-aop-5.2.23.RELEASE.jar spring-beans-5.2.23.RELEASE.jar @@ -682,6 +697,8 @@ zookeeper-3.5.9.jar zookeeper-jute-3.5.9.jar zstd-jni-1.4.4-7.jar zstd-jni-1.4.5-6.jar +zstd-jni-1.4.9-1.jar +zstd-jni-1.5.0-4.jar zjsonpatch-0.3.0.jar agrona-1.12.0.jar audience-annotations-0.13.0.jar From c9e810f22a7ab0b67cf4d404decc5820c0736077 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 7 Dec 2023 14:46:34 +0800 Subject: [PATCH 03/16] Fix bugs --- .../storage/conf/LinkisStorageConf.java | 2 +- .../resultset/DefaultResultSetFactory.java | 9 ++- .../resultset/ParquetResultSetReader.java | 31 +++++++- .../resultset/ParquetResultSetWriter.java | 76 ++++++++++++++----- .../resultset/ResultSetReaderFactory.java | 2 +- .../resultset/ResultSetWriterFactory.java | 23 +++--- .../resultset/StorageResultSetReader.java | 8 +- .../resultset/StorageResultSetWriter.java | 6 +- tool/dependencies/known-dependencies.txt | 1 + 9 files changed, 113 insertions(+), 45 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java index 916f9ff469..2c4509486a 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -41,7 +41,7 @@ public class LinkisStorageConf { CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue(); public static final String ENGINE_RESULT_TYPE = - CommonVars.apply("wds.linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); + CommonVars.apply("linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java index 9ac4c02cc7..e75a2d96bc 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java @@ -109,7 +109,8 @@ public boolean exists(String resultSetType) { @Override public boolean isResultSetPath(String path) { - return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX); + return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX) + || path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX); } @Override @@ -138,7 +139,8 @@ public String[] getResultSetType() { ResultSet resultSet = null; try (InputStream inputStream = fs.read(fsPath)) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + if (engineResultType.equals(LinkisStorageConf.DOLPHIN) + || fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) { String resultSetType = Dolphin.getType(inputStream); if (StringUtils.isEmpty(resultSetType)) { throw new StorageWarnException( @@ -147,7 +149,8 @@ public String[] getResultSetType() { } // Utils.tryQuietly(fs::close); resultSet = getResultSetByType(resultSetType); - } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + } else if (engineResultType.equals(LinkisStorageConf.PARQUET) + || fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) { resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE); } return resultSet; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java index 07c6c4cd7a..c4c43d6f29 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.MetaData; import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetReader; import org.apache.linkis.storage.domain.Column; import org.apache.linkis.storage.domain.DataType; import org.apache.linkis.storage.resultset.table.TableMetaData; @@ -28,6 +29,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.hadoop.ParquetReader; @@ -42,12 +44,20 @@ import org.slf4j.LoggerFactory; public class ParquetResultSetReader - extends StorageResultSetReader { + extends ResultSetReader { private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetReader.class); private FsPath fsPath; + private final ResultSet resultSet; + + private final InputStream inputStream; + + private MetaData metaData; + + private Record row; + private ParquetReader parquetReader; private GenericRecord record; @@ -55,6 +65,8 @@ public class ParquetResultSetReader public ParquetResultSetReader(ResultSet resultSet, InputStream inputStream, FsPath fsPath) throws IOException { super(resultSet, inputStream); + this.resultSet = resultSet; + this.inputStream = inputStream; this.fsPath = fsPath; this.parquetReader = AvroParquetReader.builder(new Path(fsPath.getPath())).build(); @@ -84,6 +96,21 @@ public MetaData getMetaData() { return metaData; } + @Override + public int skip(int recordNum) throws IOException { + return 0; + } + + @Override + public long getPosition() throws IOException { + return 0; + } + + @Override + public long available() throws IOException { + return 0; + } + @Override public boolean hasNext() throws IOException { if (metaData == null) getMetaData(); @@ -116,7 +143,7 @@ public Record getRecord() { @Override public void close() throws IOException { - super.close(); + IOUtils.closeQuietly(inputStream); parquetReader.close(); } } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java index 31dd6c7a5c..6fbac3c8cb 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.MetaData; import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.storage.domain.Column; import org.apache.linkis.storage.resultset.table.TableMetaData; import org.apache.linkis.storage.resultset.table.TableRecord; @@ -41,21 +42,36 @@ import org.slf4j.LoggerFactory; public class ParquetResultSetWriter - extends StorageResultSetWriter { + extends ResultSetWriter { private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetWriter.class); - private Schema schema = null; + private Schema schema; + + private ParquetWriter parquetWriter; + + private boolean moveToWriteRow = false; + + private MetaData metaData = null; + + private final FsPath storePath; + + private final long maxCacheSize; + + private final ResultSet resultSet; public ParquetResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) { super(resultSet, maxCacheSize, storePath); + this.resultSet = resultSet; + this.maxCacheSize = maxCacheSize; + this.storePath = storePath; } @Override public void addMetaData(MetaData metaData) throws IOException { if (!moveToWriteRow) { - rMetaData = metaData; + this.metaData = metaData; SchemaBuilder.FieldAssembler fieldAssembler = SchemaBuilder.record("linkis").fields(); - TableMetaData tableMetaData = (TableMetaData) rMetaData; + TableMetaData tableMetaData = (TableMetaData) this.metaData; for (Column column : tableMetaData.columns) { fieldAssembler .name(column.getColumnName().replaceAll("\\.", "_").replaceAll("[^a-zA-Z0-9_]", "")) @@ -65,31 +81,55 @@ public void addMetaData(MetaData metaData) throws IOException { } schema = fieldAssembler.endRecord(); moveToWriteRow = true; + if (parquetWriter == null) { + parquetWriter = + AvroParquetWriter.builder(new Path(storePath.getPath())) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build(); + } } } @Override public void addRecord(Record record) { if (moveToWriteRow) { + TableRecord tableRecord = (TableRecord) record; try { - TableRecord tableRecord = (TableRecord) record; Object[] row = tableRecord.row; - try (ParquetWriter writer = - AvroParquetWriter.builder(new Path(storePath.getPath())) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build()) { - - GenericRecord genericRecord = new GenericData.Record(schema); - for (int i = 0; i < row.length; i++) { - genericRecord.put(schema.getFields().get(i).name(), row[i]); - } - writer.write(genericRecord); + GenericRecord genericRecord = new GenericData.Record(schema); + for (int i = 0; i < row.length; i++) { + genericRecord.put(schema.getFields().get(i).name(), row[i]); } - } catch (Exception e) { + parquetWriter.write(genericRecord); + } catch (IOException e) { logger.warn("addMetaDataAndRecordString failed", e); } } } + + @Override + public FsPath toFSPath() { + return storePath; + } + + @Override + public String toString() { + return storePath.getSchemaPath(); + } + + @Override + public void addMetaDataAndRecordString(String content) {} + + @Override + public void addRecordString(String content) {} + + @Override + public void close() throws IOException { + parquetWriter.close(); + } + + @Override + public void flush() throws IOException {} } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 03434abdbe..36dbd130b0 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -43,7 +43,7 @@ public class ResultSetReaderFactory { public static ResultSetReader getResultSetReader( ResultSet resultSet, InputStream inputStream, FsPath fsPath) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - StorageResultSetReader resultSetReader = null; + ResultSetReader resultSetReader = null; if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { resultSetReader = new StorageResultSetReader<>(resultSet, inputStream); } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java index 3e50692186..35df51e779 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java @@ -22,6 +22,7 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.common.io.resultset.ResultSetReader; +import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.storage.conf.LinkisStorageConf; import java.io.IOException; @@ -34,11 +35,10 @@ public class ResultSetWriterFactory { private static final Logger logger = LoggerFactory.getLogger(ResultSetWriterFactory.class); - public static - org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( - ResultSet resultSet, long maxCacheSize, FsPath storePath) { + public static ResultSetWriter getResultSetWriter( + ResultSet resultSet, long maxCacheSize, FsPath storePath) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - StorageResultSetWriter writer = null; + ResultSetWriter writer = null; if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { @@ -47,25 +47,22 @@ org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( return writer; } - public static - org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( - ResultSet resultSet, long maxCacheSize, FsPath storePath, String proxyUser) { + public static ResultSetWriter getResultSetWriter( + ResultSet resultSet, long maxCacheSize, FsPath storePath, String proxyUser) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - StorageResultSetWriter writer = null; + ResultSetWriter writer = null; if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + StorageResultSetWriter storageResultSetWriter = (StorageResultSetWriter) writer; + storageResultSetWriter.setProxyUser(proxyUser); } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); } - writer.setProxyUser(proxyUser); return writer; } public static Record[] getRecordByWriter( - org.apache.linkis.common.io.resultset.ResultSetWriter - writer, - long limit) - throws IOException { + ResultSetWriter writer, long limit) throws IOException { String res = writer.toString(); return getRecordByRes(res, limit); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java index 595ebda983..c0222cc848 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java @@ -43,10 +43,10 @@ public class StorageResultSetReader private static final Logger logger = LoggerFactory.getLogger(StorageResultSetReader.class); private final ResultSet resultSet; - public final InputStream inputStream; - public final ResultDeserializer deserializer; - public K metaData; - public Record row; + private final InputStream inputStream; + private final ResultDeserializer deserializer; + private K metaData; + private Record row; private int colCount = 0; private int rowCount = 0; private Fs fs; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java index bdea8ee4e7..230c68301c 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java @@ -50,15 +50,15 @@ public class StorageResultSetWriter private final ResultSet resultSet; private final long maxCacheSize; - public final FsPath storePath; + private final FsPath storePath; private final ResultSerializer serializer; - public boolean moveToWriteRow = false; + private boolean moveToWriteRow = false; private OutputStream outputStream = null; private int rowCount = 0; private final List buffer = new ArrayList(); private Fs fs = null; - public MetaData rMetaData = null; + private MetaData rMetaData = null; private String proxyUser = StorageUtils.getJvmUser(); private boolean fileCreated = false; private boolean closed = false; diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index d4487ac37a..cfaf437f72 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -701,6 +701,7 @@ zstd-jni-1.4.9-1.jar zstd-jni-1.5.0-4.jar zjsonpatch-0.3.0.jar agrona-1.12.0.jar +audience-annotations-0.12.0.jar audience-annotations-0.13.0.jar commons-crypto-1.1.0.jar disruptor-3.4.2.jar From d5d14eb92dec547cbfe0b860692458f33907453f Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 7 Dec 2023 19:43:36 +0800 Subject: [PATCH 04/16] Fix bugs --- .../storage/resultset/DefaultResultSetFactory.java | 7 ++----- .../storage/resultset/ResultSetReaderFactory.java | 6 +++--- .../storage/resultset/ResultSetWriterFactory.java | 13 +++++++------ .../linkis/storage/resultset/StorageResultSet.java | 7 +++---- .../apache/linkis/storage/source/FileSource.java | 8 ++------ 5 files changed, 17 insertions(+), 24 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java index e75a2d96bc..d1d60d8a2c 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java @@ -138,9 +138,7 @@ public String[] getResultSetType() { public ResultSet getResultSetByPath(FsPath fsPath, Fs fs) { ResultSet resultSet = null; try (InputStream inputStream = fs.read(fsPath)) { - String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - if (engineResultType.equals(LinkisStorageConf.DOLPHIN) - || fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) { + if (fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) { String resultSetType = Dolphin.getType(inputStream); if (StringUtils.isEmpty(resultSetType)) { throw new StorageWarnException( @@ -149,8 +147,7 @@ public String[] getResultSetType() { } // Utils.tryQuietly(fs::close); resultSet = getResultSetByType(resultSetType); - } else if (engineResultType.equals(LinkisStorageConf.PARQUET) - || fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) { + } else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) { resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE); } return resultSet; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 36dbd130b0..8e20de99e6 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -25,6 +25,7 @@ import org.apache.linkis.common.io.resultset.ResultSetReader; import org.apache.linkis.storage.FSFactory; import org.apache.linkis.storage.conf.LinkisStorageConf; +import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.resultset.table.TableMetaData; @@ -42,11 +43,10 @@ public class ResultSetReaderFactory { public static ResultSetReader getResultSetReader( ResultSet resultSet, InputStream inputStream, FsPath fsPath) { - String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; ResultSetReader resultSetReader = null; - if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + if (fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) { resultSetReader = new StorageResultSetReader<>(resultSet, inputStream); - } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + } else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) { try { resultSetReader = new ParquetResultSetReader<>(resultSet, inputStream, fsPath); } catch (IOException e) { diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java index 35df51e779..981ce1b023 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java @@ -24,6 +24,7 @@ import org.apache.linkis.common.io.resultset.ResultSetReader; import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.storage.conf.LinkisStorageConf; +import org.apache.linkis.storage.resultset.table.TableResultSet; import java.io.IOException; import java.util.ArrayList; @@ -39,10 +40,10 @@ public static ResultSetWriter getRe ResultSet resultSet, long maxCacheSize, FsPath storePath) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; ResultSetWriter writer = null; - if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { - writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); - } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) { writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else { + writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); } return writer; } @@ -51,12 +52,12 @@ public static ResultSetWriter getRe ResultSet resultSet, long maxCacheSize, FsPath storePath, String proxyUser) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; ResultSetWriter writer = null; - if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) { + writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); StorageResultSetWriter storageResultSetWriter = (StorageResultSetWriter) writer; storageResultSetWriter.setProxyUser(proxyUser); - } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { - writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); } return writer; } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java index 9ec8f27cb3..5c25837d8f 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java @@ -23,6 +23,7 @@ import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.storage.conf.LinkisStorageConf; import org.apache.linkis.storage.domain.Dolphin; +import org.apache.linkis.storage.resultset.table.TableResultSet; import org.apache.linkis.storage.utils.StorageConfiguration; import org.slf4j.Logger; @@ -51,10 +52,8 @@ public String charset() { @Override public FsPath getResultSetPath(FsPath parentDir, String fileName) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - String fileSuffix = null; - if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { - fileSuffix = Dolphin.DOLPHIN_FILE_SUFFIX; - } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + String fileSuffix = Dolphin.DOLPHIN_FILE_SUFFIX; + if (engineResultType.equals(LinkisStorageConf.PARQUET) && this instanceof TableResultSet) { fileSuffix = LinkisStorageConf.PARQUET_FILE_SUFFIX; } final String path = diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index 9a71e64420..df7ae931a6 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -73,12 +73,8 @@ public interface FileSource extends Closeable { (path, suffix) -> path.endsWith("." + suffix); static boolean isResultSet(String path) { - String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - String type = fileType[0]; - if (engineResultType.equals(LinkisStorageConf.PARQUET)) { - type = LinkisStorageConf.PARQUET; - } - return suffixPredicate.apply(path, type); + return suffixPredicate.apply(path, fileType[0]) + || suffixPredicate.apply(path, LinkisStorageConf.PARQUET); } static boolean isResultSet(FsPath fsPath) { From 14a9601ab435464befa04437dc95a14a96e66e02 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 11 Dec 2023 18:03:04 +0800 Subject: [PATCH 05/16] Support for storing result sets in orc format --- linkis-commons/linkis-storage/pom.xml | 15 +- .../storage/conf/LinkisStorageConf.java | 10 +- .../resultset/DefaultResultSetFactory.java | 6 +- .../storage/resultset/OrcResultSetReader.java | 203 ++++++++++++++++++ .../storage/resultset/OrcResultSetWriter.java | 141 ++++++++++++ .../resultset/ResultSetReaderFactory.java | 6 + .../resultset/ResultSetWriterFactory.java | 6 + .../storage/resultset/StorageResultSet.java | 2 + .../linkis/storage/source/FileSource.java | 3 +- .../apache/linkis/storage/utils/OrcUtils.java | 150 +++++++++++++ .../filesystem/restful/api/FsRestfulApi.java | 6 +- pom.xml | 3 + 12 files changed, 542 insertions(+), 9 deletions(-) create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index c9c610bcbd..93628db808 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -103,7 +103,7 @@ org.apache.parquet parquet-avro - 1.12.0 + ${parquet-avro.version} org.apache.hadoop @@ -129,6 +129,19 @@ + + org.apache.orc + orc-core + ${orc-core.version} + nohive + + + org.apache.hive + hive-storage-api + + + + diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java index 2c4509486a..58ff80b8c5 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -31,6 +31,10 @@ public class LinkisStorageConf { public static final String PARQUET_FILE_SUFFIX = ".parquet"; + public static final String ORC = "orc"; + + public static final String ORC_FILE_SUFFIX = ".orc"; + public static final String HDFS_FILE_SYSTEM_REST_ERRS = CommonVars.apply( "wds.linkis.hdfs.rest.errs", @@ -41,7 +45,7 @@ public class LinkisStorageConf { CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue(); public static final String ENGINE_RESULT_TYPE = - CommonVars.apply("linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); + CommonVars.apply("linkis.engine.resultSet.type", ORC, "Result type").getValue(); public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR); @@ -50,7 +54,9 @@ public class LinkisStorageConf { "wds.linkis.storage.file.type", "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql" + "," - + PARQUET) + + PARQUET + + "," + + ORC) .getValue(); private static volatile String[] fileTypeArr = null; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java index d1d60d8a2c..da51a2f13d 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java @@ -110,7 +110,8 @@ public boolean exists(String resultSetType) { @Override public boolean isResultSetPath(String path) { return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX) - || path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX); + || path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX) + || path.endsWith(LinkisStorageConf.ORC_FILE_SUFFIX); } @Override @@ -147,7 +148,8 @@ public String[] getResultSetType() { } // Utils.tryQuietly(fs::close); resultSet = getResultSetByType(resultSetType); - } else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) { + } else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX) + || fsPath.getPath().endsWith(LinkisStorageConf.ORC_FILE_SUFFIX)) { resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE); } return resultSet; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java new file mode 100644 index 0000000000..7061cfcd48 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.resultset; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetReader; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.domain.DataType; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.*; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OrcResultSetReader + extends ResultSetReader { + + private static final Logger logger = LoggerFactory.getLogger(OrcResultSetReader.class); + + private FsPath fsPath; + + private final ResultSet resultSet; + + private final InputStream inputStream; + + private MetaData metaData; + + private Record row; + + private RecordReader rows; + + private Reader reader; + + public OrcResultSetReader(ResultSet resultSet, InputStream inputStream, FsPath fsPath) + throws IOException { + super(resultSet, inputStream); + this.resultSet = resultSet; + this.inputStream = inputStream; + this.fsPath = fsPath; + this.reader = + OrcFile.createReader( + new Path(fsPath.getPath()), OrcFile.readerOptions(new Configuration())); + this.rows = reader.rows(); + } + + @Override + public MetaData getMetaData() { + if (metaData == null) { + try { + List fieldNames = reader.getSchema().getFieldNames(); + List typeDescriptions = reader.getSchema().getChildren(); + List columnList = new ArrayList<>(); + for (int i = 0; i < fieldNames.size(); i++) { + Column column = + new Column( + fieldNames.get(i), + DataType.toDataType(typeDescriptions.get(i).getCategory().getName()), + ""); + columnList.add(column); + } + + metaData = new TableMetaData(columnList.toArray(new Column[0])); + } catch (Exception e) { + throw new RuntimeException("Failed to read parquet schema", e); + } + } + return metaData; + } + + @Override + public int skip(int recordNum) throws IOException { + return 0; + } + + @Override + public long getPosition() throws IOException { + return 0; + } + + @Override + public long available() throws IOException { + return 0; + } + + @Override + public boolean hasNext() throws IOException { + if (metaData == null) getMetaData(); + if (rows == null) return false; + VectorizedRowBatch batch = + reader.getSchema().createRowBatch(TypeDescription.RowBatchVersion.ORIGINAL, 1); + TableMetaData tableMetaData = (TableMetaData) metaData; + + if (rows.nextBatch(batch)) { + int rowNum = 0; + Object[] rowData = new Object[tableMetaData.getColumns().length]; + for (int i = 0; i < batch.numCols; i++) { + ColumnVector columnVector = batch.cols[i]; + if (columnVector instanceof BytesColumnVector) { + BytesColumnVector vector = (BytesColumnVector) columnVector; + rowData[i] = vector.toString(rowNum); + } else if (columnVector instanceof Decimal64ColumnVector) { + Decimal64ColumnVector vector = (Decimal64ColumnVector) columnVector; + rowData[i] = vector.vector[rowNum]; + } else if (columnVector instanceof DecimalColumnVector) { + DecimalColumnVector vector = (DecimalColumnVector) columnVector; + rowData[i] = vector.vector[rowNum]; + } else if (columnVector instanceof DoubleColumnVector) { + DoubleColumnVector vector = (DoubleColumnVector) columnVector; + rowData[i] = vector.vector[rowNum]; + } else if (columnVector instanceof ListColumnVector) { + ListColumnVector vector = (ListColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof IntervalDayTimeColumnVector) { + IntervalDayTimeColumnVector vector = (IntervalDayTimeColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof LongColumnVector) { + LongColumnVector vector = (LongColumnVector) columnVector; + rowData[i] = vector.vector[rowNum]; + } else if (columnVector instanceof MapColumnVector) { + MapColumnVector vector = (MapColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof MultiValuedColumnVector) { + MultiValuedColumnVector vector = (MultiValuedColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof StructColumnVector) { + StructColumnVector vector = (StructColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof TimestampColumnVector) { + TimestampColumnVector vector = (TimestampColumnVector) columnVector; + rowData[i] = vector.time[rowNum]; + } else if (columnVector instanceof UnionColumnVector) { + UnionColumnVector vector = (UnionColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } + } + row = new TableRecord(rowData); + } else { + return false; + } + return row != null; + } + + @Override + public Record getRecord() { + if (metaData == null) throw new RuntimeException("Must read metadata first(必须先读取metadata)"); + if (row == null) { + throw new RuntimeException( + "Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)"); + } + return row; + } + + @Override + public void close() throws IOException { + IOUtils.closeQuietly(inputStream); + rows.close(); + reader.close(); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java new file mode 100644 index 0000000000..c4809f6499 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.resultset; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetWriter; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; +import org.apache.linkis.storage.utils.OrcUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OrcResultSetWriter + extends ResultSetWriter { + private static final Logger logger = LoggerFactory.getLogger(OrcResultSetWriter.class); + + private TypeDescription schema; + + private Writer writer; + + private boolean moveToWriteRow = false; + + private MetaData metaData = null; + + private final FsPath storePath; + + private final long maxCacheSize; + + private final ResultSet resultSet; + + public OrcResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) { + super(resultSet, maxCacheSize, storePath); + this.resultSet = resultSet; + this.maxCacheSize = maxCacheSize; + this.storePath = storePath; + } + + @Override + public void addMetaData(MetaData metaData) throws IOException { + if (!moveToWriteRow) { + this.metaData = metaData; + if (this.schema == null) { + this.schema = TypeDescription.createStruct(); + } + TableMetaData tableMetaData = (TableMetaData) this.metaData; + for (Column column : tableMetaData.columns) { + schema.addField(column.getColumnName(), OrcUtils.dataTypeToOrcType(column.getDataType())); + } + moveToWriteRow = true; + if (writer == null) { + writer = + OrcFile.createWriter( + new Path(storePath.getPath()), + OrcFile.writerOptions(new Configuration()) + .setSchema(schema) + .compress(CompressionKind.ZLIB) + .version(OrcFile.Version.V_0_12)); + } + } + } + + @Override + public void addRecord(Record record) { + if (moveToWriteRow) { + TableRecord tableRecord = (TableRecord) record; + TableMetaData tableMetaData = (TableMetaData) metaData; + try { + Object[] row = tableRecord.row; + VectorizedRowBatch batch = schema.createRowBatch(); + int rowCount = batch.size++; + + for (int i = 0; i < row.length; i++) { + OrcUtils.setColumn( + rowCount, batch.cols[i], tableMetaData.columns[i].getDataType(), row[i]); + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + } + writer.addRowBatch(batch); + + } catch (IOException e) { + logger.warn("addMetaDataAndRecordString failed", e); + } + } + } + + @Override + public FsPath toFSPath() { + return storePath; + } + + @Override + public String toString() { + return storePath.getSchemaPath(); + } + + @Override + public void addMetaDataAndRecordString(String content) {} + + @Override + public void addRecordString(String content) {} + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public void flush() throws IOException {} +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 8e20de99e6..c38a5ac250 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -52,6 +52,12 @@ public static ResultSetReader getResultSe } catch (IOException e) { throw new RuntimeException("Failed to read parquet", e); } + } else if (fsPath.getPath().endsWith(LinkisStorageConf.ORC_FILE_SUFFIX)) { + try { + resultSetReader = new OrcResultSetReader<>(resultSet, inputStream, fsPath); + } catch (IOException e) { + throw new RuntimeException("Failed to read parquet", e); + } } return resultSetReader; } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java index 981ce1b023..9cb1999367 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java @@ -42,6 +42,9 @@ public static ResultSetWriter getRe ResultSetWriter writer = null; if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) { writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else if (engineResultType.equals(LinkisStorageConf.ORC) + && resultSet instanceof TableResultSet) { + writer = new OrcResultSetWriter<>(resultSet, maxCacheSize, storePath); } else { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); } @@ -54,6 +57,9 @@ public static ResultSetWriter getRe ResultSetWriter writer = null; if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) { writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else if (engineResultType.equals(LinkisStorageConf.ORC) + && resultSet instanceof TableResultSet) { + writer = new OrcResultSetWriter<>(resultSet, maxCacheSize, storePath); } else { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); StorageResultSetWriter storageResultSetWriter = (StorageResultSetWriter) writer; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java index 5c25837d8f..c708f5faf6 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java @@ -55,6 +55,8 @@ public FsPath getResultSetPath(FsPath parentDir, String fileName) { String fileSuffix = Dolphin.DOLPHIN_FILE_SUFFIX; if (engineResultType.equals(LinkisStorageConf.PARQUET) && this instanceof TableResultSet) { fileSuffix = LinkisStorageConf.PARQUET_FILE_SUFFIX; + } else if (engineResultType.equals(LinkisStorageConf.ORC) && this instanceof TableResultSet) { + fileSuffix = LinkisStorageConf.ORC_FILE_SUFFIX; } final String path = parentDir.getPath().endsWith("/") diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index df7ae931a6..b3071ac410 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -74,7 +74,8 @@ public interface FileSource extends Closeable { static boolean isResultSet(String path) { return suffixPredicate.apply(path, fileType[0]) - || suffixPredicate.apply(path, LinkisStorageConf.PARQUET); + || suffixPredicate.apply(path, LinkisStorageConf.PARQUET) + || suffixPredicate.apply(path, LinkisStorageConf.ORC); } static boolean isResultSet(FsPath fsPath) { diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java new file mode 100644 index 0000000000..8c949a5c5e --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.utils; + +import org.apache.linkis.storage.domain.DataType; + +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +/** + * Inspired by: + * https://github.com/apache/flink/blob/master/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java + */ +public class OrcUtils { + + public static TypeDescription dataTypeToOrcType(DataType type) { + switch (type) { + case CharType: + return TypeDescription.createChar().withMaxLength(1024); + case StringType: + return TypeDescription.createString(); + case VarcharType: + return TypeDescription.createVarchar().withMaxLength(1024); + case BooleanType: + return TypeDescription.createBoolean(); + case BinaryType: + return TypeDescription.createBinary(); + case DecimalType: + return TypeDescription.createDecimal().withScale(10).withPrecision(38); + case TinyIntType: + return TypeDescription.createByte(); + case ShortIntType: + return TypeDescription.createShort(); + case IntType: + return TypeDescription.createInt(); + case BigIntType: + return TypeDescription.createLong(); + case FloatType: + return TypeDescription.createFloat(); + case DoubleType: + return TypeDescription.createDouble(); + case DateType: + return TypeDescription.createDate(); + case TimestampType: + return TypeDescription.createTimestamp(); + case ArrayType: + return TypeDescription.createList(dataTypeToOrcType(DataType.VarcharType)); + case MapType: + return TypeDescription.createMap( + dataTypeToOrcType(DataType.VarcharType), dataTypeToOrcType(DataType.VarcharType)); + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + public static void setColumn(int columnId, ColumnVector column, DataType type, Object value) { + switch (type) { + case CharType: + case VarcharType: + case BinaryType: + case StringType: + { + BytesColumnVector vector = (BytesColumnVector) column; + vector.setVal(columnId, String.valueOf(value).getBytes()); + break; + } + case BooleanType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Boolean.valueOf(value.toString()) ? 1 : 0; + break; + } + case DecimalType: + { + DecimalColumnVector vector = (DecimalColumnVector) column; + vector.set(columnId, HiveDecimal.create(new BigDecimal(value.toString()))); + break; + } + case TinyIntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = (byte) value; + break; + } + case DateType: + case IntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Integer.valueOf(value.toString()); + break; + } + case BigIntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Long.valueOf(value.toString()); + break; + } + case FloatType: + { + DoubleColumnVector vector = (DoubleColumnVector) column; + vector.vector[columnId] = Float.valueOf(value.toString()); + break; + } + case DoubleType: + { + DoubleColumnVector vector = (DoubleColumnVector) column; + vector.vector[columnId] = Double.valueOf(value.toString()); + break; + } + case TimestampType: + { + TimestampColumnVector vector = (TimestampColumnVector) column; + try { + vector.set( + columnId, + new Timestamp( + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ") + .parse(value.toString()) + .getTime())); + } catch (ParseException e) { + vector.set(columnId, new Timestamp(System.currentTimeMillis())); + } + break; + } + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } +} diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java index 2af3caaaba..d1113569df 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java @@ -404,9 +404,9 @@ public Message getDirFileTrees( FsPathListWithError fsPathListWithError = fileSystem.listPathWithError(fsPath); if (fsPathListWithError != null) { for (FsPath children : fsPathListWithError.getFsPaths()) { - // 兼容parquet,跳过parquet.crc文件 - if (children.getPath().endsWith(".parquet.crc")) { - continue; + // 兼容parquet和orc,跳过.crc文件 + if (children.getPath().endsWith(".crc")) { + continue; } DirFileTree dirFileTreeChildren = new DirFileTree(); dirFileTreeChildren.setName(new File(children.getPath()).getName()); diff --git a/pom.xml b/pom.xml index 327a455a30..d0d925a9a1 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,9 @@ 3.16.3 + 1.10.0 + 1.5.8 + 1.19.4 2.23.1 4.1.86.Final From 4efe0600a30a4dcc8598deb16299c4ae130f1270 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 11 Dec 2023 18:12:30 +0800 Subject: [PATCH 06/16] Formatted code --- .../apache/linkis/filesystem/restful/api/FsRestfulApi.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java index d1113569df..384b9aeabc 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java @@ -404,9 +404,9 @@ public Message getDirFileTrees( FsPathListWithError fsPathListWithError = fileSystem.listPathWithError(fsPath); if (fsPathListWithError != null) { for (FsPath children : fsPathListWithError.getFsPaths()) { - // 兼容parquet和orc,跳过.crc文件 - if (children.getPath().endsWith(".crc")) { - continue; + // 兼容parquet和orc,跳过.crc文件 + if (children.getPath().endsWith(".crc")) { + continue; } DirFileTree dirFileTreeChildren = new DirFileTree(); dirFileTreeChildren.setName(new File(children.getPath()).getName()); From 9e2f627f1575f794532fea21023bf38f7bc81817 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 11 Dec 2023 18:33:24 +0800 Subject: [PATCH 07/16] Modify known-dependencies.txt --- tool/dependencies/known-dependencies.txt | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index cfaf437f72..b343720945 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -120,6 +120,7 @@ expiringmap-0.5.6.jar failsafe-2.4.0.jar failureaccess-1.0.1.jar fastutil-6.5.6.jar +fastutil-7.0.13.jar feign-core-10.12.jar feign-form-3.8.0.jar feign-form-spring-3.8.0.jar @@ -508,6 +509,7 @@ opentracing-api-0.33.0.jar opentracing-noop-0.33.0.jar opentracing-util-0.33.0.jar orc-core-1.5.8.jar +orc-core-1.5.8-nohive.jar orc-shims-1.5.8.jar org.jacoco.agent-0.8.5-runtime.jar osgi-resource-locator-1.0.1.jar @@ -516,18 +518,14 @@ pagehelper-5.3.1.jar paranamer-2.3.jar paranamer-2.8.jar parquet-hadoop-bundle-1.10.0.jar -parquet-avro-1.12.0.jar -parquet-column-1.12.0.jar -parquet-column-1.12.2.jar -parquet-common-1.12.0.jar -parquet-common-1.12.2.jar -parquet-encoding-1.12.0.jar -parquet-encoding-1.12.2.jar -parquet-format-structures-1.12.0.jar -parquet-hadoop-1.12.0.jar -parquet-hadoop-1.12.2.jar -parquet-jackson-1.12.0.jar -parquet-jackson-1.12.2.jar +parquet-avro-1.10.0.jar +parquet-column-1.10.0.jar +parquet-common-1.10.0.jar +parquet-encoding-1.10.0.jar +parquet-format-2.4.0.jar +parquet-format-structures-1.12.2.jar +parquet-hadoop-1.10.0.jar +parquet-jackson-1.10.0.jar poi-5.2.3.jar poi-ooxml-5.2.3.jar poi-ooxml-lite-5.2.3.jar From 38c14e98dfd86014a338e9e6e2c3904b7e263d7a Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 11 Dec 2023 19:16:57 +0800 Subject: [PATCH 08/16] Modify known-dependencies.txt --- tool/dependencies/known-dependencies.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index b343720945..c8ba127570 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -520,12 +520,17 @@ paranamer-2.8.jar parquet-hadoop-bundle-1.10.0.jar parquet-avro-1.10.0.jar parquet-column-1.10.0.jar +parquet-column-1.12.2.jar parquet-common-1.10.0.jar +parquet-common-1.12.2.jar parquet-encoding-1.10.0.jar +parquet-encoding-1.12.2.jar parquet-format-2.4.0.jar parquet-format-structures-1.12.2.jar parquet-hadoop-1.10.0.jar +parquet-hadoop-1.12.2.jar parquet-jackson-1.10.0.jar +parquet-jackson-1.12.2.jar poi-5.2.3.jar poi-ooxml-5.2.3.jar poi-ooxml-lite-5.2.3.jar From 00c151974fadcd5034936638e396f64e12c9a7a2 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Tue, 12 Dec 2023 14:21:04 +0800 Subject: [PATCH 09/16] Fix bugs --- .../java/org/apache/linkis/storage/conf/LinkisStorageConf.java | 2 +- .../src/main/java/org/apache/linkis/storage/utils/OrcUtils.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java index 58ff80b8c5..7fb5c1b4ac 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -45,7 +45,7 @@ public class LinkisStorageConf { CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue(); public static final String ENGINE_RESULT_TYPE = - CommonVars.apply("linkis.engine.resultSet.type", ORC, "Result type").getValue(); + CommonVars.apply("linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java index 8c949a5c5e..72c2f8c355 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java @@ -40,6 +40,8 @@ public static TypeDescription dataTypeToOrcType(DataType type) { return TypeDescription.createChar().withMaxLength(1024); case StringType: return TypeDescription.createString(); + case LongType: + return TypeDescription.createLong(); case VarcharType: return TypeDescription.createVarchar().withMaxLength(1024); case BooleanType: From ae439a1c7bb0d060046c5d782e20efa9e3a5a2f3 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Wed, 17 Jan 2024 11:15:47 +0800 Subject: [PATCH 10/16] Change comments to English --- .../org/apache/linkis/filesystem/restful/api/FsRestfulApi.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java index 384b9aeabc..0256689e1c 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java @@ -404,7 +404,7 @@ public Message getDirFileTrees( FsPathListWithError fsPathListWithError = fileSystem.listPathWithError(fsPath); if (fsPathListWithError != null) { for (FsPath children : fsPathListWithError.getFsPaths()) { - // 兼容parquet和orc,跳过.crc文件 + // parquet and orc compatible, skipping.crc files if (children.getPath().endsWith(".crc")) { continue; } From 7e677ccb797adb8719cb9df1ac55ddd5d20ba8a2 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 18 Jan 2024 12:51:40 +0800 Subject: [PATCH 11/16] Code optimization --- .../storage/exception/StorageErrorCode.java | 6 +++- .../exception/StorageReadException.java | 36 +++++++++++++++++++ .../storage/resultset/OrcResultSetReader.java | 6 ++-- .../resultset/ParquetResultSetReader.java | 6 ++-- .../resultset/ResultSetReaderFactory.java | 12 +++++-- .../linkis/storage/source/FileSource.java | 2 +- 6 files changed, 58 insertions(+), 10 deletions(-) create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageReadException.java diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java index 308e548f27..7a8452141b 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java @@ -25,7 +25,11 @@ public enum StorageErrorCode { INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"), FS_OOM(53002, "OOM occurred while reading the file"), - FS_ERROR(53003, "Failed to operation fs"); + FS_ERROR(53003, "Failed to operation fs"), + + READ_PARQUET_FAILED(53004, "Failed to read parquet file"), + + READ_ORC_FAILED(53005, "Failed to read orc file"); StorageErrorCode(int errorCode, String message) { this.code = errorCode; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageReadException.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageReadException.java new file mode 100644 index 0000000000..dedad1140c --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageReadException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class StorageReadException extends ErrorException { + + public StorageReadException(int errCode, String desc) { + super(errCode, desc); + } + + public StorageReadException(int errCode, String desc, Throwable t) { + super(errCode, desc); + initCause(t); + } + + public StorageReadException(int errCode, String desc, String ip, int port, String serviceKind) { + super(errCode, desc, ip, port, serviceKind); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java index 7061cfcd48..6df61c7998 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java @@ -101,17 +101,17 @@ public MetaData getMetaData() { @Override public int skip(int recordNum) throws IOException { - return 0; + throw new UnsupportedOperationException("Storeage Unsupported type: skip"); } @Override public long getPosition() throws IOException { - return 0; + throw new UnsupportedOperationException("Storeage Unsupported type: getPosition"); } @Override public long available() throws IOException { - return 0; + throw new UnsupportedOperationException("Storeage Unsupported type: available"); } @Override diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java index c4c43d6f29..9774ba1dba 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java @@ -98,17 +98,17 @@ public MetaData getMetaData() { @Override public int skip(int recordNum) throws IOException { - return 0; + throw new UnsupportedOperationException("Storeage Unsupported type: skip"); } @Override public long getPosition() throws IOException { - return 0; + throw new UnsupportedOperationException("Storeage Unsupported type: getPosition"); } @Override public long available() throws IOException { - return 0; + throw new UnsupportedOperationException("Storeage Unsupported type: available"); } @Override diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index c38a5ac250..bf46f49f9b 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -27,6 +27,8 @@ import org.apache.linkis.storage.conf.LinkisStorageConf; import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary; +import org.apache.linkis.storage.exception.StorageErrorCode; +import org.apache.linkis.storage.exception.StorageReadException; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.resultset.table.TableMetaData; import org.apache.linkis.storage.resultset.table.TableRecord; @@ -50,13 +52,19 @@ public static ResultSetReader getResultSe try { resultSetReader = new ParquetResultSetReader<>(resultSet, inputStream, fsPath); } catch (IOException e) { - throw new RuntimeException("Failed to read parquet", e); + throw new StorageReadException( + StorageErrorCode.READ_PARQUET_FAILED.getCode(), + StorageErrorCode.READ_PARQUET_FAILED.getMessage(), + e); } } else if (fsPath.getPath().endsWith(LinkisStorageConf.ORC_FILE_SUFFIX)) { try { resultSetReader = new OrcResultSetReader<>(resultSet, inputStream, fsPath); } catch (IOException e) { - throw new RuntimeException("Failed to read parquet", e); + throw new StorageReadException( + StorageErrorCode.READ_ORC_FAILED.getCode(), + StorageErrorCode.READ_ORC_FAILED.getMessage(), + e); } } return resultSetReader; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index b3071ac410..f7a5d3e512 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -73,7 +73,7 @@ public interface FileSource extends Closeable { (path, suffix) -> path.endsWith("." + suffix); static boolean isResultSet(String path) { - return suffixPredicate.apply(path, fileType[0]) + return suffixPredicate.apply(path, LinkisStorageConf.DOLPHIN) || suffixPredicate.apply(path, LinkisStorageConf.PARQUET) || suffixPredicate.apply(path, LinkisStorageConf.ORC); } From 72d1adf3c16f66af30cf210321f0ae9dff2dd11d Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 18 Jan 2024 16:25:02 +0800 Subject: [PATCH 12/16] maven packaging optimization --- linkis-commons/linkis-storage/pom.xml | 3 +++ pom.xml | 17 +++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index 93628db808..2f1cdb44bd 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -104,11 +104,13 @@ org.apache.parquet parquet-avro ${parquet-avro.version} + ${storage.parquet.scope} org.apache.hadoop hadoop-mapreduce-client-core ${hadoop.version} + ${storage.parquet.scope} log4j @@ -134,6 +136,7 @@ orc-core ${orc-core.version} nohive + ${storage.orc.scope} org.apache.hive diff --git a/pom.xml b/pom.xml index f8476db3d7..d27ef8f9a8 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,9 @@ ${hadoop.version} provided + provided + provided + 1.16.2 0.9.3 1.3.0 @@ -1944,5 +1947,19 @@ + + + storage-parquet + + compile + + + + + storage-orc + + compile + + From 4e573efef6c719c33f3434be65c9861c2a23a8d4 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 19 Jan 2024 09:47:49 +0800 Subject: [PATCH 13/16] maven packaging optimization --- linkis-commons/linkis-storage/pom.xml | 3 --- pom.xml | 16 ---------------- 2 files changed, 19 deletions(-) diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index 2f1cdb44bd..93628db808 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -104,13 +104,11 @@ org.apache.parquet parquet-avro ${parquet-avro.version} - ${storage.parquet.scope} org.apache.hadoop hadoop-mapreduce-client-core ${hadoop.version} - ${storage.parquet.scope} log4j @@ -136,7 +134,6 @@ orc-core ${orc-core.version} nohive - ${storage.orc.scope} org.apache.hive diff --git a/pom.xml b/pom.xml index d27ef8f9a8..a6792badf0 100644 --- a/pom.xml +++ b/pom.xml @@ -121,8 +121,6 @@ ${hadoop.version} provided - provided - provided 1.16.2 0.9.3 @@ -1947,19 +1945,5 @@ - - - storage-parquet - - compile - - - - - storage-orc - - compile - - From 8d5c5ac5552cfe3ecd5ed6f43979180e47713e42 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 19 Jan 2024 10:27:06 +0800 Subject: [PATCH 14/16] maven packaging optimization --- pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/pom.xml b/pom.xml index a6792badf0..f8476db3d7 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,6 @@ ${hadoop.version} provided - 1.16.2 0.9.3 1.3.0 From 5c4c20fdf4ec92c132f237683fd80ffd3646be03 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 22 Jan 2024 15:33:05 +0800 Subject: [PATCH 15/16] Override skip method --- .../linkis/storage/resultset/OrcResultSetReader.java | 11 ++++++++++- .../storage/resultset/ParquetResultSetReader.java | 11 ++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java index 6df61c7998..249e326cde 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java @@ -101,7 +101,16 @@ public MetaData getMetaData() { @Override public int skip(int recordNum) throws IOException { - throw new UnsupportedOperationException("Storeage Unsupported type: skip"); + if (recordNum < 0) return -1; + + for (int i = recordNum; i > 0; i--) { + try { + hasNext(); + } catch (Throwable t) { + return recordNum - i; + } + } + return recordNum; } @Override diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java index 9774ba1dba..c09804294d 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java @@ -98,7 +98,16 @@ public MetaData getMetaData() { @Override public int skip(int recordNum) throws IOException { - throw new UnsupportedOperationException("Storeage Unsupported type: skip"); + if (recordNum < 0) return -1; + + for (int i = recordNum; i > 0; i--) { + try { + this.record = parquetReader.read(); + } catch (Throwable t) { + return recordNum - i; + } + } + return recordNum; } @Override From c718a8d3e7af359e3a98abb74124252db19b3b32 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 22 Jan 2024 18:53:35 +0800 Subject: [PATCH 16/16] maven packaging optimization --- linkis-commons/linkis-storage/pom.xml | 3 +++ pom.xml | 17 +++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index 93628db808..2f1cdb44bd 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -104,11 +104,13 @@ org.apache.parquet parquet-avro ${parquet-avro.version} + ${storage.parquet.scope} org.apache.hadoop hadoop-mapreduce-client-core ${hadoop.version} + ${storage.parquet.scope} log4j @@ -134,6 +136,7 @@ orc-core ${orc-core.version} nohive + ${storage.orc.scope} org.apache.hive diff --git a/pom.xml b/pom.xml index f8476db3d7..d27ef8f9a8 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,9 @@ ${hadoop.version} provided + provided + provided + 1.16.2 0.9.3 1.3.0 @@ -1944,5 +1947,19 @@ + + + storage-parquet + + compile + + + + + storage-orc + + compile + +