diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java index 916f9ff469..2c4509486a 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -41,7 +41,7 @@ public class LinkisStorageConf { CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue(); public static final String ENGINE_RESULT_TYPE = - CommonVars.apply("wds.linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); + CommonVars.apply("linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue(); public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java index 9ac4c02cc7..e75a2d96bc 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/DefaultResultSetFactory.java @@ -109,7 +109,8 @@ public boolean exists(String resultSetType) { @Override public boolean isResultSetPath(String path) { - return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX); + return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX) + || path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX); } @Override @@ -138,7 +139,8 @@ public String[] getResultSetType() { ResultSet resultSet = null; try (InputStream inputStream = fs.read(fsPath)) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { + if (engineResultType.equals(LinkisStorageConf.DOLPHIN) + || fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) { String resultSetType = Dolphin.getType(inputStream); if (StringUtils.isEmpty(resultSetType)) { throw new StorageWarnException( @@ -147,7 +149,8 @@ public String[] getResultSetType() { } // Utils.tryQuietly(fs::close); resultSet = getResultSetByType(resultSetType); - } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { + } else if (engineResultType.equals(LinkisStorageConf.PARQUET) + || fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) { resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE); } return resultSet; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java index 07c6c4cd7a..c4c43d6f29 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetReader.java @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.MetaData; import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetReader; import org.apache.linkis.storage.domain.Column; import org.apache.linkis.storage.domain.DataType; import org.apache.linkis.storage.resultset.table.TableMetaData; @@ -28,6 +29,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.hadoop.ParquetReader; @@ -42,12 +44,20 @@ import org.slf4j.LoggerFactory; public class ParquetResultSetReader - extends StorageResultSetReader { + extends ResultSetReader { private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetReader.class); private FsPath fsPath; + private final ResultSet resultSet; + + private final InputStream inputStream; + + private MetaData metaData; + + private Record row; + private ParquetReader parquetReader; private GenericRecord record; @@ -55,6 +65,8 @@ public class ParquetResultSetReader public ParquetResultSetReader(ResultSet resultSet, InputStream inputStream, FsPath fsPath) throws IOException { super(resultSet, inputStream); + this.resultSet = resultSet; + this.inputStream = inputStream; this.fsPath = fsPath; this.parquetReader = AvroParquetReader.builder(new Path(fsPath.getPath())).build(); @@ -84,6 +96,21 @@ public MetaData getMetaData() { return metaData; } + @Override + public int skip(int recordNum) throws IOException { + return 0; + } + + @Override + public long getPosition() throws IOException { + return 0; + } + + @Override + public long available() throws IOException { + return 0; + } + @Override public boolean hasNext() throws IOException { if (metaData == null) getMetaData(); @@ -116,7 +143,7 @@ public Record getRecord() { @Override public void close() throws IOException { - super.close(); + IOUtils.closeQuietly(inputStream); parquetReader.close(); } } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java index 31dd6c7a5c..6fbac3c8cb 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ParquetResultSetWriter.java @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.MetaData; import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; +import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.storage.domain.Column; import org.apache.linkis.storage.resultset.table.TableMetaData; import org.apache.linkis.storage.resultset.table.TableRecord; @@ -41,21 +42,36 @@ import org.slf4j.LoggerFactory; public class ParquetResultSetWriter - extends StorageResultSetWriter { + extends ResultSetWriter { private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetWriter.class); - private Schema schema = null; + private Schema schema; + + private ParquetWriter parquetWriter; + + private boolean moveToWriteRow = false; + + private MetaData metaData = null; + + private final FsPath storePath; + + private final long maxCacheSize; + + private final ResultSet resultSet; public ParquetResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) { super(resultSet, maxCacheSize, storePath); + this.resultSet = resultSet; + this.maxCacheSize = maxCacheSize; + this.storePath = storePath; } @Override public void addMetaData(MetaData metaData) throws IOException { if (!moveToWriteRow) { - rMetaData = metaData; + this.metaData = metaData; SchemaBuilder.FieldAssembler fieldAssembler = SchemaBuilder.record("linkis").fields(); - TableMetaData tableMetaData = (TableMetaData) rMetaData; + TableMetaData tableMetaData = (TableMetaData) this.metaData; for (Column column : tableMetaData.columns) { fieldAssembler .name(column.getColumnName().replaceAll("\\.", "_").replaceAll("[^a-zA-Z0-9_]", "")) @@ -65,31 +81,55 @@ public void addMetaData(MetaData metaData) throws IOException { } schema = fieldAssembler.endRecord(); moveToWriteRow = true; + if (parquetWriter == null) { + parquetWriter = + AvroParquetWriter.builder(new Path(storePath.getPath())) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build(); + } } } @Override public void addRecord(Record record) { if (moveToWriteRow) { + TableRecord tableRecord = (TableRecord) record; try { - TableRecord tableRecord = (TableRecord) record; Object[] row = tableRecord.row; - try (ParquetWriter writer = - AvroParquetWriter.builder(new Path(storePath.getPath())) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build()) { - - GenericRecord genericRecord = new GenericData.Record(schema); - for (int i = 0; i < row.length; i++) { - genericRecord.put(schema.getFields().get(i).name(), row[i]); - } - writer.write(genericRecord); + GenericRecord genericRecord = new GenericData.Record(schema); + for (int i = 0; i < row.length; i++) { + genericRecord.put(schema.getFields().get(i).name(), row[i]); } - } catch (Exception e) { + parquetWriter.write(genericRecord); + } catch (IOException e) { logger.warn("addMetaDataAndRecordString failed", e); } } } + + @Override + public FsPath toFSPath() { + return storePath; + } + + @Override + public String toString() { + return storePath.getSchemaPath(); + } + + @Override + public void addMetaDataAndRecordString(String content) {} + + @Override + public void addRecordString(String content) {} + + @Override + public void close() throws IOException { + parquetWriter.close(); + } + + @Override + public void flush() throws IOException {} } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 03434abdbe..36dbd130b0 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -43,7 +43,7 @@ public class ResultSetReaderFactory { public static ResultSetReader getResultSetReader( ResultSet resultSet, InputStream inputStream, FsPath fsPath) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - StorageResultSetReader resultSetReader = null; + ResultSetReader resultSetReader = null; if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { resultSetReader = new StorageResultSetReader<>(resultSet, inputStream); } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java index 3e50692186..35df51e779 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetWriterFactory.java @@ -22,6 +22,7 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSet; import org.apache.linkis.common.io.resultset.ResultSetReader; +import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.storage.conf.LinkisStorageConf; import java.io.IOException; @@ -34,11 +35,10 @@ public class ResultSetWriterFactory { private static final Logger logger = LoggerFactory.getLogger(ResultSetWriterFactory.class); - public static - org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( - ResultSet resultSet, long maxCacheSize, FsPath storePath) { + public static ResultSetWriter getResultSetWriter( + ResultSet resultSet, long maxCacheSize, FsPath storePath) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - StorageResultSetWriter writer = null; + ResultSetWriter writer = null; if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { @@ -47,25 +47,22 @@ org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( return writer; } - public static - org.apache.linkis.common.io.resultset.ResultSetWriter getResultSetWriter( - ResultSet resultSet, long maxCacheSize, FsPath storePath, String proxyUser) { + public static ResultSetWriter getResultSetWriter( + ResultSet resultSet, long maxCacheSize, FsPath storePath, String proxyUser) { String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE; - StorageResultSetWriter writer = null; + ResultSetWriter writer = null; if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) { writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath); + StorageResultSetWriter storageResultSetWriter = (StorageResultSetWriter) writer; + storageResultSetWriter.setProxyUser(proxyUser); } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) { writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath); } - writer.setProxyUser(proxyUser); return writer; } public static Record[] getRecordByWriter( - org.apache.linkis.common.io.resultset.ResultSetWriter - writer, - long limit) - throws IOException { + ResultSetWriter writer, long limit) throws IOException { String res = writer.toString(); return getRecordByRes(res, limit); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java index 595ebda983..c0222cc848 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java @@ -43,10 +43,10 @@ public class StorageResultSetReader private static final Logger logger = LoggerFactory.getLogger(StorageResultSetReader.class); private final ResultSet resultSet; - public final InputStream inputStream; - public final ResultDeserializer deserializer; - public K metaData; - public Record row; + private final InputStream inputStream; + private final ResultDeserializer deserializer; + private K metaData; + private Record row; private int colCount = 0; private int rowCount = 0; private Fs fs; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java index bdea8ee4e7..230c68301c 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetWriter.java @@ -50,15 +50,15 @@ public class StorageResultSetWriter private final ResultSet resultSet; private final long maxCacheSize; - public final FsPath storePath; + private final FsPath storePath; private final ResultSerializer serializer; - public boolean moveToWriteRow = false; + private boolean moveToWriteRow = false; private OutputStream outputStream = null; private int rowCount = 0; private final List buffer = new ArrayList(); private Fs fs = null; - public MetaData rMetaData = null; + private MetaData rMetaData = null; private String proxyUser = StorageUtils.getJvmUser(); private boolean fileCreated = false; private boolean closed = false; diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index d4487ac37a..cfaf437f72 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -701,6 +701,7 @@ zstd-jni-1.4.9-1.jar zstd-jni-1.5.0-4.jar zjsonpatch-0.3.0.jar agrona-1.12.0.jar +audience-annotations-0.12.0.jar audience-annotations-0.13.0.jar commons-crypto-1.1.0.jar disruptor-3.4.2.jar