Skip to content

Commit

Permalink
Fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Dec 7, 2023
1 parent c9e810f commit d5d14eb
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ public String[] getResultSetType() {
public ResultSet<? extends MetaData, ? extends Record> getResultSetByPath(FsPath fsPath, Fs fs) {
ResultSet resultSet = null;
try (InputStream inputStream = fs.read(fsPath)) {
String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
if (engineResultType.equals(LinkisStorageConf.DOLPHIN)
|| fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) {
if (fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) {
String resultSetType = Dolphin.getType(inputStream);
if (StringUtils.isEmpty(resultSetType)) {
throw new StorageWarnException(
Expand All @@ -149,8 +147,7 @@ public String[] getResultSetType() {
}
// Utils.tryQuietly(fs::close);
resultSet = getResultSetByType(resultSetType);
} else if (engineResultType.equals(LinkisStorageConf.PARQUET)
|| fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) {
} else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) {
resultSet = getResultSetByType(ResultSetFactory.TABLE_TYPE);
}
return resultSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.linkis.common.io.resultset.ResultSetReader;
import org.apache.linkis.storage.FSFactory;
import org.apache.linkis.storage.conf.LinkisStorageConf;
import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary;
import org.apache.linkis.storage.exception.StorageWarnException;
import org.apache.linkis.storage.resultset.table.TableMetaData;
Expand All @@ -42,11 +43,10 @@ public class ResultSetReaderFactory {

public static <K extends MetaData, V extends Record> ResultSetReader getResultSetReader(
ResultSet<K, V> resultSet, InputStream inputStream, FsPath fsPath) {
String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
ResultSetReader<K, V> resultSetReader = null;
if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
if (fsPath.getPath().endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)) {
resultSetReader = new StorageResultSetReader<>(resultSet, inputStream);
} else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
} else if (fsPath.getPath().endsWith(LinkisStorageConf.PARQUET_FILE_SUFFIX)) {
try {
resultSetReader = new ParquetResultSetReader<>(resultSet, inputStream, fsPath);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.linkis.common.io.resultset.ResultSetReader;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.storage.conf.LinkisStorageConf;
import org.apache.linkis.storage.resultset.table.TableResultSet;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -39,10 +40,10 @@ public static <K extends MetaData, V extends Record> ResultSetWriter<K, V> getRe
ResultSet<K, V> resultSet, long maxCacheSize, FsPath storePath) {
String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
ResultSetWriter<K, V> writer = null;
if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath);
} else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) {
writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath);
} else {
writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath);
}
return writer;
}
Expand All @@ -51,12 +52,12 @@ public static <K extends MetaData, V extends Record> ResultSetWriter<K, V> getRe
ResultSet<K, V> resultSet, long maxCacheSize, FsPath storePath, String proxyUser) {
String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
ResultSetWriter<K, V> writer = null;
if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
if (engineResultType.equals(LinkisStorageConf.PARQUET) && resultSet instanceof TableResultSet) {
writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath);
} else {
writer = new StorageResultSetWriter<>(resultSet, maxCacheSize, storePath);
StorageResultSetWriter storageResultSetWriter = (StorageResultSetWriter) writer;
storageResultSetWriter.setProxyUser(proxyUser);
} else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
writer = new ParquetResultSetWriter<>(resultSet, maxCacheSize, storePath);
}
return writer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.storage.conf.LinkisStorageConf;
import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.resultset.table.TableResultSet;
import org.apache.linkis.storage.utils.StorageConfiguration;

import org.slf4j.Logger;
Expand Down Expand Up @@ -51,10 +52,8 @@ public String charset() {
@Override
public FsPath getResultSetPath(FsPath parentDir, String fileName) {
String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
String fileSuffix = null;
if (engineResultType.equals(LinkisStorageConf.DOLPHIN)) {
fileSuffix = Dolphin.DOLPHIN_FILE_SUFFIX;
} else if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
String fileSuffix = Dolphin.DOLPHIN_FILE_SUFFIX;
if (engineResultType.equals(LinkisStorageConf.PARQUET) && this instanceof TableResultSet) {
fileSuffix = LinkisStorageConf.PARQUET_FILE_SUFFIX;
}
final String path =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,8 @@ public interface FileSource extends Closeable {
(path, suffix) -> path.endsWith("." + suffix);

static boolean isResultSet(String path) {
String engineResultType = LinkisStorageConf.ENGINE_RESULT_TYPE;
String type = fileType[0];
if (engineResultType.equals(LinkisStorageConf.PARQUET)) {
type = LinkisStorageConf.PARQUET;
}
return suffixPredicate.apply(path, type);
return suffixPredicate.apply(path, fileType[0])
|| suffixPredicate.apply(path, LinkisStorageConf.PARQUET);
}

static boolean isResultSet(FsPath fsPath) {
Expand Down

0 comments on commit d5d14eb

Please sign in to comment.