Skip to content

Commit

Permalink
fix the bug of exec_mem_limit not take effect in routine load (#3666)
Browse files Browse the repository at this point in the history
  • Loading branch information
trueeyu authored Feb 24, 2022
1 parent 97fd5e5 commit f096d7f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,17 @@ public RoutineLoadJob(Long id, String name, String clusterName,
if (ConnectContext.get() != null) {
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
sessionVariables.put(SessionVariable.EXEC_MEM_LIMIT, Long.toString(var.getMaxExecMemByte()));
} else {
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
sessionVariables.put(SessionVariable.EXEC_MEM_LIMIT, Long.toString(SessionVariable.DEFAULT_EXEC_MEM_LIMIT));
}
}

public Map<String, String> getSessionVariables() {
return sessionVariables;
}

protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
setRoutineLoadDesc(stmt.getRoutineLoadDesc());
if (stmt.getDesiredConcurrentNum() != -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
queryOptions.setQuery_type(TQueryType.LOAD);
queryOptions.setQuery_timeout(streamLoadTask.getTimeout());
// for stream load, we use exec_mem_limit to limit the memory usage of load channel.
queryOptions.setMem_limit(streamLoadTask.getExecMemLimit());
queryOptions.setLoad_mem_limit(streamLoadTask.getLoadMemLimit());
params.setQuery_options(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
private boolean enablePipelineEngine = false;

// max memory used on every backend.
public static final long DEFAULT_EXEC_MEM_LIMIT = 2147483648L;
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
public long maxExecMemByte = 2147483648L;
public long maxExecMemByte = DEFAULT_EXEC_MEM_LIMIT;

@VariableMgr.VarAttr(name = ENABLE_SPILLING)
public boolean enableSpilling = false;
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/task/StreamLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.starrocks.common.util.SqlParserUtils;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.routineload.RoutineLoadJob;
import com.starrocks.qe.SessionVariable;
import com.starrocks.thrift.TFileFormatType;
import com.starrocks.thrift.TFileType;
import com.starrocks.thrift.TStreamLoadPutRequest;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class StreamLoadTask {
private boolean strictMode = false; // default is false
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
private int timeout = Config.stream_load_default_timeout_second;
private long execMemLimit = 0;
private long loadMemLimit = 0;

public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
Expand Down Expand Up @@ -257,6 +259,11 @@ private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
jsonRoot = routineLoadJob.getJsonRoot();
}
stripOuterArray = routineLoadJob.isStripOuterArray();
if (routineLoadJob.getSessionVariables().containsKey(SessionVariable.EXEC_MEM_LIMIT)) {
execMemLimit = Long.parseLong(routineLoadJob.getSessionVariables().get(SessionVariable.EXEC_MEM_LIMIT));
} else {
execMemLimit = SessionVariable.DEFAULT_EXEC_MEM_LIMIT;
}
}

// used for stream load
Expand Down Expand Up @@ -326,4 +333,8 @@ private void setRowDelimiter(String orgDelimiter) throws AnalysisException {
public long getLoadMemLimit() {
return loadMemLimit;
}

public long getExecMemLimit() {
return execMemLimit;
}
}

0 comments on commit f096d7f

Please sign in to comment.