Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node parallelization): parallel without stats #319

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ statement
| SHOW jobStatus TRANSFORM JOB # showEligibleJobStatement
| REMOVE HISTORYDATASOURCE removedStorageEngine (COMMA removedStorageEngine)* # removeHistoryDataSourceStatement
| SET CONFIG configName = stringLiteral configValue = stringLiteral # setConfigStatement
| SHOW CONFIG (configName = stringLiteral)? # showConfigStatement
| SHOW CONFIG configName = stringLiteral # showConfigStatement
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

版本回退了吧

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

| SHOW SESSIONID # showSessionIDStatement
| COMPACT # compactStatement
| SHOW RULES # showRulesStatement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ private static void processSql(String sql) {
} catch (SessionException e) {
System.out.println(e.getMessage());
} catch (Exception e) {
e.printStackTrace();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些日志处理都按最新的来吧

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

解冲突后,参考曼薇的remote-test,针对功能写几个典型测试吧

System.out.println(
"Execute Error: encounter error(s) when executing sql statement, "
+ "see server log for more details.");
Expand Down Expand Up @@ -446,6 +447,7 @@ private static void processSqlWithStream(String sql) {
} catch (SessionException e) {
System.out.println(e.getMessage());
} catch (Exception e) {
e.printStackTrace();
System.out.println(
"Execute Error: encounter error(s) when executing sql statement, "
+ "see server log for more details.");
Expand Down
36 changes: 19 additions & 17 deletions conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ queryOptimizer=rbo
# 优化器规则
ruleBasedOptimizer=NotFilterRemoveRule=on,FragmentPruningByFilterRule=on,ColumnPruningRule=on,FragmentPruningByPatternRule=on,ConstantPropagationRule=on,FilterConstantFoldingRule=on,RowTransformConstantFoldingRule=on

# 是否开启并行化算子
enableParallelOperator=true
# ParallelFilter触发行数
parallelFilterThreshold=10000
# ParallelGroupBy触发行数
Expand All @@ -62,8 +64,8 @@ parallelApplyFuncGroupsThreshold=1000
# ParallelGroupBy线程池大小
parallelGroupByPoolSize=5
# ParallelGroupBy线程池数量
parallelGroupByPoolNum=5
# ParallelGroupBy线程池数量
parallelGroupByPoolNum=10
# 流式ParallelGroupBy线程池数量
streamParallelGroupByWorkerNum=5

# 约束
Expand Down Expand Up @@ -91,8 +93,8 @@ minThriftWorkerThreadNum = 20
# thrift线程池最大线程数量
maxThriftWrokerThreadNum = 2147483647

# 当前是否是UT测试环境
utTestEnv = false
# 分布式查询的触发分片数
distributedQueryTriggerThreshold = 100

####################
### Migration 相关配置
Expand Down Expand Up @@ -148,10 +150,10 @@ zookeeperConnectionString=127.0.0.1:2181
#etcdEndpoints=http://localhost:2379

# 是否开启元数据内存管理
enableMetaCacheControl=false
enable_meta_cache_control=false

# 分片缓存最大内存限制,单位为 KB,默认 128 MB
fragmentCacheThreshold=131072
fragment_cache_threshold=131072

##########################
### 执行层配置
Expand All @@ -165,15 +167,15 @@ useStreamExecutor=false
### 内存控制
##########################

enableMemoryControl=false
enable_memory_control=false

systemResourceMetrics=default
system_resource_metrics=default

heapMemoryThreshold=0.9
heap_memory_threshold=0.9

systemMemoryThreshold=0.9
system_memory_threshold=0.9

systemCpuThreshold=0.9
system_cpu_threshold=0.9

####################
### REST 服务配置
Expand Down Expand Up @@ -218,17 +220,17 @@ transformMaxRetryTimes=3
### MQTT 配置
####################

enableMqtt=false
enable_mqtt=false

mqttHost=0.0.0.0
mqtt_host=0.0.0.0

mqttPort=1883
mqtt_port=1883

mqttHandlerPoolSize=1
mqtt_handler_pool_size=1

mqttPayloadFormatter=cn.edu.tsinghua.iginx.mqtt.JsonPayloadFormatter
mqtt_payload_formatter=cn.edu.tsinghua.iginx.mqtt.JsonPayloadFormatter

mqttMaxMessageSize=1048576
mqtt_max_message_size=1048576

##########################
### SimplePolicy 策略配置
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import cn.edu.tsinghua.iginx.conf.Constants;
import cn.edu.tsinghua.iginx.engine.ContextBuilder;
import cn.edu.tsinghua.iginx.engine.StatementExecutor;
import cn.edu.tsinghua.iginx.engine.distributedquery.worker.SubPlanExecutor;
import cn.edu.tsinghua.iginx.engine.logical.optimizer.rules.RuleCollection;
import cn.edu.tsinghua.iginx.engine.physical.PhysicalEngineImpl;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
Expand Down Expand Up @@ -563,7 +564,18 @@ public ExecuteSqlResp executeSql(ExecuteSqlReq req) {
StatementExecutor executor = StatementExecutor.getInstance();
RequestContext ctx = contextBuilder.build(req);
executor.execute(ctx);
return ctx.getResult().getExecuteSqlResp();
LOGGER.info("total cost time: " + (ctx.getEndTime() - ctx.getStartTime()));
ExecuteSqlResp resp = ctx.getResult().getExecuteSqlResp();
resp.setCostTime(ctx.getEndTime() - ctx.getStartTime());
return resp;
}

@Override
public ExecuteSubPlanResp executeSubPlan(ExecuteSubPlanReq req) {
SubPlanExecutor executor = SubPlanExecutor.getInstance();
RequestContext ctx = contextBuilder.build(req);
executor.execute(ctx);
return ctx.getResult().getExecuteSubPlanResp();
}

@Override
Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ public class Config {
private String ruleBasedOptimizer =
"NotFilterRemoveRule=on,FragmentPruningByFilterRule=on,ColumnPruningRule=on,FragmentPruningByPatternRule=on";

private int distributedQueryTriggerThreshold = 3;

//////////////

public static final String tagNameAnnotation = TagKVUtils.tagNameAnnotation;
Expand All @@ -195,6 +197,8 @@ public class Config {

/////////////

private boolean enableParallelOperator = true;

private int parallelFilterThreshold = 10000;

private int parallelGroupByRowsThreshold = 10000;
Expand Down Expand Up @@ -825,6 +829,14 @@ public void setMaxThriftWrokerThreadNum(int maxThriftWrokerThreadNum) {
this.maxThriftWrokerThreadNum = maxThriftWrokerThreadNum;
}

public boolean isEnableParallelOperator() {
return enableParallelOperator;
}

public void setEnableParallelOperator(boolean enableParallelOperator) {
this.enableParallelOperator = enableParallelOperator;
}

public int getParallelFilterThreshold() {
return parallelFilterThreshold;
}
Expand Down Expand Up @@ -896,4 +908,12 @@ public String getRuleBasedOptimizer() {
public void setRuleBasedOptimizer(String ruleBasedOptimizer) {
this.ruleBasedOptimizer = ruleBasedOptimizer;
}

public int getDistributedQueryTriggerThreshold() {
return distributedQueryTriggerThreshold;
}

public void setDistributedQueryTriggerThreshold(int distributedQueryTriggerThreshold) {
this.distributedQueryTriggerThreshold = distributedQueryTriggerThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ private void loadPropsFromFile() {
Integer.parseInt(properties.getProperty("minThriftWorkerThreadNum", "20")));
config.setMaxThriftWrokerThreadNum(
Integer.parseInt(properties.getProperty("maxThriftWorkerThreadNum", "2147483647")));
config.setEnableParallelOperator(
Boolean.parseBoolean(properties.getProperty("enableParallelOperator", "true")));
config.setParallelFilterThreshold(
Integer.parseInt(properties.getProperty("parallelFilterThreshold", "10000")));
config.setParallelGroupByRowsThreshold(
Expand All @@ -234,6 +236,9 @@ private void loadPropsFromFile() {
properties.getProperty(
"ruleBasedOptimizer",
"NotFilterRemoveRule=on,FragmentPruningByFilterRule=on,ColumnPruningRule=on,FragmentPruningByPatternRule=on"));
properties.getProperty("ruleBasedOptimizer", "RemoveNotRule=on,FilterFragmentRule=on");
config.setDistributedQueryTriggerThreshold(
Integer.parseInt(properties.getProperty("distributedQueryTriggerThreshold", "3")));
} catch (IOException e) {
config.setUTTestEnv(true);
config.setNeedInitBasicUDFFunctions(false);
Expand Down Expand Up @@ -341,6 +346,8 @@ private void loadPropsFromEnv() {
EnvUtils.loadEnv("historicalPrefixList", config.getHistoricalPrefixList()));
config.setExpectedStorageUnitNum(
EnvUtils.loadEnv("expectedStorageUnitNum", config.getExpectedStorageUnitNum()));
config.setEnableParallelOperator(
EnvUtils.loadEnv("enableParallelOperator", config.isEnableParallelOperator()));
config.setParallelFilterThreshold(
EnvUtils.loadEnv("parallelFilterThreshold", config.getParallelFilterThreshold()));
config.setParallelGroupByRowsThreshold(
Expand All @@ -360,6 +367,9 @@ private void loadPropsFromEnv() {
config.setUTTestEnv(EnvUtils.loadEnv("utTestEnv", config.isUTTestEnv()));
config.setRuleBasedOptimizer(
EnvUtils.loadEnv("ruleBasedOptimizer", config.getRuleBasedOptimizer()));
config.setDistributedQueryTriggerThreshold(
EnvUtils.loadEnv(
"distributedQueryTriggerThreshold", config.getDistributedQueryTriggerThreshold()));
}

private void loadUDFListFromFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ public RequestContext build(ExecuteStatementReq req) {
return new RequestContext(req.getSessionId(), req.getStatement(), true);
}

public RequestContext build(ExecuteSubPlanReq req) {
RequestContext context = new RequestContext(req.getSessionId());
context.setSubPlanMsg(req.getSubPlan());
return context;
}

public RequestContext build(LoadCSVReq req) {
return new RequestContext(req.getSessionId(), req.getStatement());
}
Expand Down
Loading
Loading