Support for storage resultSet in Parquet and Orc format #5024
@@ -34,12 +40,17 @@ public class LinkisStorageConf {
  public static final String ROW_BYTE_MAX_LEN_STR =
      CommonVars.apply("wds.linkis.resultset.row.max.str", "2m").getValue();

  public static final String ENGINE_RESULT_TYPE =
      CommonVars.apply("wds.linkis.engine.resultSet.type", DOLPHIN, "Result type").getValue();
The wds prefix needs to be removed.
@@ -134,15 +135,22 @@ public String[] getResultSetType() {

  @Override
  public ResultSet<? extends MetaData, ? extends Record> getResultSetByPath(FsPath fsPath, Fs fs) {
    ResultSet resultSet = null;
    try (InputStream inputStream = fs.read(fsPath)) {
This should also support determining the result set type from the file name suffix.
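One way to read this request is to pick the result set type from the file extension before opening the stream. The following is a minimal sketch, assuming the three suffixes discussed in this PR (dolphin, parquet, orc). The class and method names are hypothetical and are not part of Linkis.

```java
/** Hypothetical helper: picks a result set type from a file name suffix. */
final class ResultSetTypeBySuffix {
  // Suffixes assumed from the formats discussed in this PR.
  private static final String[] KNOWN_TYPES = {"dolphin", "parquet", "orc"};

  private ResultSetTypeBySuffix() {}

  /** Returns the matching type name, or null when the suffix is not a known result set type. */
  static String detect(String path) {
    if (path == null) {
      return null;
    }
    for (String type : KNOWN_TYPES) {
      // Case-sensitive match, mirroring path.endsWith("." + suffix) in FileSource.
      if (path.endsWith("." + type)) {
        return type;
      }
    }
    return null;
  }
}
```

A caller could then branch on the returned name and fall back to inspecting the stream header only when `detect` returns null.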
import org.slf4j.LoggerFactory;

public class ParquetResultSetReader<K extends MetaData, V extends Record>
    extends StorageResultSetReader {
This should extend ResultSetReader.
"Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)"); | ||
}
try {
  this.record = parquetReader.read();
this.record = parquetReader.read(); should be in the hasNext method.
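The pattern being asked for can be sketched as follows: hasNext() performs the read and caches the row, and getRecord() only returns the cached row. This is an illustrative sketch, not the PR's code. The class name is hypothetical, and the Supplier stands in for any source that returns null at end of input.

```java
import java.util.function.Supplier;

/** Hypothetical reader; the Supplier stands in for a source that returns null at end of input. */
final class ReadAheadReader<T> {
  private final Supplier<T> source;
  private T current; // row fetched by the most recent hasNext() call

  ReadAheadReader(Supplier<T> source) {
    this.source = source;
  }

  /** Advances to the next row and reports whether one was found. */
  boolean hasNext() {
    current = source.get();
    return current != null;
  }

  /** Returns the row fetched by hasNext() without touching the source again. */
  T getRecord() {
    if (current == null) {
      throw new IllegalStateException("No record available: call hasNext() first");
    }
    return current;
  }
}
```

With this split, calling getRecord() twice in a row returns the same row, and end of input is reported by hasNext() rather than by an exception thrown from getRecord().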
import org.slf4j.LoggerFactory;

public class ParquetResultSetWriter<K extends MetaData, V extends Record>
    extends StorageResultSetWriter {
This should extend ResultSetWriter.
    try {
      TableRecord tableRecord = (TableRecord) record;
      Object[] row = tableRecord.row;
      try (ParquetWriter<GenericRecord> writer =
The writer should be a class field that is initialized only once.
Ok, thanks for reviewing the code
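The concern here is that opening a writer in a try-with-resources block inside the per-record method creates and closes a writer for every row. A minimal sketch of the suggested shape is shown below, with the writer held as a field, opened on the first record, and closed once. RowSink and SingleSinkWriter are hypothetical stand-ins and do not use the Parquet API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an expensive-to-open file writer. */
final class RowSink implements AutoCloseable {
  final List<Object[]> rows = new ArrayList<>();
  boolean closed;

  void write(Object[] row) {
    if (closed) {
      throw new IllegalStateException("sink is closed");
    }
    rows.add(row);
  }

  @Override
  public void close() {
    closed = true;
  }
}

/** Sketch: the sink is a field, opened on the first record and closed once in close(). */
final class SingleSinkWriter implements AutoCloseable {
  private RowSink sink;  // created lazily, exactly once
  private int openCount; // how many sinks were opened, for illustration only

  void addRecord(Object[] row) {
    if (sink == null) {
      sink = new RowSink();
      openCount++;
    }
    sink.write(row);
  }

  int openCount() {
    return openCount;
  }

  int rowCount() {
    return sink == null ? 0 : sink.rows.size();
  }

  @Override
  public void close() {
    if (sink != null) {
      sink.close();
    }
  }
}
```

Adding any number of records through addRecord opens the sink once, and the owning writer's close() becomes the single place where the file is finalized.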
  @Override
  public int skip(int recordNum) throws IOException {
    return 0;
This needs to be implemented.
  @Override
  public long getPosition() throws IOException {
    return 0;
This needs to be implemented.
  @Override
  public long available() throws IOException {
    return 0;
This needs to be implemented.
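As a rough illustration of what non-stub versions of skip and getPosition might look like, the sketch below counts rows as they are consumed. It assumes that skip returns the number of records actually skipped and that getPosition reports the number of records consumed so far. Those semantics are an assumption, not something this thread confirms, and available is left out because its meaning for a columnar file is not specified here. The class name is hypothetical.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical row reader that tracks how many rows have been consumed. */
final class CountingRowReader {
  private final Iterator<Object[]> rows;
  private long position; // number of rows consumed so far (assumed meaning of getPosition)

  CountingRowReader(List<Object[]> rows) {
    this.rows = rows.iterator();
  }

  /** Skips up to recordNum rows and returns how many were actually skipped. */
  int skip(int recordNum) {
    int skipped = 0;
    while (skipped < recordNum && rows.hasNext()) {
      rows.next();
      skipped++;
      position++;
    }
    return skipped;
  }

  long getPosition() {
    return position;
  }
}
```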
  @Override
  public int skip(int recordNum) throws IOException {
    return 0;
This needs to be implemented.
  }

  @Override
  public long available() throws IOException {
This needs to be implemented.
  @Override
  public long getPosition() throws IOException {
    return 0;
This needs to be implemented.
    try {
      resultSetReader = new ParquetResultSetReader<>(resultSet, inputStream, fsPath);
    } catch (IOException e) {
      throw new RuntimeException("Failed to read parquet", e);
Use a Linkis exception here instead of RuntimeException.
@@ -73,7 +73,9 @@ public interface FileSource extends Closeable {
       (path, suffix) -> path.endsWith("." + suffix);
 
   static boolean isResultSet(String path) {
-    return suffixPredicate.apply(path, fileType[0]);
+    return suffixPredicate.apply(path, fileType[0])
Dolphin needs to be updated.
@@ -404,6 +404,10 @@ public Message getDirFileTrees(
    FsPathListWithError fsPathListWithError = fileSystem.listPathWithError(fsPath);
    if (fsPathListWithError != null) {
      for (FsPath children : fsPathListWithError.getFsPaths()) {
        // 兼容parquet和orc,跳过.crc文件
This comment needs to be in English.
Ok, thanks for reviewing the code
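For reference, the code comment under review says, in English, "Compatible with parquet and orc, skip .crc files". The filtering it describes can be sketched as below, using plain path strings instead of FsPath. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: drops checksum side files from a directory listing. */
final class ChecksumFileFilter {
  private ChecksumFileFilter() {}

  /** Returns the paths that do not end in ".crc", preserving their order. */
  static List<String> withoutCrcFiles(List<String> paths) {
    List<String> kept = new ArrayList<>();
    for (String path : paths) {
      if (!path.endsWith(".crc")) {
        kept.add(path);
      }
    }
    return kept;
  }
}
```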
# Conflicts: # tool/dependencies/known-dependencies.txt
LGTM.
Support for storing result sets in Parquet format
The default value of linkis.engine.resultSet.type is dolphin; it can be changed to parquet or orc.
The Maven compilation command is:
mvn clean install -Pstorage-parquet -Pstorage-orc -Dmaven.test.skip=true