Skip to content

Commit

Permalink
feat(core): show columns (#351)
Browse files Browse the repository at this point in the history
对show columns的优化改造,主要包括以下几个方面的更新——使得Show Columns基本上匹配IGinx的外特性期望:

功能增强:增强了对接层接口的表达能力,getColumns函数语义更清晰。
性能提升:将show columns后的正则下推到对接层实现,减少读取的列的数量和内存占用。
错误修复:
修复了原有show columns中无法仅显示满足dataPrefix的列,而显示全部dummy列。
对接层修复了对列isDummy属性的判断。
修复了show columns无法查询前缀带有schemaPrefix的列的bug。
各对接层show columns详细实现
对接层	非dummy	dummy
Mongodb	后过滤(Mongodb无法有选择地列出集合名称)	直接查询
Redis	查询+后过滤(后过滤用于过滤tagkv)	直接查询
Parquet	后过滤(列名直接保存在内存中,但是没有建立索引)	后过滤(列名需要从Parquet文件中读取,无法有选择地读取)
FileSystem	list file实现	直接查询
IoTDB12	直接查询	直接查询
InfluxDB	查询+后过滤(后过滤用于过滤tagkv)	直接查询
直接查询:通过查询直接获取所需的列,不需要再次过滤,不会读取多余的列
后过滤:先获取所有列,然后在对接层代码中过滤
  • Loading branch information
RemHero authored Aug 8, 2024
1 parent d805bb2 commit 3c4919d
Show file tree
Hide file tree
Showing 41 changed files with 1,631 additions and 277 deletions.
1 change: 1 addition & 0 deletions .github/actions/service/postgresql/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ runs:
echo "port = ${port}" >> "${PGCONF}"
echo "unix_socket_directories = ''" >> "${PGCONF}"
echo "fsync = off" >> "${PGCONF}"
echo "max_connections = 200" >> "${PGCONF}"
pg_ctl start --pgdata="${PGDATA}"
done
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.SetTransform;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval;
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
import java.util.Set;

public interface IStorage {
/** 对非叠加分片查询数据 */
Expand Down Expand Up @@ -60,7 +66,7 @@ default TaskExecuteResult executeProjectWithSetTransform(
TaskExecuteResult executeInsert(Insert insert, DataArea dataArea);

/** 获取所有列信息 */
List<Column> getColumns() throws PhysicalException;
List<Column> getColumns(Set<String> patterns, TagFilter tagFilter) throws PhysicalException;

/** 获取指定前缀的数据边界 */
Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String prefix) throws PhysicalException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue;
import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils;
import cn.edu.tsinghua.iginx.engine.physical.task.GlobalPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Operator;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.SetTransform;
import cn.edu.tsinghua.iginx.engine.shared.operator.ShowColumns;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
Expand All @@ -57,7 +61,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -365,64 +368,55 @@ public TaskExecuteResult executeGlobalTask(GlobalPhysicalTask task) {

public TaskExecuteResult executeShowColumns(ShowColumns showColumns) {
List<StorageEngineMeta> storageList = metaManager.getStorageEngineList();
Set<Column> columnSet = new HashSet<>();
TreeSet<Column> targetColumns = new TreeSet<>(Comparator.comparing(Column::getPhysicalPath));
for (StorageEngineMeta storage : storageList) {
long id = storage.getId();
Pair<IStorage, ThreadPoolExecutor> pair = storageManager.getStorage(id);
if (pair == null) {
continue;
}
try {
List<Column> columnList = pair.k.getColumns();
// fix the schemaPrefix
Set<String> patterns = showColumns.getPathRegexSet();
String schemaPrefix = storage.getSchemaPrefix();
// schemaPrefix是在IGinX中定义的,数据源的路径中没有该前缀,因此需要剪掉patterns中前缀是schemaPrefix的部分
patterns = StringUtils.cutSchemaPrefix(schemaPrefix, patterns);
if (patterns.isEmpty()) {
continue;
}
// 求patterns与dataPrefix的交集
patterns = StringUtils.intersectDataPrefix(storage.getDataPrefix(), patterns);
if (patterns.isEmpty()) {
continue;
}
if (patterns.contains("*")) {
patterns = Collections.emptySet();
}
List<Column> columnList = pair.k.getColumns(patterns, showColumns.getTagFilter());

// 列名前加上schemaPrefix
if (schemaPrefix != null) {
for (Column column : columnList) {
if (column.isDummy()) {
column.setPath(schemaPrefix + "." + column.getPath());
}
}
columnList.forEach(
column -> {
column.setPath(schemaPrefix + "." + column.getPath());
targetColumns.add(column);
});
} else {
targetColumns.addAll(columnList);
}
columnSet.addAll(columnList);
} catch (PhysicalException e) {
return new TaskExecuteResult(e);
}
}

Set<String> pathRegexSet = showColumns.getPathRegexSet();
TagFilter tagFilter = showColumns.getTagFilter();

TreeSet<Column> tsSetAfterFilter = new TreeSet<>(Comparator.comparing(Column::getPhysicalPath));
for (Column column : columnSet) {
boolean isTarget = true;
if (!pathRegexSet.isEmpty()) {
isTarget = false;
for (String pathRegex : pathRegexSet) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) {
isTarget = true;
break;
}
}
}
if (tagFilter != null) {
if (!TagKVUtils.match(column.getTags(), tagFilter)) {
isTarget = false;
}
}
if (isTarget) {
tsSetAfterFilter.add(column);
}
}

int limit = showColumns.getLimit();
int offset = showColumns.getOffset();
if (limit == Integer.MAX_VALUE && offset == 0) {
return new TaskExecuteResult(Column.toRowStream(tsSetAfterFilter));
return new TaskExecuteResult(Column.toRowStream(targetColumns));
} else {
// only need part of data.
List<Column> tsList = new ArrayList<>();
int cur = 0, size = tsSetAfterFilter.size();
for (Iterator<Column> iter = tsSetAfterFilter.iterator(); iter.hasNext(); cur++) {
int cur = 0, size = targetColumns.size();
for (Iterator<Column> iter = targetColumns.iterator(); iter.hasNext(); cur++) {
if (cur >= size || cur - offset >= limit) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.AndFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Op;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.filesystem.exec.Executor;
import cn.edu.tsinghua.iginx.filesystem.exec.LocalExecutor;
import cn.edu.tsinghua.iginx.filesystem.exec.RemoteExecutor;
Expand All @@ -42,6 +46,7 @@
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -148,8 +153,9 @@ public TaskExecuteResult executeDelete(Delete delete, DataArea dataArea) {
}

@Override
public List<Column> getColumns() throws PhysicalException {
return executor.getColumnsOfStorageUnit(WILDCARD);
public List<Column> getColumns(Set<String> patterns, TagFilter tagFilter)
throws PhysicalException {
return executor.getColumnsOfStorageUnit(WILDCARD, patterns, tagFilter);
}

@Override
Expand All @@ -159,7 +165,7 @@ public Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String prefix)
}

@Override
public synchronized void release() throws PhysicalException {
public synchronized void release() {
executor.close();
if (thread != null) {
thread.interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
import java.util.Set;

public interface Executor {

Expand All @@ -44,7 +45,8 @@ TaskExecuteResult executeProjectTask(
TaskExecuteResult executeDeleteTask(
List<String> paths, List<KeyRange> keyRanges, TagFilter tagFilter, String storageUnit);

List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException;
List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> patterns, TagFilter tagFilter) throws PhysicalException;

Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String dataPrefix)
throws PhysicalException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import cn.edu.tsinghua.iginx.filesystem.tools.MemoryPool;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.utils.StringUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
Expand Down Expand Up @@ -391,8 +392,16 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
.collect(Collectors.toList());
}

public List<File> getAllFiles(File dir, boolean containsEmptyDir) {
public List<File> getTargetFiles(
File dir, String root, String storageUnit, List<String> patterns, boolean containsEmptyDir) {
dir = FilePathUtils.normalize(dir, FileAccessType.READ);
List<String> pathRegexList = new ArrayList<>(patterns.size());
String suffix = storageUnit == null ? "" : "\\d+"; // 末尾匹配数字
patterns.forEach(
p -> {
String pathPattern = FilePathUtils.toFilePath(root, storageUnit, p);
pathRegexList.add(StringUtils.reformatPath(pathPattern) + suffix);
});

List<File> res = new ArrayList<>();
try {
Expand All @@ -405,12 +414,18 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
if (containsEmptyDir && isDirEmpty(dir)) {
res.add(dir.toFile());
}
try (DirectoryStream<Path> stream =
Files.newDirectoryStream(
dir,
path ->
path.toFile().isFile() && FilePathUtils.matches(path, pathRegexList))) {
stream.forEach(path -> res.add(path.toFile()));
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
res.add(file.toFile());
return FileVisitResult.CONTINUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream.EmptyRowStream;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.KeyRange;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -67,9 +69,9 @@ public class LocalExecutor implements Executor {

private String prefix;

private boolean hasData;
private final boolean hasData;

private FileSystemManager fileSystemManager;
private final FileSystemManager fileSystemManager;

public LocalExecutor(boolean isReadOnly, boolean hasData, Map<String, String> extraParams) {
String dir = extraParams.get(Constant.INIT_INFO_DIR);
Expand Down Expand Up @@ -327,43 +329,48 @@ public TaskExecuteResult executeDeleteTask(
}

@Override
public List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException {
public List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> patterns, TagFilter tagFilter) throws PhysicalException {
List<Column> columns = new ArrayList<>();
List<String> patternList = new ArrayList<>(patterns);
if (patternList.isEmpty()) {
patternList.add("*");
}
if (root != null) {
File directory = new File(FilePathUtils.toIginxPath(root, storageUnit, null));
for (File file : fileSystemManager.getAllFiles(directory, false)) {
for (File file :
fileSystemManager.getTargetFiles(directory, root, storageUnit, patternList, false)) {
FileMeta meta = fileSystemManager.getFileMeta(file);
if (meta == null) {
throw new PhysicalException(
String.format(
"encounter error when getting columns of storage unit because file meta %s is null",
file.getAbsolutePath()));
}
columns.add(
new Column(
FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit),
meta.getDataType(),
meta.getTags(),
false));
// get columns by tag filter
if (tagFilter != null && !TagKVUtils.match(meta.getTags(), tagFilter)) {
continue;
}
String columnPath =
FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit);
columns.add(new Column(columnPath, meta.getDataType(), meta.getTags(), false));
}
}
if (hasData && dummyRoot != null) {
for (File file : fileSystemManager.getAllFiles(new File(realDummyRoot), true)) {
columns.add(
new Column(
FilePathUtils.convertAbsolutePathToPath(
dummyRoot, file.getAbsolutePath(), storageUnit),
DataType.BINARY,
null,
true));
// get columns from dummy storage unit
if (hasData && dummyRoot != null && tagFilter == null) {
for (File file :
fileSystemManager.getTargetFiles(
new File(realDummyRoot), dummyRoot, null, patternList, true)) {
String dummyPath =
FilePathUtils.convertAbsolutePathToPath(dummyRoot, file.getAbsolutePath(), storageUnit);
columns.add(new Column(dummyPath, DataType.BINARY, null, true));
}
}
return columns;
}

@Override
public Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String dataPrefix)
throws PhysicalException {
public Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String dataPrefix) {
KeyInterval keyInterval = KeyInterval.getDefaultKeyInterval();
ColumnsInterval columnsInterval;

Expand Down
Loading

0 comments on commit 3c4919d

Please sign in to comment.