Skip to content

Commit

Permalink
[Feature] Enhanced FileSource to support more parameters feature (#4838)
Browse files Browse the repository at this point in the history
* Enhanced FileSource to support more parameters feature

* Delete param of `limitTotalLine` and it can be replaced by method of `fileInfo`

* Remove redundant references
  • Loading branch information
guoshupei authored Aug 9, 2023
1 parent f08773d commit 64cdd4d
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,17 @@ public Pair<Integer, Integer>[] getFileInfo(int needToCountRowNumber) {
.map(fileSplit -> fileSplit.getFileInfo(needToCountRowNumber))
.toArray(Pair[]::new);
}

@Override
public FileSource limitBytes(Long limitBytes) {
Arrays.stream(fileSplits).forEach(fileSplit -> fileSplit.setLimitBytes(limitBytes));
return this;
}

@Override
public FileSource limitColumnLength(int limitColumnLength) {
Arrays.stream(fileSplits)
.forEach(fileSplit -> fileSplit.setLimitColumnLength(limitColumnLength));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ static boolean isResultSet(FsPath fsPath) {
return isResultSet(fsPath.getPath());
}

FileSource limitBytes(Long limitBytes);

FileSource limitColumnLength(int limitColumnLength);

/**
* Currently only supports table multi-result sets
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -60,6 +62,8 @@ public class FileSplit implements Closeable {
protected Function<Record, Record> shuffler;
private boolean pageTrigger = false;
protected Map<String, String> params = new HashMap<>();
private long limitBytes = 0L;
private int limitColumnLength = 0;

public FileSplit(FsReader<? extends MetaData, ? extends Record> fsReader) {
this.fsReader = fsReader;
Expand Down Expand Up @@ -98,6 +102,14 @@ public int getTotalLine() {
return totalLine;
}

public void setLimitBytes(long limitBytes) {
this.limitBytes = limitBytes;
}

public void setLimitColumnLength(int limitColumnLength) {
this.limitColumnLength = limitColumnLength;
}

public <M> M whileLoop(Function<MetaData, M> metaDataFunction, Consumer<Record> recordConsumer) {
M m = null;
try {
Expand Down Expand Up @@ -222,16 +234,47 @@ record -> {

public Pair<Object, List<String[]>> collect() {
List<String[]> recordList = new ArrayList<>();
final AtomicLong tmpBytes = new AtomicLong(0L);
final AtomicBoolean overFlag = new AtomicBoolean(false);
Object metaData =
whileLoop(
collectMetaData -> collectMetaData(collectMetaData),
r -> recordList.add(collectRecord(r)));
r -> {
if (!overFlag.get()) {
String[] arr = collectRecord(r);
if (limitBytes > 0) {
for (int i = 0; i < arr.length; i++) {
tmpBytes.addAndGet(arr[i].getBytes().length);
if (overFlag.get() || tmpBytes.get() > limitBytes) {
overFlag.set(true);
arr[i] = "";
}
}
recordList.add(arr);
} else {
recordList.add(arr);
}
}
});
return new Pair<>(metaData, recordList);
}

public String[] collectRecord(Record record) {
if (record instanceof TableRecord) {
TableRecord tableRecord = (TableRecord) record;
if (limitColumnLength > 0) {
return Arrays.stream(tableRecord.row)
.map(
obj -> {
String col = DataType.valueToString(obj);
if (col.length() > limitColumnLength) {
return col.substring(0, limitColumnLength);
} else {
return col;
}
})
.toArray(String[]::new);
}
return Arrays.stream(tableRecord.row).map(DataType::valueToString).toArray(String[]::new);
} else if (record instanceof LineRecord) {
LineRecord lineRecord = (LineRecord) record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public class WorkSpaceConfiguration {
public static final CommonVars<Boolean> ENABLE_USER_GROUP =
CommonVars$.MODULE$.apply("linkis.os.user.group.enabled", true);

// default 63M
public static final CommonVars<Long> FILESYSTEM_LIMIT_BYTES =
CommonVars$.MODULE$.apply("linkis.filesystem.limit.bytes", 66060288L);

public static final CommonVars<Integer> FILESYSTEM_LIMIT_COLUMN_LENGTH =
CommonVars$.MODULE$.apply("linkis.filesystem.limit.column.length", 2000);

public static final ExecutorService executorService =
new ThreadPoolExecutor(
FILESYSTEM_FS_THREAD_NUM.getValue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,9 @@ public Message openFile(
@RequestParam(value = "path", required = false) String path,
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "pageSize", defaultValue = "5000") Integer pageSize,
@RequestParam(value = "charset", defaultValue = "utf-8") String charset)
@RequestParam(value = "charset", defaultValue = "utf-8") String charset,
@RequestParam(value = "limitBytes", defaultValue = "0") Long limitBytes,
@RequestParam(value = "limitColumnLength", defaultValue = "0") Integer limitColumnLength)
throws IOException, WorkSpaceException {

Message message = Message.ok();
Expand All @@ -583,6 +585,14 @@ public Message openFile(
if (FileSource.isResultSet(fsPath.getPath())) {
fileSource = fileSource.page(page, pageSize);
}
if (limitBytes > 0) {
fileSource = fileSource.limitBytes(Math.min(limitBytes, FILESYSTEM_LIMIT_BYTES.getValue()));
}
if (limitColumnLength > 0) {
fileSource =
fileSource.limitColumnLength(
Math.min(limitColumnLength, FILESYSTEM_LIMIT_COLUMN_LENGTH.getValue()));
}
Pair<Object, List<String[]>> result = fileSource.collect()[0];
IOUtils.closeQuietly(fileSource);
message.data("metadata", result.getFirst()).data("fileContent", result.getSecond());
Expand Down

0 comments on commit 64cdd4d

Please sign in to comment.