
feat(core): show columns #351

Merged
Changes from 97 commits
115 commits
a4ae3d1
fix show columns
RemHero Jun 4, 2024
a406f2f
test
RemHero Jun 4, 2024
918a760
test
RemHero Jun 4, 2024
e083c74
test
RemHero Jun 5, 2024
4a4c798
fix pom
RemHero Jun 5, 2024
621cf38
add interface
RemHero Jun 6, 2024
f065562
fix
RemHero Jun 6, 2024
e16e2ae
fix
RemHero Jun 6, 2024
aa69181
format
RemHero Jun 6, 2024
4c73402
add test
RemHero Jun 7, 2024
7acf6a8
remove test
RemHero Jun 7, 2024
aa3b103
feat(redis): show columns with pattern and tagFilter
aqni Jun 7, 2024
4c08d43
feat(mongodb): show columns with pattern and tagFilter
aqni Jun 7, 2024
2dc19cb
feat(parquet): show columns with pattern and tagFilter
aqni Jun 7, 2024
1d23dfc
Merge pull request #152 from aqni/feat/redis/show_columns_prefix_filt…
RemHero Jun 7, 2024
9360aae
Merge pull request #153 from aqni/feat/mongodb/show_columns_prefix_fi…
RemHero Jun 7, 2024
23ae9f6
Merge pull request #154 from aqni/feat/parquet/show_columns_prefix_fi…
RemHero Jun 7, 2024
f0b7fc5
influxdb get columns by pattern and tag
shinyano Jun 8, 2024
fe7a188
Merge pull request #156 from shinyano/fix/get-columns
RemHero Jun 8, 2024
0a0d7b5
relational get columns by pattern and tag
shinyano Jun 8, 2024
7c5e8c9
Merge pull request #157 from shinyano/fix/get-columns
RemHero Jun 8, 2024
0e368fb
fix influxdb & relational
shinyano Jun 8, 2024
d104944
Merge pull request #158 from shinyano/fix/get-columns
RemHero Jun 10, 2024
4836912
add test
RemHero Jun 10, 2024
9122833
fix mongo
aqni Jun 8, 2024
4cad9c4
fix mongodb
aqni Jun 11, 2024
cae24f7
fix
aqni Jun 8, 2024
48ca3fd
test parquet
aqni Jun 11, 2024
366cd86
fix parquet
aqni Jun 11, 2024
54562b3
Revert "test parquet"
aqni Jun 11, 2024
2eb9735
Merge pull request #160 from aqni/feat/mongodb/show_columns_prefix_fi…
RemHero Jun 11, 2024
fb01b6b
Merge pull request #161 from aqni/feat/parquet/show_columns_prefix_fi…
RemHero Jun 11, 2024
55d84e2
format
RemHero Jun 11, 2024
f23bd5d
fix parquet
RemHero Jun 11, 2024
e258635
fix parquet
RemHero Jun 11, 2024
39e11a7
add test
RemHero Jun 11, 2024
155f95b
test
RemHero Jun 11, 2024
039a7b4
test
RemHero Jun 11, 2024
c3928c3
fix parquet
aqni Jun 12, 2024
5392117
Merge pull request #163 from aqni/feat/parquet/show_columns_prefix_fi…
RemHero Jun 13, 2024
4d31180
format
RemHero Jun 13, 2024
9f6601c
fix
RemHero Jun 14, 2024
e5d6ae4
Merge branch 'main' into feat_show_columns_prefix_filter_push_down_test
RemHero Jun 14, 2024
45b3ca5
fix the code QL
RemHero Jun 14, 2024
7771fc2
foramt
RemHero Jun 14, 2024
e0e84b6
rename
RemHero Jun 17, 2024
389382d
better IoTDB
RemHero Jun 17, 2024
ca90fc2
feat(influxdb&relational): show columns by pattern (#164)
shinyano Jun 18, 2024
38a92ba
fix IoTDB
RemHero Jun 18, 2024
e443ec5
fix IoTDB
RemHero Jun 19, 2024
bdcf1fb
remove unused code
shinyano Jun 28, 2024
69c13ba
Merge pull request #165 from shinyano/refactor/get_columns
SolomonAnn Jun 28, 2024
c2716e1
Merge branch 'main' into pr/351
jzl18thu Jul 12, 2024
f46f96b
Merge branch 'main' into pr/351
jzl18thu Jul 14, 2024
cab29ac
format && optimize import
jzl18thu Jul 14, 2024
b4bb47a
optimize import
jzl18thu Jul 14, 2024
0a339c2
rename patterns
jzl18thu Jul 16, 2024
1f02d04
add note
jzl18thu Jul 16, 2024
5bdb244
extract function
jzl18thu Jul 16, 2024
a8cd688
Merge branch 'main' into pr/351
jzl18thu Jul 16, 2024
4327816
just for test
jzl18thu Jul 16, 2024
5c90d76
test
jzl18thu Jul 16, 2024
93708e7
test
jzl18thu Jul 16, 2024
4782475
test
jzl18thu Jul 16, 2024
4b76934
fix schema prefix
jzl18thu Jul 18, 2024
52d66e6
fix
jzl18thu Jul 18, 2024
ddc1e6b
open CompactIT test
jzl18thu Jul 18, 2024
148ff04
fix
jzl18thu Jul 18, 2024
62fe52b
add test
jzl18thu Jul 18, 2024
57b96ac
fix test
jzl18thu Jul 18, 2024
042f7dc
adjust getColumns
jzl18thu Jul 18, 2024
3b712a2
Revert "adjust getColumns"
jzl18thu Jul 18, 2024
7c399b9
add unit test
jzl18thu Jul 18, 2024
379c5d2
fix
jzl18thu Jul 18, 2024
fa97124
format
jzl18thu Jul 18, 2024
8c1cc2b
add debug log
jzl18thu Jul 19, 2024
aea60ba
try to fix close connection
jzl18thu Jul 19, 2024
b4bc966
add log
jzl18thu Jul 19, 2024
bf88e1d
revert
jzl18thu Jul 19, 2024
786df5b
test
jzl18thu Jul 19, 2024
c3a3ae6
add test
jzl18thu Jul 19, 2024
868cd5a
test
jzl18thu Jul 19, 2024
ee0ed29
fix test
jzl18thu Jul 19, 2024
4ccd732
test
jzl18thu Jul 19, 2024
e2a9747
fix
jzl18thu Jul 20, 2024
2378e8b
fix
jzl18thu Jul 21, 2024
3e20c1c
Merge branch 'main' into pr/351
jzl18thu Jul 21, 2024
c63034e
open other test
jzl18thu Jul 21, 2024
eadd521
Merge branch 'main' into pr/351
jzl18thu Jul 23, 2024
0a3513a
try to fix
jzl18thu Jul 23, 2024
7c77b9c
open other tests
jzl18thu Jul 23, 2024
acaf379
test
jzl18thu Jul 24, 2024
20b0beb
debug
jzl18thu Jul 24, 2024
001afbd
debug
jzl18thu Jul 24, 2024
aebbdbc
debug
jzl18thu Jul 24, 2024
fd8af16
revert
jzl18thu Jul 24, 2024
32077ac
Update StoragePhysicalTaskExecutor.java
SolomonAnn Jul 26, 2024
23e3696
feat(filesystem): push down patterns and tagFilter
jzl18thu Jul 28, 2024
1c2f742
Merge branch 'main' into pr/351
jzl18thu Jul 28, 2024
18826a0
fix
jzl18thu Jul 28, 2024
7832a82
fix has_data
jzl18thu Aug 5, 2024
1fa2c8e
Revert "fix has_data"
jzl18thu Aug 5, 2024
625128f
Merge branch 'main' into pr/351
jzl18thu Aug 6, 2024
de20884
push down data prefix
jzl18thu Aug 6, 2024
355fa6e
debug
jzl18thu Aug 6, 2024
26bf3e4
push down patterns
jzl18thu Aug 6, 2024
065e926
fix schemaprefix
jzl18thu Aug 6, 2024
a09c673
format
jzl18thu Aug 6, 2024
6463d19
fix
jzl18thu Aug 6, 2024
6fe3209
fix
jzl18thu Aug 6, 2024
996ba6a
fix
jzl18thu Aug 6, 2024
74d5dbe
fix
jzl18thu Aug 6, 2024
6556885
fix
jzl18thu Aug 6, 2024
e5335e4
fix
jzl18thu Aug 6, 2024
8c42d91
open other tests
jzl18thu Aug 6, 2024
1 change: 1 addition & 0 deletions .github/actions/service/postgresql/action.yml
@@ -63,6 +63,7 @@ runs:
echo "port = ${port}" >> "${PGCONF}"
echo "unix_socket_directories = ''" >> "${PGCONF}"
echo "fsync = off" >> "${PGCONF}"
echo "max_connections = 200" >> "${PGCONF}"

pg_ctl start --pgdata="${PGDATA}"
done
@@ -21,11 +21,17 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.SetTransform;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval;
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
import java.util.Set;

public interface IStorage {
/** Query data on non-overlapping fragments */
@@ -60,7 +66,7 @@ default TaskExecuteResult executeProjectWithSetTransform(
TaskExecuteResult executeInsert(Insert insert, DataArea dataArea);

/** Get information about all columns */
List<Column> getColumns() throws PhysicalException;
List<Column> getColumns(Set<String> patterns, TagFilter tagFilter) throws PhysicalException;

/** Get the data boundary for the given prefix */
Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String prefix) throws PhysicalException;
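The `IStorage` change above pushes `patterns` and `tagFilter` down into each storage engine's `getColumns`, so engines return only matching columns instead of their full catalog. A minimal, hypothetical sketch of the pattern side of that contract (the class and its in-memory catalog are invented for illustration; real engines query their own metadata):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical in-memory engine: filters its column paths by the
// pushed-down patterns instead of returning everything to IGinX.
class InMemoryColumnCatalog {
  private final List<String> paths;

  InMemoryColumnCatalog(List<String> paths) {
    this.paths = paths;
  }

  // IGinX-style patterns use '.' as separator and '*' as wildcard,
  // e.g. "a.*" becomes the regex "a\..*".
  static String toRegex(String pattern) {
    return pattern.replace(".", "\\.").replace("*", ".*");
  }

  List<String> getColumns(Set<String> patterns) {
    if (patterns == null || patterns.isEmpty()) {
      return new ArrayList<>(paths); // empty pattern set means "match all"
    }
    List<String> result = new ArrayList<>();
    for (String path : paths) {
      for (String p : patterns) {
        if (Pattern.matches(toRegex(p), path)) {
          result.add(path);
          break;
        }
      }
    }
    return result;
  }
}
```

The payoff is that a pattern such as `a.*` never leaves the engine as a full-catalog scan; only the two matching paths cross the wire.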
@@ -32,13 +32,18 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue;
import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils;
import cn.edu.tsinghua.iginx.engine.physical.task.GlobalPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Operator;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.SetTransform;
import cn.edu.tsinghua.iginx.engine.shared.operator.ShowColumns;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
@@ -353,64 +358,76 @@ public TaskExecuteResult executeGlobalTask(GlobalPhysicalTask task) {

public TaskExecuteResult executeShowColumns(ShowColumns showColumns) {
List<StorageEngineMeta> storageList = metaManager.getStorageEngineList();
Set<Column> columnSet = new HashSet<>();
TreeSet<Column> columnSetAfterFilter =
new TreeSet<>(Comparator.comparing(Column::getPhysicalPath));
for (StorageEngineMeta storage : storageList) {
long id = storage.getId();
Pair<IStorage, ThreadPoolExecutor> pair = storageManager.getStorage(id);
if (pair == null) {
continue;
}
try {
List<Column> columnList = pair.k.getColumns();
// fix the schemaPrefix
Set<String> patterns = showColumns.getPathRegexSet();
TagFilter tagFilter = showColumns.getTagFilter();

String schemaPrefix = storage.getSchemaPrefix();
if (schemaPrefix != null) {
for (Column column : columnList) {
if (column.isDummy()) {
column.setPath(schemaPrefix + "." + column.getPath());
// Reason for not pushing down dataPrefix: if a non-dummy engine also holds raw data, that data would show up in show columns and mislead users
String dataPrefixRegex =
storage.getDataPrefix() == null
? null
: StringUtils.reformatPath(storage.getDataPrefix() + ".*");

// schemaPrefix is defined inside IGinX; paths in the data source never carry this prefix, so the schemaPrefix part must be cut from the patterns
Set<String> patternsCutSchemaPrefix = StringUtils.cutSchemaPrefix(schemaPrefix, patterns);
if (patternsCutSchemaPrefix.isEmpty()) {
continue;
}
if (patternsCutSchemaPrefix.contains("*")) {
patternsCutSchemaPrefix = Collections.emptySet();
}
List<Column> columnList = pair.k.getColumns(patternsCutSchemaPrefix, tagFilter);

if (tagFilter != null) {
columnSetAfterFilter.addAll(columnList);
continue;
}
for (Column column : columnList) {
if (!column.isDummy()) {
columnSetAfterFilter.add(column);
continue;
}
if (dataPrefixRegex == null || Pattern.matches(dataPrefixRegex, column.getPath())) {
if (schemaPrefix == null) {
columnSetAfterFilter.add(column);
continue;
}
column.setPath(schemaPrefix + "." + column.getPath());
boolean isMatch = patterns.isEmpty();
for (String pathRegex : patterns) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) {
isMatch = true;
break;
}
}
if (isMatch) {
columnSetAfterFilter.add(column);
}
}
}
columnSet.addAll(columnList);
} catch (PhysicalException e) {
return new TaskExecuteResult(e);
}
}

Set<String> pathRegexSet = showColumns.getPathRegexSet();
TagFilter tagFilter = showColumns.getTagFilter();

TreeSet<Column> tsSetAfterFilter = new TreeSet<>(Comparator.comparing(Column::getPhysicalPath));
for (Column column : columnSet) {
boolean isTarget = true;
if (!pathRegexSet.isEmpty()) {
isTarget = false;
for (String pathRegex : pathRegexSet) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) {
isTarget = true;
break;
}
}
}
if (tagFilter != null) {
if (!TagKVUtils.match(column.getTags(), tagFilter)) {
isTarget = false;
}
}
if (isTarget) {
tsSetAfterFilter.add(column);
}
}

int limit = showColumns.getLimit();
int offset = showColumns.getOffset();
if (limit == Integer.MAX_VALUE && offset == 0) {
return new TaskExecuteResult(Column.toRowStream(tsSetAfterFilter));
return new TaskExecuteResult(Column.toRowStream(columnSetAfterFilter));
} else {
// only need part of data.
List<Column> tsList = new ArrayList<>();
int cur = 0, size = tsSetAfterFilter.size();
for (Iterator<Column> iter = tsSetAfterFilter.iterator(); iter.hasNext(); cur++) {
int cur = 0, size = columnSetAfterFilter.size();
for (Iterator<Column> iter = columnSetAfterFilter.iterator(); iter.hasNext(); cur++) {
if (cur >= size || cur - offset >= limit) {
break;
}
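The executor above cuts `schemaPrefix` off the user's patterns before pushing them down, because data-source paths never contain that prefix. A simplified, hypothetical version of `StringUtils.cutSchemaPrefix` (the real IGinX utility handles more wildcard edge cases than this sketch):

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch: keep only patterns this source can match, with the
// schemaPrefix removed; "*" subsumes every path under the prefix.
class SchemaPrefixCutter {
  static Set<String> cutSchemaPrefix(String schemaPrefix, Set<String> patterns) {
    if (schemaPrefix == null) {
      return new HashSet<>(patterns);
    }
    Set<String> cut = new HashSet<>();
    for (String pattern : patterns) {
      if (pattern.equals("*")) {
        cut.add("*"); // wildcard matches columns under any prefix
      } else if (pattern.startsWith(schemaPrefix + ".")) {
        cut.add(pattern.substring(schemaPrefix.length() + 1));
      }
      // other patterns cannot match columns of this source and are dropped;
      // an empty result lets the caller skip the engine entirely
    }
    return cut;
  }
}
```

With `schemaPrefix = "p1"`, a query for `p1.a.*` reaches the engine as `a.*`, while `x.*` is dropped, matching the `patternsCutSchemaPrefix.isEmpty()` skip in the executor.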
@@ -26,11 +26,15 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.AndFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Op;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.filesystem.exec.Executor;
import cn.edu.tsinghua.iginx.filesystem.exec.LocalExecutor;
import cn.edu.tsinghua.iginx.filesystem.exec.RemoteExecutor;
@@ -42,6 +46,7 @@
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,8 +153,9 @@ public TaskExecuteResult executeDelete(Delete delete, DataArea dataArea) {
}

@Override
public List<Column> getColumns() throws PhysicalException {
return executor.getColumnsOfStorageUnit(WILDCARD);
public List<Column> getColumns(Set<String> patterns, TagFilter tagFilter)
throws PhysicalException {
return executor.getColumnsOfStorageUnit(WILDCARD, patterns, tagFilter);
}

@Override
Expand All @@ -159,7 +165,7 @@ public Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String prefix)
}

@Override
public synchronized void release() throws PhysicalException {
public synchronized void release() {
executor.close();
if (thread != null) {
thread.interrupt();
@@ -29,6 +29,7 @@
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
import java.util.Set;

public interface Executor {

@@ -44,7 +45,8 @@ TaskExecuteResult executeProjectTask(
TaskExecuteResult executeDeleteTask(
List<String> paths, List<KeyRange> keyRanges, TagFilter tagFilter, String storageUnit);

List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException;
List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> patterns, TagFilter tagFilter) throws PhysicalException;

Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String dataPrefix)
throws PhysicalException;
@@ -25,6 +25,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream.EmptyRowStream;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.KeyRange;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
@@ -47,11 +48,13 @@
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.utils.StringUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -67,9 +70,9 @@ public class LocalExecutor implements Executor {

private String prefix;

private boolean hasData;
private final boolean hasData;

private FileSystemManager fileSystemManager;
private final FileSystemManager fileSystemManager;

public LocalExecutor(boolean isReadOnly, boolean hasData, Map<String, String> extraParams) {
String dir = extraParams.get(Constant.INIT_INFO_DIR);
@@ -327,43 +330,48 @@ public TaskExecuteResult executeDeleteTask(
}

@Override
public List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException {
public List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> patterns, TagFilter tagFilter) throws PhysicalException {
List<Column> columns = new ArrayList<>();
if (root != null) {
File directory = new File(FilePathUtils.toIginxPath(root, storageUnit, null));
for (File file : fileSystemManager.getAllFiles(directory, false)) {
[Review] Collaborator: The filesystem show columns can also be pushed down — convert the patterns into regex matches inside getAllFiles().
[Review] Collaborator: done
FileMeta meta = fileSystemManager.getFileMeta(file);
String columnPath =
FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit);
if (meta == null) {
throw new PhysicalException(
String.format(
"encounter error when getting columns of storage unit because file meta %s is null",
file.getAbsolutePath()));
}
columns.add(
new Column(
FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit),
meta.getDataType(),
meta.getTags(),
false));
// get columns by pattern
if (StringUtils.pathNotMatchPatterns(columnPath, patterns)) {
continue;
}
// get columns by tag filter
if (tagFilter != null && !TagKVUtils.match(meta.getTags(), tagFilter)) {
continue;
}
columns.add(new Column(columnPath, meta.getDataType(), meta.getTags(), false));
}
}
if (hasData && dummyRoot != null) {
// get columns from dummy storage unit
if (hasData && dummyRoot != null && tagFilter == null) {
for (File file : fileSystemManager.getAllFiles(new File(realDummyRoot), true)) {
columns.add(
new Column(
FilePathUtils.convertAbsolutePathToPath(
dummyRoot, file.getAbsolutePath(), storageUnit),
DataType.BINARY,
null,
true));
String dummyPath =
FilePathUtils.convertAbsolutePathToPath(dummyRoot, file.getAbsolutePath(), storageUnit);
if (StringUtils.pathNotMatchPatterns(dummyPath, patterns)) {
continue;
}
columns.add(new Column(dummyPath, DataType.BINARY, null, true));
}
}
return columns;
}
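`LocalExecutor` above applies two guards per file: the path must match at least one pattern, then the tags must pass the tag filter. A hypothetical, self-contained version of that two-stage check (tag filters are modeled here as required key=value pairs, which is much simpler than IGinX's real `TagFilter` tree):

```java
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Sketch of the per-file filtering in getColumnsOfStorageUnit: pattern
// match first, then tag match; dummy columns carry no tags.
class ColumnFilter {
  static boolean pathMatches(String path, Set<String> patterns) {
    if (patterns == null || patterns.isEmpty()) {
      return true; // no patterns means no restriction
    }
    for (String p : patterns) {
      String regex = p.replace(".", "\\.").replace("*", ".*");
      if (Pattern.matches(regex, path)) {
        return true;
      }
    }
    return false;
  }

  static boolean tagsMatch(Map<String, String> tags, Map<String, String> required) {
    if (required == null || required.isEmpty()) {
      return true; // no tag filter
    }
    if (tags == null) {
      return false; // untagged (dummy) columns fail any tag filter
    }
    return tags.entrySet().containsAll(required.entrySet());
  }
}
```

This mirrors why the dummy branch above runs only when `tagFilter == null`: dummy columns have no tags, so any tag filter would reject all of them.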

@Override
public Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String dataPrefix)
throws PhysicalException {
public Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String dataPrefix) {
KeyInterval keyInterval = KeyInterval.getDefaultKeyInterval();
ColumnsInterval columnsInterval;

@@ -31,10 +31,25 @@
import cn.edu.tsinghua.iginx.engine.shared.data.write.DataView;
import cn.edu.tsinghua.iginx.engine.shared.data.write.RawDataType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.AndTagFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.BasePreciseTagFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.BaseTagFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.OrTagFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.PreciseTagFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.filesystem.exception.FilesystemException;
import cn.edu.tsinghua.iginx.filesystem.thrift.*;
import cn.edu.tsinghua.iginx.filesystem.thrift.DeleteReq;
import cn.edu.tsinghua.iginx.filesystem.thrift.FSHeader;
import cn.edu.tsinghua.iginx.filesystem.thrift.FSKeyRange;
import cn.edu.tsinghua.iginx.filesystem.thrift.FSRawData;
import cn.edu.tsinghua.iginx.filesystem.thrift.FileSystemService.Client;
import cn.edu.tsinghua.iginx.filesystem.thrift.GetBoundaryOfStorageResp;
import cn.edu.tsinghua.iginx.filesystem.thrift.GetColumnsOfStorageUnitResp;
import cn.edu.tsinghua.iginx.filesystem.thrift.InsertReq;
import cn.edu.tsinghua.iginx.filesystem.thrift.ProjectReq;
import cn.edu.tsinghua.iginx.filesystem.thrift.ProjectResp;
import cn.edu.tsinghua.iginx.filesystem.thrift.RawTagFilter;
import cn.edu.tsinghua.iginx.filesystem.thrift.Status;
import cn.edu.tsinghua.iginx.filesystem.thrift.TagFilterType;
import cn.edu.tsinghua.iginx.filesystem.tools.FilterTransformer;
import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval;
@@ -50,6 +65,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -225,11 +241,13 @@ public TaskExecuteResult executeDeleteTask(
}

@Override
public List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException {
public List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> patterns, TagFilter tagFilter) throws PhysicalException {
try {
TTransport transport = thriftConnPool.borrowTransport();
Client client = new Client(new TBinaryProtocol(transport));
GetColumnsOfStorageUnitResp resp = client.getColumnsOfStorageUnit(storageUnit);
GetColumnsOfStorageUnitResp resp =
client.getColumnsOfStorageUnit(storageUnit, patterns, constructRawTagFilter(tagFilter));
thriftConnPool.returnTransport(transport);
List<Column> columns = new ArrayList<>();
resp.getPathList()
@@ -272,6 +290,9 @@ public void close() {

private RawTagFilter constructRawTagFilter(TagFilter tagFilter) {
RawTagFilter filter = null;
if (tagFilter == null) {
return null;
}
switch (tagFilter.getType()) {
case Base:
{