From 20c2654d5060d19d58ccdd32c8e52ad664e97049 Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Wed, 24 Jan 2024 19:09:38 +0800
Subject: [PATCH] Support for storing resultSet in Parquet and Orc format
 (#5024)

* Support for storing result sets in Parquet format

* Modify known-dependencies.txt

* Fix bugs

* Fix bugs

* Support for storing result sets in orc format

* Formatted code

* Modify known-dependencies.txt

* Modify known-dependencies.txt

* Fix bugs

* Change comments to English

* Code optimization

* maven packaging optimization

* maven packaging optimization

* maven packaging optimization

* Override skip method

* maven packaging optimization
---
 linkis-commons/linkis-storage/pom.xml         |  46 ++++
 .../storage/conf/LinkisStorageConf.java       |  19 +-
 .../storage/exception/StorageErrorCode.java   |   6 +-
 .../exception/StorageReadException.java       |  36 +++
 .../resultset/DefaultResultSetFactory.java    |  26 ++-
 .../storage/resultset/OrcResultSetReader.java | 212 ++++++++++++++++++
 .../storage/resultset/OrcResultSetWriter.java | 141 ++++++++++++
 .../resultset/ParquetResultSetReader.java     | 158 +++++++++++++
 .../resultset/ParquetResultSetWriter.java     | 135 +++++++++++
 .../resultset/ResultSetReaderFactory.java     |  34 ++-
 .../resultset/ResultSetWriterFactory.java     |  45 ++--
 .../storage/resultset/StorageResultSet.java   |  13 +-
 .../resultset/StorageResultSetWriter.java     |  18 +-
 .../linkis/storage/source/FileSource.java     |  10 +-
 .../apache/linkis/storage/utils/OrcUtils.java | 152 +++++++++++++
 .../linkis/storage/utils/StorageHelper.java   |   5 +-
 .../filesystem/restful/api/FsRestfulApi.java  |   4 +
 pom.xml                                       |  20 ++
 tool/dependencies/known-dependencies.txt      |  21 ++
 19 files changed, 1059 insertions(+), 42 deletions(-)
 create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageReadException.java
 create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java
 create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java
 create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java
 create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java
 create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java

diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml
index def795ebd8..2f1cdb44bd 100644
--- a/linkis-commons/linkis-storage/pom.xml
+++ b/linkis-commons/linkis-storage/pom.xml
@@ -99,6 +99,52 @@
       <artifactId>aws-java-sdk-s3</artifactId>
       <version>1.12.261</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>${parquet-avro.version}</version>
+      <scope>${storage.parquet.scope}</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>${storage.parquet.scope}</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>ch.qos.reload4j</groupId>
+          <artifactId>reload4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-reload4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+      <version>${orc-core.version}</version>
+      <classifier>nohive</classifier>
+      <scope>${storage.orc.scope}</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-storage-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java
index 74950c15fe..7fb5c1b4ac 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java
+++
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -25,6 +25,16 @@ public class LinkisStorageConf { private static final Object CONF_LOCK = new Object(); + public static final String DOLPHIN = "dolphin"; + + public static final String PARQUET = "parquet"; + + public static final String PARQUET_FILE_SUFFIX = ".parquet"; + + public static final String ORC = "orc"; + + public static final String ORC_FILE_SUFFIX = ".orc"; + public static final String HDFS_FILE_SYSTEM_REST_ERRS = CommonVars.apply( "wds.linkis.hdfs.rest.errs", @@ -34,12 +44,19 @@ public class LinkisStorageConf { public static final String ROW_BYTE_MAX_LEN_STR = CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue(); + public static final String ENGINE_RESULT_TYPE = + CommonVars.apply("linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); + public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR); public static final String FILE_TYPE = CommonVars.apply( "wds.linkis.storage.file.type", - "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql") + "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql" + + "," + + PARQUET + + "," + + ORC) .getValue(); private static volatile String[] fileTypeArr = null; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java index 308e548f27..7a8452141b 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java @@ -25,7 +25,11 @@ public enum StorageErrorCode { INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"), FS_OOM(53002, "OOM occurred while reading the file"), - FS_ERROR(53003, "Failed to operation fs"); + FS_ERROR(53003, "Failed to operation fs"), + + READ_PARQUET_FAILED(53004, "Failed to read parquet file"), + + READ_ORC_FAILED(53005, "Failed to read orc file"); StorageErrorCode(int errorCode, String message) { this.code = errorCode; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageReadException.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageReadException.java new file mode 100644 index 0000000000..dedad1140c --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageReadException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.storage.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class StorageReadException extends ErrorException { + + public StorageReadException(int errCode, String desc) { + super(errCode, desc); + } + + public StorageReadException(int errCode, String desc, Throwable t) { + super(errCode, desc); + initCause(t); + } + + public StorageReadException(int errCode, String desc, String ip, int port, String serviceKind) { + super(errCode, desc, ip, port, serviceKind); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java index db78afac29..da51a2f13d 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java @@ -23,6 +23,7 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.storage.FSFactory; +import org.apache.linkis.storage.conf.LinkisStorageConf; import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary; import org.apache.linkis.storage.exception.StorageWarnException; @@ -108,7 +109,9 @@ public boolean exists(String resultSetType) { @Override public boolean isResultSetPath(String path) { - return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX); + return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX) + || path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX) + || path.endsWith(LinkisStorageConf.ORC_FILE_SUFFIX); } @Override @@ -134,15 +137,22 @@ public String[] getResultSetType() { @Override public ResultSet getResultSetByPath(FsPath fsPath, Fs fs) { + ResultSet resultSet = null; try (InputStream inputStream = fs.read(fsPath)) { - String resultSetType = Dolphin.getType(inputStream); - if (StringUtils.isEmpty(resultSetType)) { - throw new StorageWarnException( - THE_FILE_IS_EMPTY.getErrorCode(), - MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath())); + if (fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) { + String resultSetType = Dolphin.getType(inputStream); + if (StringUtils.isEmpty(resultSetType)) { + throw new StorageWarnException( + THE_FILE_IS_EMPTY.getErrorCode(), + MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath())); + } + // Utils.tryQuietly(fs::close); + resultSet = getResultSetByType(resultSetType); + } else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX) + || fsPath.getPath().endsWith(LinkisStorageConf.ORC_FILE_SUFFIX)) { + resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE); } - // Utils.tryQuietly(fs::close); - return getResultSetByType(resultSetType); + return resultSet; } catch (IOException e) { throw new RuntimeException(e); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java new file mode 100644 index 0000000000..249e326cde --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.resultset;
+
+import org.apache.linkis.common.io.FsPath;
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSet;
+import org.apache.linkis.common.io.resultset.ResultSetReader;
+import org.apache.linkis.storage.domain.Column;
+import org.apache.linkis.storage.domain.DataType;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+import org.apache.linkis.storage.resultset.table.TableRecord;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OrcResultSetReader<K extends MetaData, V extends Record>
+    extends ResultSetReader<K, V> {
+
+  private static final Logger logger = LoggerFactory.getLogger(OrcResultSetReader.class);
+
+  private FsPath fsPath;
+
+  private final ResultSet<K, V> resultSet;
+
+  private final InputStream inputStream;
+
+  private MetaData metaData;
+
+  private Record row;
+
+  private RecordReader rows;
+
+  private Reader reader;
+
+  public OrcResultSetReader(ResultSet<K, V> resultSet, InputStream inputStream, FsPath fsPath)
+      throws IOException {
+    super(resultSet, inputStream);
+    this.resultSet = resultSet;
+    this.inputStream = inputStream;
+    this.fsPath = fsPath;
+    this.reader =
+        OrcFile.createReader(
+            new Path(fsPath.getPath()), OrcFile.readerOptions(new Configuration()));
+    this.rows = reader.rows();
+  }
+
+  @Override
+  public MetaData getMetaData() {
+    if (metaData == null) {
+      try {
+        List<String> fieldNames = reader.getSchema().getFieldNames();
+        List<TypeDescription> typeDescriptions = reader.getSchema().getChildren();
+        List<Column> columnList = new ArrayList<>();
+        for (int i = 0; i < fieldNames.size(); i++) {
+          Column column =
+              new Column(
+                  fieldNames.get(i),
+                  DataType.toDataType(typeDescriptions.get(i).getCategory().getName()),
+                  "");
+          columnList.add(column);
+        }
+
+        metaData = new TableMetaData(columnList.toArray(new Column[0]));
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read orc schema", e);
+      }
+    }
+    return metaData;
+  }
+
+  @Override
+  public int skip(int recordNum) throws IOException {
+    if (recordNum < 0) return -1;
+
+    for (int i = recordNum; i > 0; i--) {
+      try {
+        hasNext();
+      } catch (Throwable t) {
+        return recordNum - i;
+      }
+    }
+    return recordNum;
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    throw new UnsupportedOperationException("Storage Unsupported type: getPosition");
+  }
+
+  @Override
+  public long available() throws IOException {
+    throw new UnsupportedOperationException("Storage Unsupported type: available");
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (metaData == null) getMetaData();
+    if (rows == null) return false;
+    VectorizedRowBatch batch =
+        reader.getSchema().createRowBatch(TypeDescription.RowBatchVersion.ORIGINAL, 1);
+    TableMetaData tableMetaData = (TableMetaData) metaData;
+
+    if (rows.nextBatch(batch)) {
+      int rowNum = 0;
+      Object[] rowData = new Object[tableMetaData.getColumns().length];
+      for (int i = 0; i < batch.numCols; i++) {
+        ColumnVector columnVector = batch.cols[i];
+        if (columnVector instanceof BytesColumnVector) {
+          BytesColumnVector vector = (BytesColumnVector) columnVector;
+          rowData[i] = vector.toString(rowNum);
+        } else if (columnVector instanceof Decimal64ColumnVector) {
+          Decimal64ColumnVector vector = (Decimal64ColumnVector) columnVector;
+          rowData[i] = vector.vector[rowNum];
+        } else if (columnVector instanceof DecimalColumnVector) {
+          DecimalColumnVector vector = (DecimalColumnVector) columnVector;
+          rowData[i] = vector.vector[rowNum];
+        } else if (columnVector instanceof DoubleColumnVector) {
+          DoubleColumnVector vector = (DoubleColumnVector) columnVector;
+          rowData[i] = vector.vector[rowNum];
+        } else if (columnVector instanceof ListColumnVector) {
+          ListColumnVector vector = (ListColumnVector) columnVector;
+          StringBuilder builder = new StringBuilder();
+          vector.stringifyValue(builder, rowNum);
+          rowData[i] = builder.toString();
+        } else if (columnVector instanceof IntervalDayTimeColumnVector) {
+          IntervalDayTimeColumnVector vector = (IntervalDayTimeColumnVector) columnVector;
+          StringBuilder builder = new StringBuilder();
+          vector.stringifyValue(builder, rowNum);
+          rowData[i] = builder.toString();
+        } else if (columnVector instanceof LongColumnVector) {
+          LongColumnVector vector = (LongColumnVector) columnVector;
+          rowData[i] = vector.vector[rowNum];
+        } else if (columnVector instanceof MapColumnVector) {
+          MapColumnVector vector = (MapColumnVector) columnVector;
+          StringBuilder builder = new StringBuilder();
+          vector.stringifyValue(builder, rowNum);
+          rowData[i] = builder.toString();
+        } else if (columnVector instanceof MultiValuedColumnVector) {
+          MultiValuedColumnVector vector = (MultiValuedColumnVector) columnVector;
+          StringBuilder builder = new StringBuilder();
+          vector.stringifyValue(builder, rowNum);
+          rowData[i] = builder.toString();
+        } else if (columnVector instanceof StructColumnVector) {
+          StructColumnVector vector = (StructColumnVector) columnVector;
+          StringBuilder builder = new StringBuilder();
+          vector.stringifyValue(builder, rowNum);
+          rowData[i] = builder.toString();
+        } else if (columnVector instanceof TimestampColumnVector) {
+          TimestampColumnVector vector = (TimestampColumnVector) columnVector;
+          rowData[i] = vector.time[rowNum];
+        } else if (columnVector instanceof UnionColumnVector) {
+          UnionColumnVector vector = (UnionColumnVector) columnVector;
+          StringBuilder builder = new StringBuilder();
+          vector.stringifyValue(builder, rowNum);
+          rowData[i] = builder.toString();
+        }
+      }
+      row = new TableRecord(rowData);
+    } else {
+      return false;
+    }
+    return row != null;
+  }
+
+  @Override
+  public Record getRecord() {
+    if (metaData == null) throw new RuntimeException("Must read metadata first(必须先读取metadata)");
+    if (row == null) {
+      throw new RuntimeException(
+          "Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)");
+    }
+    return row;
+  }
+
+  @Override
+ public void close() throws IOException { + IOUtils.closeQuietly(inputStream); + rows.close(); + reader.close(); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java new file mode 100644 index 0000000000..c4809f6499 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.resultset; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetWriter; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; +import org.apache.linkis.storage.utils.OrcUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OrcResultSetWriter + extends ResultSetWriter { + private static final Logger logger = LoggerFactory.getLogger(OrcResultSetWriter.class); + + private TypeDescription schema; + + private Writer writer; + + private boolean moveToWriteRow = false; + + private MetaData metaData = null; + + private final FsPath storePath; + + private final long maxCacheSize; + + private final ResultSet resultSet; + + public OrcResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) { + super(resultSet, maxCacheSize, storePath); + this.resultSet = resultSet; + this.maxCacheSize = maxCacheSize; + this.storePath = storePath; + } + + @Override + public void addMetaData(MetaData metaData) throws IOException { + if (!moveToWriteRow) { + this.metaData = metaData; + if (this.schema == null) { + this.schema = TypeDescription.createStruct(); + } + TableMetaData tableMetaData = (TableMetaData) this.metaData; + for (Column column : tableMetaData.columns) { + schema.addField(column.getColumnName(), OrcUtils.dataTypeToOrcType(column.getDataType())); + } + moveToWriteRow = true; + if (writer == null) { + writer = + OrcFile.createWriter( + new Path(storePath.getPath()), + OrcFile.writerOptions(new Configuration()) + .setSchema(schema) + .compress(CompressionKind.ZLIB) + 
.version(OrcFile.Version.V_0_12)); + } + } + } + + @Override + public void addRecord(Record record) { + if (moveToWriteRow) { + TableRecord tableRecord = (TableRecord) record; + TableMetaData tableMetaData = (TableMetaData) metaData; + try { + Object[] row = tableRecord.row; + VectorizedRowBatch batch = schema.createRowBatch(); + int rowCount = batch.size++; + + for (int i = 0; i < row.length; i++) { + OrcUtils.setColumn( + rowCount, batch.cols[i], tableMetaData.columns[i].getDataType(), row[i]); + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + } + writer.addRowBatch(batch); + + } catch (IOException e) { + logger.warn("addMetaDataAndRecordString failed", e); + } + } + } + + @Override + public FsPath toFSPath() { + return storePath; + } + + @Override + public String toString() { + return storePath.getSchemaPath(); + } + + @Override + public void addMetaDataAndRecordString(String content) {} + + @Override + public void addRecordString(String content) {} + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public void flush() throws IOException {} +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java new file mode 100644 index 0000000000..c09804294d --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.linkis.storage.resultset;
+
+import org.apache.linkis.common.io.FsPath;
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSet;
+import org.apache.linkis.common.io.resultset.ResultSetReader;
+import org.apache.linkis.storage.domain.Column;
+import org.apache.linkis.storage.domain.DataType;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+import org.apache.linkis.storage.resultset.table.TableRecord;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParquetResultSetReader<K extends MetaData, V extends Record>
+    extends ResultSetReader<K, V> {
+
+  private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetReader.class);
+
+  private FsPath fsPath;
+
+  private final ResultSet<K, V> resultSet;
+
+  private final InputStream inputStream;
+
+  private MetaData metaData;
+
+  private Record row;
+
+  private ParquetReader<GenericRecord> parquetReader;
+
+  private GenericRecord record;
+
+  public ParquetResultSetReader(ResultSet<K, V> resultSet, InputStream inputStream, FsPath fsPath)
+      throws IOException {
+    super(resultSet, inputStream);
+    this.resultSet = resultSet;
+    this.inputStream = inputStream;
+    this.fsPath = fsPath;
+    this.parquetReader =
+        AvroParquetReader.<GenericRecord>builder(new Path(fsPath.getPath())).build();
+    this.record = parquetReader.read();
+  }
+
+  @Override
+  public MetaData getMetaData() {
+    if (metaData == null) {
+      try {
+        List<Schema.Field> fields = record.getSchema().getFields();
+        List<Column> columnList =
+            fields.stream()
+                .map(
+                    field ->
+                        new Column(
+                            field.name(),
+                            DataType.toDataType(field.schema().getType().getName()),
+                            ""))
+                .collect(Collectors.toList());
+
+        metaData = new TableMetaData(columnList.toArray(new Column[0]));
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read parquet schema", e);
+      }
+    }
+    return metaData;
+  }
+
+  @Override
+  public int skip(int recordNum) throws IOException {
+    if (recordNum < 0) return -1;
+
+    for (int i = recordNum; i > 0; i--) {
+      try {
+        this.record = parquetReader.read();
+      } catch (Throwable t) {
+        return recordNum - i;
+      }
+    }
+    return recordNum;
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    throw new UnsupportedOperationException("Storage Unsupported type: getPosition");
+  }
+
+  @Override
+  public long available() throws IOException {
+    throw new UnsupportedOperationException("Storage Unsupported type: available");
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (metaData == null) getMetaData();
+    if (record == null) return false;
+    ArrayList<Object> resultList = new ArrayList<>();
+    TableMetaData tableMetaData = (TableMetaData) metaData;
+    int length = tableMetaData.getColumns().length;
+    for (int i = 0; i < length; i++) {
+      resultList.add(record.get(i));
+    }
+    row = new TableRecord(resultList.toArray(new Object[0]));
+    if (row == null) return false;
+    return record != null;
+  }
+
+  @Override
+  public Record getRecord() {
+    if (metaData == null) throw new RuntimeException("Must read metadata first(必须先读取metadata)");
+    if (row == null) {
+      throw new RuntimeException(
+          "Can't get the value of
the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)"); + } + try { + this.record = parquetReader.read(); + } catch (IOException e) { + throw new RuntimeException("Failed to read parquet record", e); + } + return row; + } + + @Override + public void close() throws IOException { + IOUtils.closeQuietly(inputStream); + parquetReader.close(); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java new file mode 100644 index 0000000000..6fbac3c8cb --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.resultset; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetWriter; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ParquetResultSetWriter + extends ResultSetWriter { + private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetWriter.class); + + private Schema schema; + + private ParquetWriter parquetWriter; + + private boolean moveToWriteRow = false; + + private MetaData metaData = null; + + private final FsPath storePath; + + private final long maxCacheSize; + + private final ResultSet resultSet; + + public ParquetResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) { + super(resultSet, maxCacheSize, storePath); + this.resultSet = resultSet; + this.maxCacheSize = maxCacheSize; + this.storePath = storePath; + } + + @Override + public void addMetaData(MetaData metaData) throws IOException { + if (!moveToWriteRow) { + this.metaData = metaData; + SchemaBuilder.FieldAssembler fieldAssembler = SchemaBuilder.record("linkis").fields(); + TableMetaData tableMetaData = (TableMetaData) 
this.metaData; + for (Column column : tableMetaData.columns) { + fieldAssembler + .name(column.getColumnName().replaceAll("\\.", "_").replaceAll("[^a-zA-Z0-9_]", "")) + .doc(column.getComment()) + .type(column.getDataType().getTypeName().toLowerCase()) + .noDefault(); + } + schema = fieldAssembler.endRecord(); + moveToWriteRow = true; + if (parquetWriter == null) { + parquetWriter = + AvroParquetWriter.builder(new Path(storePath.getPath())) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build(); + } + } + } + + @Override + public void addRecord(Record record) { + if (moveToWriteRow) { + TableRecord tableRecord = (TableRecord) record; + try { + Object[] row = tableRecord.row; + GenericRecord genericRecord = new GenericData.Record(schema); + for (int i = 0; i < row.length; i++) { + genericRecord.put(schema.getFields().get(i).name(), row[i]); + } + parquetWriter.write(genericRecord); + } catch (IOException e) { + logger.warn("addMetaDataAndRecordString failed", e); + } + } + } + + @Override + public FsPath toFSPath() { + return storePath; + } + + @Override + public String toString() { + return storePath.getSchemaPath(); + } + + @Override + public void addMetaDataAndRecordString(String content) {} + + @Override + public void addRecordString(String content) {} + + @Override + public void close() throws IOException { + parquetWriter.close(); + } + + @Override + public void flush() throws IOException {} +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 5e56b099d7..bf46f49f9b 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -24,7 +24,11 @@ import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.common.io.resultset.ResultSetReader; import org.apache.linkis.storage.FSFactory; +import org.apache.linkis.storage.conf.LinkisStorageConf; +import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary; +import org.apache.linkis.storage.exception.StorageErrorCode; +import org.apache.linkis.storage.exception.StorageReadException; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.resultset.table.TableMetaData; import org.apache.linkis.storage.resultset.table.TableRecord; @@ -40,8 +44,30 @@ public class ResultSetReaderFactory { private static final Logger logger = LoggerFactory.getLogger(ResultSetReaderFactory.class); public static ResultSetReader getResultSetReader( - ResultSet resultSet, InputStream inputStream) { - return new StorageResultSetReader<>(resultSet, inputStream); + ResultSet resultSet, InputStream inputStream, FsPath fsPath) { + ResultSetReader resultSetReader = null; + if (fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) { + resultSetReader = new StorageResultSetReader<>(resultSet, inputStream); + } else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) { + try { + resultSetReader = new ParquetResultSetReader<>(resultSet, inputStream, fsPath); + } catch (IOException e) { + throw new StorageReadException( + StorageErrorCode.READ_PARQUET_FAILED.getCode(), + 
StorageErrorCode.READ_PARQUET_FAILED.getMessage(), + e); + } + } else if (fsPath.getPath().endsWith(LinkisStorageConf.ORC_FILE_SUFFIX)) { + try { + resultSetReader = new OrcResultSetReader<>(resultSet, inputStream, fsPath); + } catch (IOException e) { + throw new StorageReadException( + StorageErrorCode.READ_ORC_FAILED.getCode(), + StorageErrorCode.READ_ORC_FAILED.getMessage(), + e); + } + } + return resultSetReader; } public static ResultSetReader getResultSetReader( @@ -61,7 +87,7 @@ public static ResultSetReader getResultSetReader(String res) throws IOException Fs fs = FSFactory.getFs(resPath); fs.init(null); ResultSetReader reader = - ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath)); + ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath), resPath); if (reader instanceof StorageResultSetReader) { ((StorageResultSetReader) reader).setFs(fs); } @@ -96,7 +122,7 @@ public static ResultSetReader getTableResultReader(String res) { InputStream read = fs.read(resPath); return ResultSetReaderFactory.getResultSetReader( - (TableResultSet) resultSet, read); + (TableResultSet) resultSet, read, resPath); } catch (IOException e) { throw new StorageWarnException( LinkisStorageErrorCodeSummary.TABLE_ARE_NOT_SUPPORTED.getErrorCode(), diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java index d70319c9bd..9cb1999367 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java @@ -22,6 +22,9 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.common.io.resultset.ResultSetReader; +import org.apache.linkis.common.io.resultset.ResultSetWriter; +import org.apache.linkis.storage.conf.LinkisStorageConf; +import org.apache.linkis.storage.resultset.table.TableResultSet; import java.io.IOException; import java.util.ArrayList; @@ -33,26 +36,40 @@ public class ResultSetWriterFactory { private static final Logger logger = LoggerFactory.getLogger(ResultSetWriterFactory.class); - public static - org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( - ResultSet resultSet, long maxCacheSize, FsPath storePath) { - return new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + public static ResultSetWriter getResultSetWriter( + ResultSet resultSet, long maxCacheSize, FsPath storePath) { + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + ResultSetWriter writer = null; + if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) { + writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else if (engineResultType.equals(LinkisStorageConf.ORC) + && resultSet instanceof TableResultSet) { + writer = new OrcResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else { + writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + } + return writer; } - public static - org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( - ResultSet resultSet, long maxCacheSize, FsPath storePath, String proxyUser) { - StorageResultSetWriter writer = - new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); - 
writer.setProxyUser(proxyUser); + public static ResultSetWriter getResultSetWriter( + ResultSet resultSet, long maxCacheSize, FsPath storePath, String proxyUser) { + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + ResultSetWriter writer = null; + if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) { + writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else if (engineResultType.equals(LinkisStorageConf.ORC) + && resultSet instanceof TableResultSet) { + writer = new OrcResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else { + writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + StorageResultSetWriter storageResultSetWriter = (StorageResultSetWriter) writer; + storageResultSetWriter.setProxyUser(proxyUser); + } return writer; } public static Record[] getRecordByWriter( - org.apache.linkis.common.io.resultset.ResultSetWriter - writer, - long limit) - throws IOException { + ResultSetWriter writer, long limit) throws IOException { String res = writer.toString(); return getRecordByRes(res, limit); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java index c83661de2e..c708f5faf6 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java @@ -21,7 +21,9 @@ import org.apache.linkis.common.io.MetaData; import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.storage.conf.LinkisStorageConf; import org.apache.linkis.storage.domain.Dolphin; +import org.apache.linkis.storage.resultset.table.TableResultSet; import org.apache.linkis.storage.utils.StorageConfiguration; import org.slf4j.Logger; @@ -49,10 +51,17 @@ public String charset() { @Override public FsPath getResultSetPath(FsPath parentDir, String fileName) { + String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; + String fileSuffix = Dolphin.DOLPHIN_FILE_SUFFIX; + if (engineResultType.equals(LinkisStorageConf.PARQUET) && this instanceof TableResultSet) { + fileSuffix = LinkisStorageConf.PARQUET_FILE_SUFFIX; + } else if (engineResultType.equals(LinkisStorageConf.ORC) && this instanceof TableResultSet) { + fileSuffix = LinkisStorageConf.ORC_FILE_SUFFIX; + } final String path = parentDir.getPath().endsWith("/") - ? parentDir.getUriString() + fileName + Dolphin.DOLPHIN_FILE_SUFFIX - : parentDir.getUriString() + "/" + fileName + Dolphin.DOLPHIN_FILE_SUFFIX; + ? 
parentDir.getUriString() + fileName + fileSuffix + : parentDir.getUriString() + "/" + fileName + fileSuffix; logger.info("Get result set path: {}", path); return new FsPath(path); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java index ea513664bd..230c68301c 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java @@ -17,15 +17,19 @@ package org.apache.linkis.storage.resultset; -import org.apache.linkis.common.io.*; -import org.apache.linkis.common.io.resultset.*; +import org.apache.linkis.common.io.Fs; +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSerializer; +import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.common.io.resultset.ResultSetWriter; -import org.apache.linkis.common.utils.*; -import org.apache.linkis.storage.*; -import org.apache.linkis.storage.conf.*; -import org.apache.linkis.storage.domain.*; +import org.apache.linkis.storage.FSFactory; +import org.apache.linkis.storage.conf.LinkisStorageConf; +import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.exception.StorageErrorException; -import org.apache.linkis.storage.utils.*; +import org.apache.linkis.storage.utils.FileSystemUtils; +import org.apache.linkis.storage.utils.StorageUtils; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index 0ed650186d..f7a5d3e512 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -73,7 +73,9 @@ public interface FileSource extends Closeable { (path, suffix) -> path.endsWith("." 
+ suffix); static boolean isResultSet(String path) { - return suffixPredicate.apply(path, fileType[0]); + return suffixPredicate.apply(path, LinkisStorageConf.DOLPHIN) + || suffixPredicate.apply(path, LinkisStorageConf.PARQUET) + || suffixPredicate.apply(path, LinkisStorageConf.ORC); } static boolean isResultSet(FsPath fsPath) { @@ -137,7 +139,8 @@ static FileSource create(FsPath fsPath, InputStream is) { static FileSplit createResultSetFileSplit(FsPath fsPath, InputStream is) { logger.info("try create result set file split with path:{}", fsPath.getPath()); ResultSet resultset = ResultSetFactory.getInstance().getResultSetByPath(fsPath); - ResultSetReader resultsetReader = ResultSetReaderFactory.getResultSetReader(resultset, is); + ResultSetReader resultsetReader = + ResultSetReaderFactory.getResultSetReader(resultset, is, fsPath); return new FileSplit(resultsetReader, resultset.resultSetType()); } @@ -145,7 +148,8 @@ static FileSplit createResultSetFileSplit(FsPath fsPath, Fs fs) { ResultSet resultset = ResultSetFactory.getInstance().getResultSetByPath(fsPath, fs); ResultSetReader resultsetReader = null; try { - resultsetReader = ResultSetReaderFactory.getResultSetReader(resultset, fs.read(fsPath)); + resultsetReader = + ResultSetReaderFactory.getResultSetReader(resultset, fs.read(fsPath), fsPath); } catch (IOException e) { logger.warn("FileSource createResultSetFileSplit failed", e); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java new file mode 100644 index 0000000000..72c2f8c355 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.storage.utils; + +import org.apache.linkis.storage.domain.DataType; + +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +/** + * Inspired by: + * https://github.com/apache/flink/blob/master/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java + */ +public class OrcUtils { + + public static TypeDescription dataTypeToOrcType(DataType type) { + switch (type) { + case CharType: + return TypeDescription.createChar().withMaxLength(1024); + case StringType: + return TypeDescription.createString(); + case LongType: + return TypeDescription.createLong(); + case VarcharType: + return TypeDescription.createVarchar().withMaxLength(1024); + case BooleanType: + return TypeDescription.createBoolean(); + case BinaryType: + return TypeDescription.createBinary(); + case DecimalType: + return TypeDescription.createDecimal().withScale(10).withPrecision(38); + case TinyIntType: + return TypeDescription.createByte(); + case ShortIntType: + return TypeDescription.createShort(); + case IntType: + return TypeDescription.createInt(); + case BigIntType: + return TypeDescription.createLong(); + case FloatType: + return TypeDescription.createFloat(); + case DoubleType: + return TypeDescription.createDouble(); + case DateType: + return TypeDescription.createDate(); + case TimestampType: + return TypeDescription.createTimestamp(); + case ArrayType: + return TypeDescription.createList(dataTypeToOrcType(DataType.VarcharType)); + case MapType: + return TypeDescription.createMap( + dataTypeToOrcType(DataType.VarcharType), dataTypeToOrcType(DataType.VarcharType)); + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + public static void setColumn(int columnId, ColumnVector column, DataType type, Object value) { + switch (type) { + case CharType: + case VarcharType: + case BinaryType: + case StringType: + { + BytesColumnVector vector = (BytesColumnVector) column; + vector.setVal(columnId, String.valueOf(value).getBytes()); + break; + } + case BooleanType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Boolean.valueOf(value.toString()) ? 
1 : 0; + break; + } + case DecimalType: + { + DecimalColumnVector vector = (DecimalColumnVector) column; + vector.set(columnId, HiveDecimal.create(new BigDecimal(value.toString()))); + break; + } + case TinyIntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = (byte) value; + break; + } + case DateType: + case IntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Integer.valueOf(value.toString()); + break; + } + case BigIntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Long.valueOf(value.toString()); + break; + } + case FloatType: + { + DoubleColumnVector vector = (DoubleColumnVector) column; + vector.vector[columnId] = Float.valueOf(value.toString()); + break; + } + case DoubleType: + { + DoubleColumnVector vector = (DoubleColumnVector) column; + vector.vector[columnId] = Double.valueOf(value.toString()); + break; + } + case TimestampType: + { + TimestampColumnVector vector = (TimestampColumnVector) column; + try { + vector.set( + columnId, + new Timestamp( + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ") + .parse(value.toString()) + .getTime())); + } catch (ParseException e) { + vector.set(columnId, new Timestamp(System.currentTimeMillis())); + } + break; + } + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageHelper.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageHelper.java index e1dee151ca..491c3d7af4 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageHelper.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageHelper.java @@ -80,7 +80,8 @@ public static void getTableResLines(String[] args) { resultSetFactory.getResultSetByType(ResultSetFactory.TABLE_TYPE); Fs fs = FSFactory.getFs(resPath); fs.init(null); - resultSetReader = ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath)); + resultSetReader = + ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath), resPath); TableMetaData metaData = (TableMetaData) resultSetReader.getMetaData(); Arrays.stream(metaData.getColumns()).forEach(column -> logger.info(column.toString())); int num = 0; @@ -116,7 +117,7 @@ public static void getTableRes(String[] args) { fs.init(null); ResultSetReader reader = - ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath)); + ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath), resPath); MetaData rmetaData = reader.getMetaData(); Arrays.stream(((TableMetaData) rmetaData).getColumns()) .forEach(column -> logger.info(column.toString())); diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java index 189ab711fd..0256689e1c 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java @@ -404,6 +404,10 @@ public Message getDirFileTrees( FsPathListWithError fsPathListWithError = fileSystem.listPathWithError(fsPath); if (fsPathListWithError != null) { 
for (FsPath children : fsPathListWithError.getFsPaths()) {
+          // parquet and orc compatible, skipping .crc files
+          if (children.getPath().endsWith(".crc")) {
+            continue;
+          }
           DirFileTree dirFileTreeChildren = new DirFileTree();
           dirFileTreeChildren.setName(new File(children.getPath()).getName());
           dirFileTreeChildren.setPath(children.getSchemaPath());
diff --git a/pom.xml b/pom.xml
index 5af89dfaca..d27ef8f9a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,9 @@
     ${hadoop.version}
     provided
+    <storage.parquet.scope>provided</storage.parquet.scope>
+    <storage.orc.scope>provided</storage.orc.scope>
+
     1.16.2
     0.9.3
     1.3.0
@@ -154,6 +157,9 @@
     3.16.3
+    <parquet-avro.version>1.10.0</parquet-avro.version>
+    <orc-core.version>1.5.8</orc-core.version>
+    1.19.4
     2.23.1
     4.1.86.Final
@@ -1941,5 +1947,19 @@
+
+    <profile>
+      <id>storage-parquet</id>
+      <properties>
+        <storage.parquet.scope>compile</storage.parquet.scope>
+      </properties>
+    </profile>
+
+    <profile>
+      <id>storage-orc</id>
+      <properties>
+        <storage.orc.scope>compile</storage.orc.scope>
+      </properties>
+    </profile>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index b826a67495..d39673e6ff 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -34,6 +34,8 @@ automaton-1.11-8.jar
 avatica-1.11.0.jar
 avro-1.7.7.jar
 avro-1.8.2.jar
+avro-1.10.1.jar
+avro-1.10.2.jar
 aws-java-sdk-core-1.11.792.jar
 aws-java-sdk-kms-1.11.792.jar
 aws-java-sdk-s3-1.11.792.jar
@@ -118,6 +120,7 @@ expiringmap-0.5.6.jar
 failsafe-2.4.0.jar
 failureaccess-1.0.1.jar
 fastutil-6.5.6.jar
+fastutil-7.0.13.jar
 feign-core-10.12.jar
 feign-form-3.8.0.jar
 feign-form-spring-3.8.0.jar
@@ -506,6 +509,7 @@ opentracing-api-0.33.0.jar
 opentracing-noop-0.33.0.jar
 opentracing-util-0.33.0.jar
 orc-core-1.5.8.jar
+orc-core-1.5.8-nohive.jar
 orc-shims-1.5.8.jar
 org.jacoco.agent-0.8.5-runtime.jar
 osgi-resource-locator-1.0.1.jar
@@ -514,6 +518,19 @@ pagehelper-5.3.1.jar
 paranamer-2.3.jar
 paranamer-2.8.jar
 parquet-hadoop-bundle-1.10.0.jar
+parquet-avro-1.10.0.jar
+parquet-column-1.10.0.jar
+parquet-column-1.12.2.jar
+parquet-common-1.10.0.jar
+parquet-common-1.12.2.jar
+parquet-encoding-1.10.0.jar
+parquet-encoding-1.12.2.jar
+parquet-format-2.4.0.jar
+parquet-format-structures-1.12.2.jar
+parquet-hadoop-1.10.0.jar
+parquet-hadoop-1.12.2.jar
+parquet-jackson-1.10.0.jar
+parquet-jackson-1.12.2.jar
 poi-5.2.3.jar
 poi-ooxml-5.2.3.jar
 poi-ooxml-lite-5.2.3.jar
@@ -578,6 +595,7 @@ snakeyaml-1.33.jar
 snappy-java-1.1.4.jar
 snappy-java-1.1.7.7.jar
 snappy-java-1.1.8.2.jar
+snappy-java-1.1.8.4.jar
 spark-doris-connector-3.2_2.12-1.2.0.jar
 spark-redis_2.12-2.6.0.jar
 spring-aop-5.2.23.RELEASE.jar
@@ -683,8 +701,11 @@ zookeeper-3.5.9.jar
 zookeeper-jute-3.5.9.jar
 zstd-jni-1.4.4-7.jar
 zstd-jni-1.4.5-6.jar
+zstd-jni-1.4.9-1.jar
+zstd-jni-1.5.0-4.jar
 zjsonpatch-0.3.0.jar
 agrona-1.12.0.jar
+audience-annotations-0.12.0.jar
 audience-annotations-0.13.0.jar
 commons-crypto-1.1.0.jar
 disruptor-3.4.2.jar
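
With this patch, writers are selected by configuration while readers are selected by file suffix, so the two must agree; StorageResultSet.getResultSetPath picks the matching suffix automatically. Below is a minimal round-trip sketch using only APIs the patch touches (ResultSetWriterFactory, ResultSetReaderFactory, TableMetaData, TableRecord). It assumes linkis.engine.resultSet.type=parquet is set (for example in linkis.properties) before LinkisStorageConf loads, since ENGINE_RESULT_TYPE is read once into a static field; the class name, the local store path, the 1024 cache size, and the two-column layout are illustrative only, not part of the patch.

import org.apache.linkis.common.io.Fs;
import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.common.io.resultset.ResultSetReader;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.storage.FSFactory;
import org.apache.linkis.storage.domain.Column;
import org.apache.linkis.storage.domain.DataType;
import org.apache.linkis.storage.resultset.ResultSetFactory;
import org.apache.linkis.storage.resultset.ResultSetReaderFactory;
import org.apache.linkis.storage.resultset.ResultSetWriterFactory;
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.apache.linkis.storage.resultset.table.TableRecord;

public class ParquetResultSetRoundTrip {

  public static void main(String[] args) throws Exception {
    // Assumes linkis.engine.resultSet.type=parquet was set before
    // LinkisStorageConf was loaded; otherwise the Dolphin writer is chosen.
    ResultSet resultSet =
        ResultSetFactory.getInstance().getResultSetByType(ResultSetFactory.TABLE_TYPE);

    // Hypothetical local path; the ".parquet" suffix must agree with the
    // configured type, because readers are dispatched on the file extension.
    FsPath storePath = new FsPath("file:///tmp/linkis/rs/_0.parquet");

    // For parquet/orc the maxCacheSize argument does not drive buffering;
    // ParquetResultSetWriter writes through AvroParquetWriter directly.
    ResultSetWriter writer =
        ResultSetWriterFactory.getResultSetWriter(resultSet, 1024L, storePath);
    writer.addMetaData(
        new TableMetaData(
            new Column[] {
              new Column("name", DataType.StringType, ""),
              new Column("age", DataType.IntType, "")
            }));
    writer.addRecord(new TableRecord(new Object[] {"linkis", 3}));
    writer.close();

    // Read the rows back; getResultSetReader now takes the FsPath so it can
    // pick ParquetResultSetReader or OrcResultSetReader by suffix.
    Fs fs = FSFactory.getFs(storePath);
    fs.init(null);
    ResultSetReader reader =
        ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(storePath), storePath);
    TableMetaData metaData = (TableMetaData) reader.getMetaData();
    while (reader.hasNext()) {
      TableRecord record = (TableRecord) reader.getRecord();
      // record.row holds one value per column, in metaData.getColumns() order
    }
    reader.close();
    fs.close();
  }
}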