Skip to content

Commit

Permalink
Support for storing result sets in Parquet format
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Dec 6, 2023
1 parent 4c17598 commit 197e9a3
Show file tree
Hide file tree
Showing 13 changed files with 351 additions and 36 deletions.
30 changes: 30 additions & 0 deletions linkis-commons/linkis-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,36 @@
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.261</version>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<!-- for hadoop 3.3.3 -->
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
public class LinkisStorageConf {
private static final Object CONF_LOCK = new Object();

public static final String DOLPHIN = "dolphin";

public static final String PARQUET = "parquet";

public static final String PARQUET_FILE_SUFFIX = ".parquet";

public static final String HDFS_FILE_SYSTEM_REST_ERRS =
CommonVars.apply(
"wds.linkis.hdfs.rest.errs",
Expand All @@ -34,12 +40,17 @@ public class LinkisStorageConf {
public static final String ROW_BYTE_MAX_LEN_STR =
CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue();

public static final String ENGINE_RESULT_TYPE =
CommonVars.apply("wds.linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue();

public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR);

public static final String FILE_TYPE =
CommonVars.apply(
"wds.linkis.storage.file.type",
"dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql")
"dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql"
+ ","
+ PARQUET)
.getValue();

private static volatile String[] fileTypeArr = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.storage.FSFactory;
import org.apache.linkis.storage.conf.LinkisStorageConf;
import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary;
import org.apache.linkis.storage.exception.StorageWarnException;
Expand Down Expand Up @@ -134,15 +135,22 @@ public String[] getResultSetType() {

@Override
public ResultSet<? extends MetaData, ? extends Record> getResultSetByPath(FsPath fsPath, Fs fs) {
ResultSet resultSet = null;
try (InputStream inputStream = fs.read(fsPath)) {
String resultSetType = Dolphin.getType(inputStream);
if (StringUtils.isEmpty(resultSetType)) {
throw new StorageWarnException(
THE_FILE_IS_EMPTY.getErrorCode(),
MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath()));
String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
String resultSetType = Dolphin.getType(inputStream);
if (StringUtils.isEmpty(resultSetType)) {
throw new StorageWarnException(
THE_FILE_IS_EMPTY.getErrorCode(),
MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc(), fsPath.getPath()));
}
// Utils.tryQuietly(fs::close);
resultSet = getResultSetByType(resultSetType);
} else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE);
}
// Utils.tryQuietly(fs::close);
return getResultSetByType(resultSetType);
return resultSet;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.storage.resultset;

import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.storage.domain.Column;
import org.apache.linkis.storage.domain.DataType;
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.apache.linkis.storage.resultset.table.TableRecord;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParquetResultSetReader<K extends MetaData, V extends Record>
extends StorageResultSetReader {

private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetReader.class);

private FsPath fsPath;

private ParquetReader<GenericRecord> parquetReader;

private GenericRecord record;

public ParquetResultSetReader(ResultSet resultSet, InputStream inputStream, FsPath fsPath)
throws IOException {
super(resultSet, inputStream);
this.fsPath = fsPath;
this.parquetReader =
AvroParquetReader.<GenericRecord>builder(new Path(fsPath.getPath())).build();
this.record = parquetReader.read();
}

@Override
public MetaData getMetaData() {
if (metaData == null) {
try {
List<Schema.Field> fields = record.getSchema().getFields();
List<Column> columnList =
fields.stream()
.map(
field ->
new Column(
field.name(),
DataType.toDataType(field.schema().getType().getName()),
""))
.collect(Collectors.toList());

metaData = new TableMetaData(columnList.toArray(new Column[0]));
} catch (Exception e) {
throw new RuntimeException("Failed to read parquet schema", e);
}
}
return metaData;
}

@Override
public boolean hasNext() throws IOException {
if (metaData == null) getMetaData();
if (record == null) return false;
ArrayList<Object> resultList = new ArrayList<>();
TableMetaData tableMetaData = (TableMetaData) metaData;
int length = tableMetaData.getColumns().length;
for (int i = 0; i < length; i++) {
resultList.add(record.get(i));
}
row = new TableRecord(resultList.toArray(new Object[0]));
if (row == null) return false;
return record != null;
}

@Override
public Record getRecord() {
if (metaData == null) throw new RuntimeException("Must read metadata first(必须先读取metadata)");
if (row == null) {
throw new RuntimeException(
"Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)");
}
try {
this.record = parquetReader.read();
} catch (IOException e) {
throw new RuntimeException("Failed to read parquet record", e);
}
return row;
}

@Override
public void close() throws IOException {
super.close();
parquetReader.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.storage.resultset;

import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.storage.domain.Column;
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.apache.linkis.storage.resultset.table.TableRecord;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParquetResultSetWriter<K extends MetaData, V extends Record>
extends StorageResultSetWriter {
private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetWriter.class);

private Schema schema = null;

public ParquetResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) {
super(resultSet, maxCacheSize, storePath);
}

@Override
public void addMetaData(MetaData metaData) throws IOException {
if (!moveToWriteRow) {
rMetaData = metaData;
SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder.record("linkis").fields();
TableMetaData tableMetaData = (TableMetaData) rMetaData;
for (Column column : tableMetaData.columns) {
fieldAssembler
.name(column.getColumnName().replaceAll("\\.", "_").replaceAll("[^a-zA-Z0-9_]", ""))
.doc(column.getComment())
.type(column.getDataType().getTypeName().toLowerCase())
.noDefault();
}
schema = fieldAssembler.endRecord();
moveToWriteRow = true;
}
}

@Override
public void addRecord(Record record) {
if (moveToWriteRow) {
try {
TableRecord tableRecord = (TableRecord) record;
Object[] row = tableRecord.row;
try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new Path(storePath.getPath()))
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()) {

GenericRecord genericRecord = new GenericData.Record(schema);
for (int i = 0; i < row.length; i++) {
genericRecord.put(schema.getFields().get(i).name(), row[i]);
}
writer.write(genericRecord);
}
} catch (Exception e) {
logger.warn("addMetaDataAndRecordString failed", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.common.io.resultset.ResultSetReader;
import org.apache.linkis.storage.FSFactory;
import org.apache.linkis.storage.conf.LinkisStorageConf;
import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary;
import org.apache.linkis.storage.exception.StorageWarnException;
import org.apache.linkis.storage.resultset.table.TableMetaData;
Expand All @@ -40,8 +41,19 @@ public class ResultSetReaderFactory {
private static final Logger logger = LoggerFactory.getLogger(ResultSetReaderFactory.class);

public static <K extends MetaData, V extends Record> ResultSetReader getResultSetReader(
ResultSet<K, V> resultSet, InputStream inputStream) {
return new StorageResultSetReader<>(resultSet, inputStream);
ResultSet<K, V> resultSet, InputStream inputStream, FsPath fsPath) {
String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
StorageResultSetReader<K, V> resultSetReader = null;
if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
resultSetReader = new StorageResultSetReader<>(resultSet, inputStream);
} else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
try {
resultSetReader = new ParquetResultSetReader<>(resultSet, inputStream, fsPath);
} catch (IOException e) {
throw new RuntimeException("Failed to read parquet", e);
}
}
return resultSetReader;
}

public static <K extends MetaData, V extends Record> ResultSetReader getResultSetReader(
Expand All @@ -61,7 +73,7 @@ public static ResultSetReader getResultSetReader(String res) throws IOException
Fs fs = FSFactory.getFs(resPath);
fs.init(null);
ResultSetReader reader =
ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath));
ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath), resPath);
if (reader instanceof StorageResultSetReader) {
((StorageResultSetReader<?, ?>) reader).setFs(fs);
}
Expand Down Expand Up @@ -96,7 +108,7 @@ public static ResultSetReader getTableResultReader(String res) {
InputStream read = fs.read(resPath);

return ResultSetReaderFactory.<TableMetaData, TableRecord>getResultSetReader(
(TableResultSet) resultSet, read);
(TableResultSet) resultSet, read, resPath);
} catch (IOException e) {
throw new StorageWarnException(
LinkisStorageErrorCodeSummary.TABLE_ARE_NOT_SUPPORTED.getErrorCode(),
Expand Down
Loading

0 comments on commit 197e9a3

Please sign in to comment.