diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index c9c610bcbd..93628db808 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -103,7 +103,7 @@ org.apache.parquet parquet-avro - 1.12.0 + ${parquet-avro.version} org.apache.hadoop @@ -129,6 +129,19 @@ + + org.apache.orc + orc-core + ${orc-core.version} + nohive + + + org.apache.hive + hive-storage-api + + + + diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java index 2c4509486a..58ff80b8c5 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -31,6 +31,10 @@ public class LinkisStorageConf { public static final String PARQUET_FILE_SUFFIX = ".parquet"; + public static final String ORC = "orc"; + + public static final String ORC_FILE_SUFFIX = ".orc"; + public static final String HDFS_FILE_SYSTEM_REST_ERRS = CommonVars.apply( "wds.linkis.hdfs.rest.errs", @@ -41,7 +45,7 @@ public class LinkisStorageConf { CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue(); public static final String ENGINE_RESULT_TYPE = - CommonVars.apply("linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); + CommonVars.apply("linkis.engine.resultSet.type", ORC, "Result type").getValue(); public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR); @@ -50,7 +54,9 @@ public class LinkisStorageConf { "wds.linkis.storage.file.type", "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql" + "," - + PARQUET) + + PARQUET + + "," + + ORC) .getValue(); private static volatile String[] fileTypeArr = null; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java index d1d60d8a2c..da51a2f13d 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java @@ -110,7 +110,8 @@ public boolean exists(String resultSetType) { @Override public boolean isResultSetPath(String path) { return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX) - || path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX); + || path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX) + || path.endsWith(LinkisStorageConf.ORC_FILE_SUFFIX); } @Override @@ -147,7 +148,8 @@ public String[] getResultSetType() { } // Utils.tryQuietly(fs::close); resultSet = getResultSetByType(resultSetType); - } else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) { + } else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX) + || fsPath.getPath().endsWith(LinkisStorageConf.ORC_FILE_SUFFIX)) { resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE); } return resultSet; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java new file mode 100644 index 0000000000..7061cfcd48 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetReader.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.resultset; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetReader; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.domain.DataType; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.*; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OrcResultSetReader + extends ResultSetReader { + + private static final Logger logger = LoggerFactory.getLogger(OrcResultSetReader.class); + + private FsPath fsPath; + + private final ResultSet resultSet; + + private final InputStream inputStream; + + private MetaData metaData; + + private Record row; + + private RecordReader rows; + + private Reader reader; + + public OrcResultSetReader(ResultSet resultSet, InputStream inputStream, FsPath fsPath) + throws IOException { + super(resultSet, inputStream); + this.resultSet = resultSet; + this.inputStream = inputStream; + this.fsPath = fsPath; + this.reader = + OrcFile.createReader( + new Path(fsPath.getPath()), OrcFile.readerOptions(new Configuration())); + this.rows = reader.rows(); + } + + @Override + public MetaData getMetaData() { + if (metaData == null) { + try { + List fieldNames = reader.getSchema().getFieldNames(); + List typeDescriptions = reader.getSchema().getChildren(); + List columnList = new ArrayList<>(); + for (int i = 0; i < fieldNames.size(); i++) { + Column column = + new Column( + fieldNames.get(i), + DataType.toDataType(typeDescriptions.get(i).getCategory().getName()), + ""); + columnList.add(column); + } + + metaData = new TableMetaData(columnList.toArray(new Column[0])); + } catch (Exception e) { + throw new RuntimeException("Failed to read parquet schema", e); + } + } + return metaData; + } + + @Override + public int skip(int recordNum) throws IOException { + return 0; + } + + @Override + public long getPosition() throws IOException { + return 0; + } + + @Override + public long available() throws IOException { + return 0; + } + + @Override + public boolean hasNext() throws IOException { + if (metaData == null) getMetaData(); + if (rows == null) return false; + VectorizedRowBatch batch = + reader.getSchema().createRowBatch(TypeDescription.RowBatchVersion.ORIGINAL, 1); + TableMetaData tableMetaData = (TableMetaData) metaData; + + if (rows.nextBatch(batch)) { + int rowNum = 0; + Object[] rowData = new Object[tableMetaData.getColumns().length]; + for (int i = 0; i < batch.numCols; i++) { + ColumnVector columnVector = batch.cols[i]; + if (columnVector instanceof BytesColumnVector) { + BytesColumnVector vector = (BytesColumnVector) columnVector; + rowData[i] = vector.toString(rowNum); + } else if (columnVector instanceof Decimal64ColumnVector) { + Decimal64ColumnVector vector = (Decimal64ColumnVector) columnVector; + rowData[i] = vector.vector[rowNum]; + } else if (columnVector instanceof DecimalColumnVector) { + DecimalColumnVector vector = (DecimalColumnVector) columnVector; + rowData[i] = vector.vector[rowNum]; + } else if (columnVector instanceof DoubleColumnVector) { + DoubleColumnVector vector = (DoubleColumnVector) columnVector; + rowData[i] = vector.vector[rowNum]; + } else if (columnVector instanceof ListColumnVector) { + ListColumnVector vector = (ListColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof IntervalDayTimeColumnVector) { + IntervalDayTimeColumnVector vector = (IntervalDayTimeColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof LongColumnVector) { + LongColumnVector vector = (LongColumnVector) columnVector; + rowData[i] = vector.vector[rowNum]; + } else if (columnVector instanceof MapColumnVector) { + MapColumnVector vector = (MapColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof MultiValuedColumnVector) { + MultiValuedColumnVector vector = (MultiValuedColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof StructColumnVector) { + StructColumnVector vector = (StructColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } else if (columnVector instanceof TimestampColumnVector) { + TimestampColumnVector vector = (TimestampColumnVector) columnVector; + rowData[i] = vector.time[rowNum]; + } else if (columnVector instanceof UnionColumnVector) { + UnionColumnVector vector = (UnionColumnVector) columnVector; + StringBuilder builder = new StringBuilder(); + vector.stringifyValue(builder, rowNum); + rowData[i] = builder.toString(); + } + } + row = new TableRecord(rowData); + } else { + return false; + } + return row != null; + } + + @Override + public Record getRecord() { + if (metaData == null) throw new RuntimeException("Must read metadata first(必须先读取metadata)"); + if (row == null) { + throw new RuntimeException( + "Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)"); + } + return row; + } + + @Override + public void close() throws IOException { + IOUtils.closeQuietly(inputStream); + rows.close(); + reader.close(); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java new file mode 100644 index 0000000000..c4809f6499 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/OrcResultSetWriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.resultset; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetWriter; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; +import org.apache.linkis.storage.utils.OrcUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OrcResultSetWriter + extends ResultSetWriter { + private static final Logger logger = LoggerFactory.getLogger(OrcResultSetWriter.class); + + private TypeDescription schema; + + private Writer writer; + + private boolean moveToWriteRow = false; + + private MetaData metaData = null; + + private final FsPath storePath; + + private final long maxCacheSize; + + private final ResultSet resultSet; + + public OrcResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) { + super(resultSet, maxCacheSize, storePath); + this.resultSet = resultSet; + this.maxCacheSize = maxCacheSize; + this.storePath = storePath; + } + + @Override + public void addMetaData(MetaData metaData) throws IOException { + if (!moveToWriteRow) { + this.metaData = metaData; + if (this.schema == null) { + this.schema = TypeDescription.createStruct(); + } + TableMetaData tableMetaData = (TableMetaData) this.metaData; + for (Column column : tableMetaData.columns) { + schema.addField(column.getColumnName(), OrcUtils.dataTypeToOrcType(column.getDataType())); + } + moveToWriteRow = true; + if (writer == null) { + writer = + OrcFile.createWriter( + new Path(storePath.getPath()), + OrcFile.writerOptions(new Configuration()) + .setSchema(schema) + .compress(CompressionKind.ZLIB) + .version(OrcFile.Version.V_0_12)); + } + } + } + + @Override + public void addRecord(Record record) { + if (moveToWriteRow) { + TableRecord tableRecord = (TableRecord) record; + TableMetaData tableMetaData = (TableMetaData) metaData; + try { + Object[] row = tableRecord.row; + VectorizedRowBatch batch = schema.createRowBatch(); + int rowCount = batch.size++; + + for (int i = 0; i < row.length; i++) { + OrcUtils.setColumn( + rowCount, batch.cols[i], tableMetaData.columns[i].getDataType(), row[i]); + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + } + writer.addRowBatch(batch); + + } catch (IOException e) { + logger.warn("addMetaDataAndRecordString failed", e); + } + } + } + + @Override + public FsPath toFSPath() { + return storePath; + } + + @Override + public String toString() { + return storePath.getSchemaPath(); + } + + @Override + public void addMetaDataAndRecordString(String content) {} + + @Override + public void addRecordString(String content) {} + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public void flush() throws IOException {} +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 8e20de99e6..c38a5ac250 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -52,6 +52,12 @@ public static ResultSetReader getResultSe } catch (IOException e) { throw new RuntimeException("Failed to read parquet", e); } + } else if (fsPath.getPath().endsWith(LinkisStorageConf.ORC_FILE_SUFFIX)) { + try { + resultSetReader = new OrcResultSetReader<>(resultSet, inputStream, fsPath); + } catch (IOException e) { + throw new RuntimeException("Failed to read parquet", e); + } } return resultSetReader; } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java index 981ce1b023..9cb1999367 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java @@ -42,6 +42,9 @@ public static ResultSetWriter getRe ResultSetWriter writer = null; if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) { writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else if (engineResultType.equals(LinkisStorageConf.ORC) + && resultSet instanceof TableResultSet) { + writer = new OrcResultSetWriter<>(resultSet, maxCacheSize, storePath); } else { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); } @@ -54,6 +57,9 @@ public static ResultSetWriter getRe ResultSetWriter writer = null; if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) { writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); + } else if (engineResultType.equals(LinkisStorageConf.ORC) + && resultSet instanceof TableResultSet) { + writer = new OrcResultSetWriter<>(resultSet, maxCacheSize, storePath); } else { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); StorageResultSetWriter storageResultSetWriter = (StorageResultSetWriter) writer; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java index 5c25837d8f..c708f5faf6 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSet.java @@ -55,6 +55,8 @@ public FsPath getResultSetPath(FsPath parentDir, String fileName) { String fileSuffix = Dolphin.DOLPHIN_FILE_SUFFIX; if (engineResultType.equals(LinkisStorageConf.PARQUET) && this instanceof TableResultSet) { fileSuffix = LinkisStorageConf.PARQUET_FILE_SUFFIX; + } else if (engineResultType.equals(LinkisStorageConf.ORC) && this instanceof TableResultSet) { + fileSuffix = LinkisStorageConf.ORC_FILE_SUFFIX; } final String path = parentDir.getPath().endsWith("/") diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index df7ae931a6..b3071ac410 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -74,7 +74,8 @@ public interface FileSource extends Closeable { static boolean isResultSet(String path) { return suffixPredicate.apply(path, fileType[0]) - || suffixPredicate.apply(path, LinkisStorageConf.PARQUET); + || suffixPredicate.apply(path, LinkisStorageConf.PARQUET) + || suffixPredicate.apply(path, LinkisStorageConf.ORC); } static boolean isResultSet(FsPath fsPath) { diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java new file mode 100644 index 0000000000..8c949a5c5e --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/OrcUtils.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.utils; + +import org.apache.linkis.storage.domain.DataType; + +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +/** + * Inspired by: + * https://github.com/apache/flink/blob/master/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java + */ +public class OrcUtils { + + public static TypeDescription dataTypeToOrcType(DataType type) { + switch (type) { + case CharType: + return TypeDescription.createChar().withMaxLength(1024); + case StringType: + return TypeDescription.createString(); + case VarcharType: + return TypeDescription.createVarchar().withMaxLength(1024); + case BooleanType: + return TypeDescription.createBoolean(); + case BinaryType: + return TypeDescription.createBinary(); + case DecimalType: + return TypeDescription.createDecimal().withScale(10).withPrecision(38); + case TinyIntType: + return TypeDescription.createByte(); + case ShortIntType: + return TypeDescription.createShort(); + case IntType: + return TypeDescription.createInt(); + case BigIntType: + return TypeDescription.createLong(); + case FloatType: + return TypeDescription.createFloat(); + case DoubleType: + return TypeDescription.createDouble(); + case DateType: + return TypeDescription.createDate(); + case TimestampType: + return TypeDescription.createTimestamp(); + case ArrayType: + return TypeDescription.createList(dataTypeToOrcType(DataType.VarcharType)); + case MapType: + return TypeDescription.createMap( + dataTypeToOrcType(DataType.VarcharType), dataTypeToOrcType(DataType.VarcharType)); + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + public static void setColumn(int columnId, ColumnVector column, DataType type, Object value) { + switch (type) { + case CharType: + case VarcharType: + case BinaryType: + case StringType: + { + BytesColumnVector vector = (BytesColumnVector) column; + vector.setVal(columnId, String.valueOf(value).getBytes()); + break; + } + case BooleanType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Boolean.valueOf(value.toString()) ? 1 : 0; + break; + } + case DecimalType: + { + DecimalColumnVector vector = (DecimalColumnVector) column; + vector.set(columnId, HiveDecimal.create(new BigDecimal(value.toString()))); + break; + } + case TinyIntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = (byte) value; + break; + } + case DateType: + case IntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Integer.valueOf(value.toString()); + break; + } + case BigIntType: + { + LongColumnVector vector = (LongColumnVector) column; + vector.vector[columnId] = Long.valueOf(value.toString()); + break; + } + case FloatType: + { + DoubleColumnVector vector = (DoubleColumnVector) column; + vector.vector[columnId] = Float.valueOf(value.toString()); + break; + } + case DoubleType: + { + DoubleColumnVector vector = (DoubleColumnVector) column; + vector.vector[columnId] = Double.valueOf(value.toString()); + break; + } + case TimestampType: + { + TimestampColumnVector vector = (TimestampColumnVector) column; + try { + vector.set( + columnId, + new Timestamp( + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ") + .parse(value.toString()) + .getTime())); + } catch (ParseException e) { + vector.set(columnId, new Timestamp(System.currentTimeMillis())); + } + break; + } + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } +} diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java index 2af3caaaba..d1113569df 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java @@ -404,9 +404,9 @@ public Message getDirFileTrees( FsPathListWithError fsPathListWithError = fileSystem.listPathWithError(fsPath); if (fsPathListWithError != null) { for (FsPath children : fsPathListWithError.getFsPaths()) { - // 兼容parquet,跳过parquet.crc文件 - if (children.getPath().endsWith(".parquet.crc")) { - continue; + // 兼容parquet和orc,跳过.crc文件 + if (children.getPath().endsWith(".crc")) { + continue; } DirFileTree dirFileTreeChildren = new DirFileTree(); dirFileTreeChildren.setName(new File(children.getPath()).getName()); diff --git a/pom.xml b/pom.xml index 327a455a30..d0d925a9a1 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,9 @@ 3.16.3 + 1.10.0 + 1.5.8 + 1.19.4 2.23.1 4.1.86.Final