Skip to content

Commit

Permalink
Enhanced FileSource to support more parameters feature
Browse files Browse the repository at this point in the history
  • Loading branch information
guoshupei committed Jul 29, 2023
1 parent ead6822 commit 83853cc
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,28 @@ public Pair<Integer, Integer>[] getFileInfo(int needToCountRowNumber) {
.map(fileSplit -> fileSplit.getFileInfo(needToCountRowNumber))
.toArray(Pair[]::new);
}

/**
 * Adds up {@link FileSplit#getTotalCount()} over every split held by this source.
 *
 * @return the sum of the per-split counts
 */
@Override
public int getTotalCount() {
  int total = 0;
  for (FileSplit fileSplit : fileSplits) {
    total += fileSplit.getTotalCount();
  }
  return total;
}

/**
 * Pushes the given total-line limit down to every split of this source.
 *
 * @param limitTotalLine value handed to {@link FileSplit#setLimitTotalLine(int)} on each split
 * @return this source, so calls can be chained
 */
@Override
public FileSource limitTotalLine(int limitTotalLine) {
  for (FileSplit fileSplit : fileSplits) {
    fileSplit.setLimitTotalLine(limitTotalLine);
  }
  return this;
}

/**
 * Pushes the given byte limit down to every split of this source.
 *
 * @param limitBytes value handed to {@link FileSplit#setLimitBytes(long)} on each split; it is
 *     unboxed per split, so a {@code null} fails only when there is at least one split
 * @return this source, so calls can be chained
 */
@Override
public FileSource limitBytes(Long limitBytes) {
  for (FileSplit fileSplit : fileSplits) {
    fileSplit.setLimitBytes(limitBytes);
  }
  return this;
}

/**
 * Pushes the given column-length limit down to every split of this source.
 *
 * @param limitColumnLength value handed to {@link FileSplit#setLimitColumnLength(int)} on each
 *     split
 * @return this source, so calls can be chained
 */
@Override
public FileSource limitColumnLength(int limitColumnLength) {
  for (FileSplit fileSplit : fileSplits) {
    fileSplit.setLimitColumnLength(limitColumnLength);
  }
  return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ static boolean isResultSet(FsPath fsPath) {
return isResultSet(fsPath.getPath());
}

/**
 * Returns the total record count of this source.
 *
 * @return the total count
 */
int getTotalCount();

/**
 * Sets a limit on the total number of lines for this source.
 *
 * @param limitTotalLine the line limit to apply
 * @return a FileSource to continue chaining on
 */
FileSource limitTotalLine(int limitTotalLine);

/**
 * Sets a limit on the number of bytes collected from this source.
 *
 * @param limitBytes the byte limit to apply
 * @return a FileSource to continue chaining on
 */
FileSource limitBytes(Long limitBytes);

/**
 * Sets a limit on the length of a single column value.
 *
 * @param limitColumnLength the column length limit to apply
 * @return a FileSource to continue chaining on
 */
FileSource limitColumnLength(int limitColumnLength);

/**
* Currently only supports table multi-result sets
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -60,6 +62,9 @@ public class FileSplit implements Closeable {
protected Function<Record, Record> shuffler;
private boolean pageTrigger = false;
protected Map<String, String> params = new HashMap<>();
private int limitTotalLine = 0;
private long limitBytes = 0L;
private int limitColumnLength = 0;

public FileSplit(FsReader<? extends MetaData, ? extends Record> fsReader) {
this.fsReader = fsReader;
Expand Down Expand Up @@ -98,6 +103,22 @@ public int getTotalLine() {
return totalLine;
}

/**
 * Returns the current value of the {@code count} counter of this split.
 *
 * @return the value of {@code count}
 */
public int getTotalCount() {
return count;
}

/**
 * Sets the total-line limit of this split. The field defaults to 0, and {@link #ifContinueRead()}
 * only takes the limit into account when it is greater than 0.
 *
 * @param limitTotalLine the line limit to store
 */
public void setLimitTotalLine(int limitTotalLine) {
this.limitTotalLine = limitTotalLine;
}

/**
 * Sets the byte limit of this split. The field defaults to 0, and {@link #collect()} only applies
 * the limit when it is greater than 0.
 *
 * @param limitBytes the byte limit to store
 */
public void setLimitBytes(long limitBytes) {
this.limitBytes = limitBytes;
}

/**
 * Sets the column-length limit of this split. The field defaults to 0, and {@code collectRecord}
 * only truncates table columns when the limit is greater than 0.
 *
 * @param limitColumnLength the maximum column length to store
 */
public void setLimitColumnLength(int limitColumnLength) {
this.limitColumnLength = limitColumnLength;
}

public <M> M whileLoop(Function<MetaData, M> metaDataFunction, Consumer<Record> recordConsumer) {
M m = null;
try {
Expand All @@ -123,8 +144,10 @@ public <M> M whileLoop(Function<MetaData, M> metaDataFunction, Consumer<Record>
}
}
if (!needRemoveFlag) {
recordConsumer.accept(shuffler.apply(record));
totalLine++;
if (ifContinueRecord()) {
recordConsumer.accept(shuffler.apply(record));
totalLine++;
}
count++;
}
}
Expand Down Expand Up @@ -162,8 +185,10 @@ public void biConsumerWhileLoop(
}
}
if (!needRemoveFlag) {
recordConsumer.accept(shuffler.apply(record));
totalLine++;
if (ifContinueRecord()) {
recordConsumer.accept(shuffler.apply(record));
totalLine++;
}
count++;
}
}
Expand Down Expand Up @@ -222,16 +247,47 @@ record -> {

public Pair<Object, List<String[]>> collect() {
List<String[]> recordList = new ArrayList<>();
final AtomicLong tmpBytes = new AtomicLong(0L);
final AtomicBoolean overFlag = new AtomicBoolean(false);
Object metaData =
whileLoop(
collectMetaData -> collectMetaData(collectMetaData),
r -> recordList.add(collectRecord(r)));
r -> {
if (!overFlag.get()) {
String[] arr = collectRecord(r);
if (limitBytes > 0) {
for (int i = 0; i < arr.length; i++) {
tmpBytes.addAndGet(arr[i].getBytes().length);
if (overFlag.get() || tmpBytes.get() > limitBytes) {
overFlag.set(true);
arr[i] = "";
}
}
recordList.add(arr);
} else {
recordList.add(arr);
}
}
});
return new Pair<>(metaData, recordList);
}

public String[] collectRecord(Record record) {
if (record instanceof TableRecord) {
TableRecord tableRecord = (TableRecord) record;
if (limitColumnLength > 0) {
return Arrays.stream(tableRecord.row)
.map(
obj -> {
String col = DataType.valueToString(obj);
if (col.length() > limitColumnLength) {
return col.substring(0, limitColumnLength);
} else {
return col;
}
})
.toArray(String[]::new);
}
return Arrays.stream(tableRecord.row).map(DataType::valueToString).toArray(String[]::new);
} else if (record instanceof LineRecord) {
LineRecord lineRecord = (LineRecord) record;
Expand Down Expand Up @@ -267,6 +323,10 @@ private Map<String, String> columnToMap(Column column) {
}

/**
 * Tells whether reading should go on: either the current record is still to be delivered (see
 * {@link #ifContinueRecord()}), or a total-line limit is set and {@code count} has not yet
 * exceeded it.
 *
 * @return {@code true} if reading should continue
 */
public boolean ifContinueRead() {
  if (ifContinueRecord()) {
    return true;
  }
  return limitTotalLine > 0 && count <= limitTotalLine;
}

/**
 * Tells whether the current record should still be delivered: always when paging is not
 * triggered, otherwise only while {@code count} has not passed {@code end}.
 *
 * @return {@code true} if the current record should be delivered
 */
public boolean ifContinueRecord() {
  if (!pageTrigger) {
    return true;
  }
  return count <= end;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ public class WorkSpaceConfiguration {
public static final CommonVars<Boolean> ENABLE_USER_GROUP =
CommonVars$.MODULE$.apply("linkis.os.user.group.enabled", true);

// Configured total-line limit (key "wds.linkis.filesystem.limit.total.line"), default 5000.
// openFile caps a positive request-supplied limitTotalLine at this value.
public static final CommonVars<Integer> FILESYSTEM_LIMIT_TOTAL_LINE =
CommonVars$.MODULE$.apply("wds.linkis.filesystem.limit.total.line", 5000);

// Configured byte limit (key "wds.linkis.filesystem.limit.bytes").
// Default is 63 MiB: 66060288 = 63 * 1024 * 1024 bytes.
// openFile caps a positive request-supplied limitBytes at this value.
public static final CommonVars<Long> FILESYSTEM_LIMIT_BYTES =
CommonVars$.MODULE$.apply("wds.linkis.filesystem.limit.bytes", 66060288L);

// Configured column-length limit (key "wds.linkis.filesystem.limit.column.length"), default 2000.
// openFile caps a positive request-supplied limitColumnLength at this value.
public static final CommonVars<Integer> FILESYSTEM_LIMIT_COLUMN_LENGTH =
CommonVars$.MODULE$.apply("wds.linkis.filesystem.limit.column.length", 2000);

public static final ExecutorService executorService =
new ThreadPoolExecutor(
FILESYSTEM_FS_THREAD_NUM.getValue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.slf4j.LoggerFactory;

import static org.apache.linkis.filesystem.conf.WorkSpaceConfiguration.*;
import static org.apache.linkis.filesystem.conf.WorkSpaceConfiguration.FILESYSTEM_LIMIT_COLUMN_LENGTH;
import static org.apache.linkis.filesystem.constant.WorkSpaceConstants.*;

@Api(tags = "file system")
Expand Down Expand Up @@ -560,7 +561,10 @@ public Message openFile(
@RequestParam(value = "path", required = false) String path,
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "pageSize", defaultValue = "5000") Integer pageSize,
@RequestParam(value = "charset", defaultValue = "utf-8") String charset)
@RequestParam(value = "charset", defaultValue = "utf-8") String charset,
@RequestParam(value = "limitTotalLine", defaultValue = "0") Integer limitTotalLine,
@RequestParam(value = "limitBytes", defaultValue = "0") Long limitBytes,
@RequestParam(value = "limitColumnLength", defaultValue = "0") Integer limitColumnLength)
throws IOException, WorkSpaceException {

Message message = Message.ok();
Expand All @@ -583,12 +587,33 @@ public Message openFile(
if (FileSource.isResultSet(fsPath.getPath())) {
fileSource = fileSource.page(page, pageSize);
}
if (limitTotalLine > 0) {
fileSource =
fileSource.limitTotalLine(
Math.min(limitTotalLine, FILESYSTEM_LIMIT_TOTAL_LINE.getValue()));
}
if (limitBytes > 0) {
fileSource = fileSource.limitBytes(Math.min(limitBytes, FILESYSTEM_LIMIT_BYTES.getValue()));
}
if (limitColumnLength > 0) {
fileSource =
fileSource.limitColumnLength(
Math.min(limitColumnLength, FILESYSTEM_LIMIT_COLUMN_LENGTH.getValue()));
}
Pair<Object, List<String[]>> result = fileSource.collect()[0];
IOUtils.closeQuietly(fileSource);
message.data("metadata", result.getFirst()).data("fileContent", result.getSecond());
message.data("metadata", result.getFirst());
message.data("fileContent", result.getSecond());
message.data("type", fileSource.getFileSplits()[0].getType());
message.data("totalLine", fileSource.getTotalLine());
return message.data("page", page).data("totalPage", 0);
message.data("totalCount", fileSource.getTotalCount());
message.data("page", page);
message.data(
"totalPage",
pageSize == 0
? 1
: (int) Math.ceil((double) fileSource.getTotalCount() / (double) pageSize));
return message;
} finally {
IOUtils.closeQuietly(fileSource);
}
Expand Down

0 comments on commit 83853cc

Please sign in to comment.