Commit: Fix bugs

ChengJie1053 committed Dec 7, 2023
1 parent 34433b3 commit c9e810f

Showing 9 changed files with 113 additions and 45 deletions.

@@ -41,7 +41,7 @@ public class LinkisStorageConf {
       CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue();
 
   public static final String ENGINE_RESULT_TYPE =
-      CommonVars.apply("wds.linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue();
+      CommonVars.apply("linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue();
 
   public static final long ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR);
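
Only the key changes here: the "wds." prefix is dropped, so deployments that override wds.linkis.engine.resultSet.type must rename the property. A minimal sketch of reading the renamed key, using the same three-argument CommonVars call as the diff (the demo class is illustrative, not part of the commit):

    import org.apache.linkis.common.conf.CommonVars;

    public class ResultTypeDemo {
      public static void main(String[] args) {
        // Key, default value, description -- mirrors the call in the diff;
        // "dolphin" is assumed to be the value of LinkisStorageConf.DOLPHIN.
        String engineResultType =
            CommonVars.apply("linkis.engine.resultSet.type", "dolphin", "Result type").getValue();
        System.out.println("result set format: " + engineResultType);
      }
    }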

@@ -109,7 +109,8 @@ public boolean exists(String resultSetType) {
 
   @Override
   public boolean isResultSetPath(String path) {
-    return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX);
+    return path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)
+        || path.endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX);
   }
 
   @Override
@@ -138,7 +139,8 @@ public String[] getResultSetType() {
     ResultSet resultSet = null;
     try (InputStream inputStream = fs.read(fsPath)) {
       String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
-      if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
+      if (engineResultType.equals(LinkisStorageConf.DOLPHIN)
+          || fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) {
         String resultSetType = Dolphin.getType(inputStream);
         if (StringUtils.isEmpty(resultSetType)) {
           throw new StorageWarnException(
@@ -147,7 +149,8 @@ public String[] getResultSetType() {
         }
         // Utils.tryQuietly(fs::close);
         resultSet = getResultSetByType(resultSetType);
-      } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
+      } else if (engineResultType.equals(LinkisStorageConf.PARQUET)
+          || fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) {
        resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE);
       }
       return resultSet;
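
These two hunks add a suffix-based fallback: the file extension now wins even when the engine-wide default says otherwise. A small illustrative sketch, assuming the suffix constants resolve to ".dolphin" and ".parquet" (path and class are hypothetical):

    public class SuffixDispatchDemo {
      public static void main(String[] args) {
        // With linkis.engine.resultSet.type still "dolphin", a stored Parquet
        // result is now dispatched by its extension instead of being misread.
        String path = "/tmp/linkis/result/_0.parquet";
        boolean isDolphin = path.endsWith(".dolphin");  // Dolphin.DOLPHIN_FILE_SUFFIX
        boolean isParquet = path.endsWith(".parquet");  // LinkisStorageConf.PARQUET_FILE_SUFFIX
        System.out.println(isDolphin ? "dolphin" : isParquet ? "parquet table" : "unknown");
      }
    }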

@@ -21,13 +21,15 @@
 import org.apache.linkis.common.io.MetaData;
 import org.apache.linkis.common.io.Record;
 import org.apache.linkis.common.io.resultset.ResultSet;
+import org.apache.linkis.common.io.resultset.ResultSetReader;
 import org.apache.linkis.storage.domain.Column;
 import org.apache.linkis.storage.domain.DataType;
 import org.apache.linkis.storage.resultset.table.TableMetaData;
 import org.apache.linkis.storage.resultset.table.TableRecord;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
@@ -42,19 +44,29 @@
 import org.slf4j.LoggerFactory;
 
 public class ParquetResultSetReader<K extends MetaData, V extends Record>
-    extends StorageResultSetReader {
+    extends ResultSetReader<K, V> {
 
   private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetReader.class);
 
   private FsPath fsPath;
 
+  private final ResultSet<K, V> resultSet;
+
+  private final InputStream inputStream;
+
+  private MetaData metaData;
+
+  private Record row;
+
   private ParquetReader<GenericRecord> parquetReader;
 
   private GenericRecord record;
 
   public ParquetResultSetReader(ResultSet resultSet, InputStream inputStream, FsPath fsPath)
       throws IOException {
-    super(resultSet, inputStream);
+    this.resultSet = resultSet;
+    this.inputStream = inputStream;
     this.fsPath = fsPath;
     this.parquetReader =
         AvroParquetReader.<GenericRecord>builder(new Path(fsPath.getPath())).build();
@@ -84,6 +96,21 @@ public MetaData getMetaData() {
     return metaData;
   }
 
+  @Override
+  public int skip(int recordNum) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public long available() throws IOException {
+    return 0;
+  }
+
   @Override
   public boolean hasNext() throws IOException {
     if (metaData == null) getMetaData();
@@ -116,7 +143,7 @@ public Record getRecord() {
 
   @Override
   public void close() throws IOException {
-    super.close();
+    IOUtils.closeQuietly(inputStream);
     parquetReader.close();
   }
 }
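
ParquetResultSetReader now extends the abstract ResultSetReader directly and owns its stream, so its row loop reduces to ParquetReader.read() returning null at end of file. A standalone sketch of that contract against the Parquet API (the file path and demo class are assumptions):

    import java.io.IOException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;

    public class ParquetReadDemo {
      public static void main(String[] args) throws IOException {
        // Same builder call ParquetResultSetReader uses in its constructor.
        try (ParquetReader<GenericRecord> reader =
            AvroParquetReader.<GenericRecord>builder(new Path("/tmp/result.parquet")).build()) {
          GenericRecord record;
          // read() yields null once the file is exhausted -- the condition
          // the reader's hasNext() keys on.
          while ((record = reader.read()) != null) {
            System.out.println(record);
          }
        }
      }
    }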

@@ -21,6 +21,7 @@
 import org.apache.linkis.common.io.MetaData;
 import org.apache.linkis.common.io.Record;
 import org.apache.linkis.common.io.resultset.ResultSet;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
 import org.apache.linkis.storage.domain.Column;
 import org.apache.linkis.storage.resultset.table.TableMetaData;
 import org.apache.linkis.storage.resultset.table.TableRecord;
@@ -41,21 +42,36 @@
 import org.slf4j.LoggerFactory;
 
 public class ParquetResultSetWriter<K extends MetaData, V extends Record>
-    extends StorageResultSetWriter {
+    extends ResultSetWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(ParquetResultSetWriter.class);
 
-  private Schema schema = null;
+  private Schema schema;
 
+  private ParquetWriter<GenericRecord> parquetWriter;
+
+  private boolean moveToWriteRow = false;
+
+  private MetaData metaData = null;
+
+  private final FsPath storePath;
+
+  private final long maxCacheSize;
+
+  private final ResultSet<K, V> resultSet;
 
   public ParquetResultSetWriter(ResultSet resultSet, long maxCacheSize, FsPath storePath) {
-    super(resultSet, maxCacheSize, storePath);
+    this.resultSet = resultSet;
+    this.maxCacheSize = maxCacheSize;
+    this.storePath = storePath;
   }
 
   @Override
   public void addMetaData(MetaData metaData) throws IOException {
     if (!moveToWriteRow) {
-      rMetaData = metaData;
+      this.metaData = metaData;
       SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder.record("linkis").fields();
-      TableMetaData tableMetaData = (TableMetaData) rMetaData;
+      TableMetaData tableMetaData = (TableMetaData) this.metaData;
       for (Column column : tableMetaData.columns) {
         fieldAssembler
             .name(column.getColumnName().replaceAll("\\.", "_").replaceAll("[^a-zA-Z0-9_]", ""))
@@ -65,31 +81,55 @@ public void addMetaData(MetaData metaData) throws IOException {
       }
       schema = fieldAssembler.endRecord();
       moveToWriteRow = true;
+      if (parquetWriter == null) {
+        parquetWriter =
+            AvroParquetWriter.<GenericRecord>builder(new Path(storePath.getPath()))
+                .withSchema(schema)
+                .withCompressionCodec(CompressionCodecName.SNAPPY)
+                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                .build();
+      }
     }
   }
 
   @Override
   public void addRecord(Record record) {
     if (moveToWriteRow) {
+      TableRecord tableRecord = (TableRecord) record;
       try {
-        TableRecord tableRecord = (TableRecord) record;
         Object[] row = tableRecord.row;
-        try (ParquetWriter<GenericRecord> writer =
-            AvroParquetWriter.<GenericRecord>builder(new Path(storePath.getPath()))
-                .withSchema(schema)
-                .withCompressionCodec(CompressionCodecName.SNAPPY)
-                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                .build()) {
-
-          GenericRecord genericRecord = new GenericData.Record(schema);
-          for (int i = 0; i < row.length; i++) {
-            genericRecord.put(schema.getFields().get(i).name(), row[i]);
-          }
-          writer.write(genericRecord);
+        GenericRecord genericRecord = new GenericData.Record(schema);
+        for (int i = 0; i < row.length; i++) {
+          genericRecord.put(schema.getFields().get(i).name(), row[i]);
         }
-      } catch (Exception e) {
+        parquetWriter.write(genericRecord);
+      } catch (IOException e) {
         logger.warn("addMetaDataAndRecordString failed", e);
       }
     }
   }
 
+  @Override
+  public FsPath toFSPath() {
+    return storePath;
+  }
+
+  @Override
+  public String toString() {
+    return storePath.getSchemaPath();
+  }
+
+  @Override
+  public void addMetaDataAndRecordString(String content) {}
+
+  @Override
+  public void addRecordString(String content) {}
+
+  @Override
+  public void close() throws IOException {
+    parquetWriter.close();
+  }
+
+  @Override
+  public void flush() throws IOException {}
 }
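
This is the substantive fix in the commit: the old addRecord built a fresh AvroParquetWriter in OVERWRITE mode for every record, so each row clobbered the file written for the previous one; the writer is now created once in addMetaData and reused until close(). A condensed sketch of the corrected pattern, assuming a writable local path and a one-column schema (names are illustrative):

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class ParquetWriteDemo {
      public static void main(String[] args) throws IOException {
        Schema schema =
            SchemaBuilder.record("linkis").fields().optionalString("value").endRecord();
        // Build the writer once (as addMetaData now does), then write many rows.
        try (ParquetWriter<GenericRecord> writer =
            AvroParquetWriter.<GenericRecord>builder(new Path("/tmp/demo.parquet"))
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .build()) {
          for (String v : new String[] {"a", "b", "c"}) {
            GenericRecord row = new GenericData.Record(schema);
            row.put("value", v);
            writer.write(row); // all three rows land in the same file
          }
        }
      }
    }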

@@ -43,7 +43,7 @@ public class ResultSetReaderFactory {
   public static <K extends MetaData, V extends Record> ResultSetReader getResultSetReader(
       ResultSet<K, V> resultSet, InputStream inputStream, FsPath fsPath) {
     String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
-    StorageResultSetReader<K, V> resultSetReader = null;
+    ResultSetReader<K, V> resultSetReader = null;
     if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
       resultSetReader = new StorageResultSetReader<>(resultSet, inputStream);
     } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
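
With this change (and the matching writer-factory change below) callers receive the abstract reader/writer types and stay format-agnostic. A hedged usage sketch; the helper method and its arguments are assumptions, only the factory call and the reader methods come from the diff:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.linkis.common.io.FsPath;
    import org.apache.linkis.common.io.MetaData;
    import org.apache.linkis.common.io.Record;
    import org.apache.linkis.common.io.resultset.ResultSet;
    import org.apache.linkis.common.io.resultset.ResultSetReader;
    import org.apache.linkis.storage.resultset.ResultSetReaderFactory;

    public class ReaderFactoryDemo {
      // Illustrative helper: consumes any result set through the abstract reader,
      // never naming StorageResultSetReader or ParquetResultSetReader.
      static int countRecords(ResultSet<MetaData, Record> rs, InputStream in, FsPath path)
          throws IOException {
        ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(rs, in, path);
        reader.getMetaData(); // column metadata; hasNext() also triggers this lazily
        int n = 0;
        while (reader.hasNext()) {
          reader.getRecord();
          n++;
        }
        reader.close();
        return n;
      }
    }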

@@ -22,6 +22,7 @@
 import org.apache.linkis.common.io.Record;
 import org.apache.linkis.common.io.resultset.ResultSet;
 import org.apache.linkis.common.io.resultset.ResultSetReader;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
 import org.apache.linkis.storage.conf.LinkisStorageConf;
 
 import java.io.IOException;
@@ -34,11 +35,10 @@
 public class ResultSetWriterFactory {
   private static final Logger logger = LoggerFactory.getLogger(ResultSetWriterFactory.class);
 
-  public static <K extends MetaData, V extends Record>
-      org.apache.linkis.common.io.resultset.ResultSetWriter<K, V> getResultSetWriter(
-          ResultSet<K, V> resultSet, long maxCacheSize, FsPath storePath) {
+  public static <K extends MetaData, V extends Record> ResultSetWriter<K, V> getResultSetWriter(
+      ResultSet<K, V> resultSet, long maxCacheSize, FsPath storePath) {
     String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
-    StorageResultSetWriter<K, V> writer = null;
+    ResultSetWriter<K, V> writer = null;
     if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
       writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath);
     } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
@@ -47,25 +47,22 @@ org.apache.linkis.common.io.resultset.ResultSetWriter<K, V> getResultSetWriter(
     return writer;
   }
 
-  public static <K extends MetaData, V extends Record>
-      org.apache.linkis.common.io.resultset.ResultSetWriter<K, V> getResultSetWriter(
-          ResultSet<K, V> resultSet, long maxCacheSize, FsPath storePath, String proxyUser) {
+  public static <K extends MetaData, V extends Record> ResultSetWriter<K, V> getResultSetWriter(
+      ResultSet<K, V> resultSet, long maxCacheSize, FsPath storePath, String proxyUser) {
     String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
-    StorageResultSetWriter<K, V> writer = null;
+    ResultSetWriter<K, V> writer = null;
     if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
       writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath);
+      StorageResultSetWriter storageResultSetWriter = (StorageResultSetWriter) writer;
+      storageResultSetWriter.setProxyUser(proxyUser);
     } else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
       writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath);
     }
-    writer.setProxyUser(proxyUser);
     return writer;
   }
 
   public static Record[] getRecordByWriter(
-      org.apache.linkis.common.io.resultset.ResultSetWriter<? extends MetaData, ? extends Record>
-          writer,
-      long limit)
-      throws IOException {
+      ResultSetWriter<? extends MetaData, ? extends Record> writer, long limit) throws IOException {
     String res = writer.toString();
     return getRecordByRes(res, limit);
   }

@@ -43,10 +43,10 @@ public class StorageResultSetReader<K extends MetaData, V extends Record>
   private static final Logger logger = LoggerFactory.getLogger(StorageResultSetReader.class);
 
   private final ResultSet<K, V> resultSet;
-  public final InputStream inputStream;
-  public final ResultDeserializer<K, V> deserializer;
-  public K metaData;
-  public Record row;
+  private final InputStream inputStream;
+  private final ResultDeserializer<K, V> deserializer;
+  private K metaData;
+  private Record row;
   private int colCount = 0;
   private int rowCount = 0;
   private Fs fs;

@@ -50,15 +50,15 @@ public class StorageResultSetWriter<K extends MetaData, V extends Record>
 
   private final ResultSet<K, V> resultSet;
   private final long maxCacheSize;
-  public final FsPath storePath;
+  private final FsPath storePath;
 
   private final ResultSerializer serializer;
-  public boolean moveToWriteRow = false;
+  private boolean moveToWriteRow = false;
   private OutputStream outputStream = null;
   private int rowCount = 0;
   private final List<Byte> buffer = new ArrayList<Byte>();
   private Fs fs = null;
-  public MetaData rMetaData = null;
+  private MetaData rMetaData = null;
   private String proxyUser = StorageUtils.getJvmUser();
   private boolean fileCreated = false;
   private boolean closed = false;

1 change: 1 addition & 0 deletions tool/dependencies/known-dependencies.txt
@@ -701,6 +701,7 @@ zstd-jni-1.4.9-1.jar
 zstd-jni-1.5.0-4.jar
 zjsonpatch-0.3.0.jar
 agrona-1.12.0.jar
+audience-annotations-0.12.0.jar
 audience-annotations-0.13.0.jar
 commons-crypto-1.1.0.jar
 disruptor-3.4.2.jar
