From f3762787249e4382b619607e94ea3a5becd5dc3a Mon Sep 17 00:00:00 2001
From: yanz
Date: Fri, 22 Nov 2024 07:01:33 +0800
Subject: [PATCH 1/4] [Enhancement] fix connector mem scan limit for no chunk
 source

---
 .../pipeline/scan/connector_scan_operator.cpp | 28 ++++++-------------
 1 file changed, 8 insertions(+), 20 deletions(-)

diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.cpp b/be/src/exec/pipeline/scan/connector_scan_operator.cpp
index 559d62e6adf6a..bd05cc1d77998 100644
--- a/be/src/exec/pipeline/scan/connector_scan_operator.cpp
+++ b/be/src/exec/pipeline/scan/connector_scan_operator.cpp
@@ -17,7 +17,6 @@
 #include "exec/connector_scan_node.h"
 #include "exec/pipeline/pipeline_driver.h"
 #include "exec/pipeline/scan/balanced_chunk_buffer.h"
-#include "exec/workgroup/work_group.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 
@@ -54,9 +53,7 @@ class ConnectorScanOperatorIOTasksMemLimiter {
     int64_t chunk_source_mem_bytes_update_count = 0;
     int64_t arb_chunk_source_mem_bytes = 0;
     mutable int64_t debug_output_timestamp = 0;
-
     std::atomic<int64_t> open_scan_operator_count = 0;
-    std::atomic<int64_t> active_scan_operator_count = 0;
 
 public:
     ConnectorScanOperatorIOTasksMemLimiter(int64_t dop, bool shared_scan) : dop(dop), shared_scan(shared_scan) {}
@@ -127,9 +124,6 @@ class ConnectorScanOperatorIOTasksMemLimiter {
     int64_t update_open_scan_operator_count(int delta) {
         return open_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
     }
-    int64_t update_active_scan_operator_count(int delta) {
-        return active_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
-    }
 };
 
 ConnectorScanOperatorFactory::ConnectorScanOperatorFactory(int32_t id, ScanNode* scan_node, RuntimeState* state,
@@ -269,38 +263,32 @@ Status ConnectorScanOperator::do_prepare(RuntimeState* state) {
         _adaptive_processor->slow_io_latency_ms = options.connector_io_tasks_slow_io_latency_ms;
     }
 
+    // As the first running scan operator, it will update the scan mem limit
     {
         auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
         ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
-        L->update_open_scan_operator_count(1);
+        int64_t c = L->update_open_scan_operator_count(1);
+        if (c == 0) {
+            _adjust_scan_mem_limit(connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES,
+                                   L->get_arb_chunk_source_mem_bytes());
+        }
     }
     return Status::OK();
 }
 
 void ConnectorScanOperator::do_close(RuntimeState* state) {
+    // As the last closing scan operator, it will update the scan mem limit.
     auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
     ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
     int64_t c = L->update_open_scan_operator_count(-1);
     if (c == 1) {
-        if (L->update_active_scan_operator_count(0) > 0) {
-            _adjust_scan_mem_limit(L->get_arb_chunk_source_mem_bytes(), 0);
-        }
+        _adjust_scan_mem_limit(L->get_arb_chunk_source_mem_bytes(), 0);
     }
 }
 
 ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
     auto* scan_node = down_cast<ConnectorScanNode*>(_scan_node);
     auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
-    ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
-
-    if (_adaptive_processor->started_running == false) {
-        _adaptive_processor->started_running = true;
-        int64_t c = L->update_active_scan_operator_count(1);
-        if (c == 0) {
-            _adjust_scan_mem_limit(connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES,
-                                   L->get_arb_chunk_source_mem_bytes());
-        }
-    }
 
     // Only use one chunk source profile, so we can see metrics on scan operator level.
     // Since there is adaptive io tasks feature, chunk sources will be used unevenly,

From 020a3b1e9025060e4becea7a423e501d8c5b2b2d Mon Sep 17 00:00:00 2001
From: yanz
Date: Fri, 22 Nov 2024 07:03:47 +0800
Subject: [PATCH 2/4] update name to connector scan number

---
 be/src/exec/pipeline/fragment_executor.cpp                | 8 ++++----
 be/src/exec/pipeline/query_context.cpp                    | 4 ++--
 be/src/exec/pipeline/query_context.h                      | 2 +-
 be/src/exec/pipeline/scan/connector_scan_operator.cpp     | 5 +++--
 be/src/exec/pipeline/scan/connector_scan_operator.h       | 2 +-
 .../main/java/com/starrocks/qe/scheduler/dag/JobSpec.java | 4 ++--
 gensrc/thrift/InternalService.thrift                      | 2 +-
 7 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/be/src/exec/pipeline/fragment_executor.cpp b/be/src/exec/pipeline/fragment_executor.cpp
index b4cba1c119638..c9338ca4935eb 100644
--- a/be/src/exec/pipeline/fragment_executor.cpp
+++ b/be/src/exec/pipeline/fragment_executor.cpp
@@ -238,12 +238,12 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
         }
     }
 
-    int scan_node_number = 1;
-    if (query_globals.__isset.scan_node_number) {
-        scan_node_number = query_globals.scan_node_number;
+    int connector_scan_node_number = 1;
+    if (query_globals.__isset.connector_scan_node_number) {
+        connector_scan_node_number = query_globals.connector_scan_node_number;
     }
     _query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_ratio,
-                                 wg.get(), runtime_state, scan_node_number);
+                                 wg.get(), runtime_state, connector_scan_node_number);
 
     auto query_mem_tracker = _query_ctx->mem_tracker();
     SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(query_mem_tracker.get());
diff --git a/be/src/exec/pipeline/query_context.cpp b/be/src/exec/pipeline/query_context.cpp
index 2bfc7a1d28678..09b940cf88bfd 100644
--- a/be/src/exec/pipeline/query_context.cpp
+++ b/be/src/exec/pipeline/query_context.cpp
@@ -110,7 +110,7 @@ void QueryContext::cancel(const Status& status) {
 
 void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit,
                                     std::optional<double> spill_mem_reserve_ratio, workgroup::WorkGroup* wg,
-                                    RuntimeState* runtime_state, int scan_node_number) {
+                                    RuntimeState* runtime_state, int connector_scan_node_number) {
     std::call_once(_init_mem_tracker_once, [=]() {
         _profile = std::make_shared<RuntimeProfile>("Query" + print_id(_query_id));
         auto* mem_tracker_counter =
@@ -142,7 +142,7 @@ void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent,
             _static_query_mem_limit = std::min(big_query_mem_limit, _static_query_mem_limit);
         }
         _connector_scan_operator_mem_share_arbitrator = _object_pool.add(
-                new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, scan_node_number));
+                new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, connector_scan_node_number));
 
         {
             MemTracker* connector_scan_parent = GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker();
diff --git a/be/src/exec/pipeline/query_context.h b/be/src/exec/pipeline/query_context.h
index ed0ba918dfcb6..639790acfd822 100644
--- a/be/src/exec/pipeline/query_context.h
+++ b/be/src/exec/pipeline/query_context.h
@@ -164,7 +164,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
     /// that there is a big query memory limit of this resource group.
     void init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit = -1,
                           std::optional<double> spill_mem_limit = std::nullopt, workgroup::WorkGroup* wg = nullptr,
-                          RuntimeState* state = nullptr, int scan_node_number = 1);
+                          RuntimeState* state = nullptr, int connector_scan_node_number = 1);
 
     std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
     MemTracker* connector_scan_mem_tracker() { return _connector_scan_mem_tracker.get(); }
diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.cpp b/be/src/exec/pipeline/scan/connector_scan_operator.cpp
index bd05cc1d77998..1ae8658cb3397 100644
--- a/be/src/exec/pipeline/scan/connector_scan_operator.cpp
+++ b/be/src/exec/pipeline/scan/connector_scan_operator.cpp
@@ -24,10 +24,11 @@ namespace starrocks::pipeline {
 
 // ==================== ConnectorScanOperatorFactory ====================
 ConnectorScanOperatorMemShareArbitrator::ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit,
-                                                                                 int scan_node_number)
+                                                                                 int connector_scan_node_number)
         : query_mem_limit(query_mem_limit),
           scan_mem_limit(query_mem_limit),
-          total_chunk_source_mem_bytes(scan_node_number * connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES) {}
+          total_chunk_source_mem_bytes(connector_scan_node_number *
+                                       connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES) {}
 
 int64_t ConnectorScanOperatorMemShareArbitrator::update_chunk_source_mem_bytes(int64_t old_value, int64_t new_value) {
     int64_t diff = new_value - old_value;
diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.h b/be/src/exec/pipeline/scan/connector_scan_operator.h
index 4f5caed101bec..d52812e0c6a3c 100644
--- a/be/src/exec/pipeline/scan/connector_scan_operator.h
+++ b/be/src/exec/pipeline/scan/connector_scan_operator.h
@@ -35,7 +35,7 @@ struct ConnectorScanOperatorMemShareArbitrator {
     int64_t scan_mem_limit = 0;
     std::atomic<int64_t> total_chunk_source_mem_bytes = 0;
 
-    ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int scan_node_number);
+    ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int connector_scan_node_number);
 
     int64_t set_scan_mem_ratio(double mem_ratio) {
         scan_mem_limit = std::max<int64_t>(1, query_mem_limit * mem_ratio);
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
index 58a7a6fbf5e5e..800788ee81eae 100644
--- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
@@ -113,7 +113,7 @@ public static JobSpec fromQuerySpec(ConnectContext context,
         if (context.getLastQueryId() != null) {
             queryGlobals.setLast_query_id(context.getLastQueryId().toString());
         }
-        queryGlobals.setScan_node_number(scanNodes.size());
+        queryGlobals.setConnector_scan_node_number(scanNodes.size());
 
         return new Builder()
                 .queryId(context.getExecutionId())
@@ -143,7 +143,7 @@ public static JobSpec fromMVMaintenanceJobSpec(ConnectContext context,
         if (context.getLastQueryId() != null) {
             queryGlobals.setLast_query_id(context.getLastQueryId().toString());
         }
-        queryGlobals.setScan_node_number(scanNodes.size());
+        queryGlobals.setConnector_scan_node_number(scanNodes.size());
 
         return new Builder()
                 .queryId(context.getExecutionId())
diff --git a/gensrc/thrift/InternalService.thrift b/gensrc/thrift/InternalService.thrift
index 6dc55fa7bd01a..407741dc6bc8a 100644
--- a/gensrc/thrift/InternalService.thrift
+++ b/gensrc/thrift/InternalService.thrift
@@ -405,7 +405,7 @@ struct TQueryGlobals {
     31: optional i64 timestamp_us
 
-    32: optional i64 scan_node_number
+    32: optional i64 connector_scan_node_number
 }

From 5bb5de68fcb424200c8174213efc249913696def Mon Sep 17 00:00:00 2001
From: yanz
Date: Fri, 22 Nov 2024 07:57:05 +0800
Subject: [PATCH 3/4] fix fe side

Signed-off-by: yanz
---
 .../main/java/com/starrocks/planner/MetaScanNode.java   |  8 +++++++-
 .../main/java/com/starrocks/planner/OlapScanNode.java   | 10 +++++++---
 .../src/main/java/com/starrocks/planner/ScanNode.java   |  6 ++++++
 .../java/com/starrocks/planner/SchemaScanNode.java      |  7 +++++--
 .../java/com/starrocks/qe/scheduler/dag/JobSpec.java    |  4 ++--
 5 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/MetaScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/MetaScanNode.java
index 481ae5a720fe1..30c8b85f81c7d 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/MetaScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/MetaScanNode.java
@@ -128,7 +128,8 @@ public void computeRangeLocations() {
             boolean tabletIsNull = true;
             for (Replica replica : allQueryableReplicas) {
                 ComputeNode node =
-                        GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(replica.getBackendId());
+                        GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()
+                                .getBackendOrComputeNode(replica.getBackendId());
                 if (node == null) {
                     LOG.debug("replica {} not exists", replica.getBackendId());
                     continue;
@@ -197,4 +198,9 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
     public boolean canUseRuntimeAdaptiveDop() {
         return true;
     }
+
+    @Override
+    public boolean isRunningAsConnectorOperator() {
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
index df3c72115aaa3..039e19fbf7593 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
@@ -514,7 +514,6 @@ public List updateScanRangeLocations(List freeMemory / 2) {
                 LOG.warn(
-                        "Try to allocate too many scan ranges for table {}, which may cause FE OOM, Partition Num:{}, tablet Num:{}, Scan Range Total Bytes:{}",
+                        "Try to allocate too many scan ranges for table {}, which may cause FE OOM, Partition Num:{}, tablet " +
+                                "Num:{}, Scan Range Total Bytes:{}",
                         olapTable.getName(), totalPartitionNum, totalTabletsNum, totalScanRangeBytes);
             }
         }
@@ -942,7 +942,6 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
         return output.toString();
     }
 
-
     private void assignOrderByHints(List<String> keyColumnNames) {
         // assign order by hint
         for (RuntimeFilterDescription probeRuntimeFilter : probeRuntimeFilters) {
@@ -1514,4 +1513,9 @@ public void clearScanNodeForThriftBuild() {
         bucketColumns.clear();
         rowStoreKeyLiterals = Lists.newArrayList();
     }
+
+    @Override
+    public boolean isRunningAsConnectorOperator() {
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
index 5c9c6cae9c36d..1b1b940ce3aa0 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
@@ -167,4 +167,10 @@ protected boolean supportTopNRuntimeFilter() {
     public boolean needCollectExecStats() {
         return true;
     }
+
+    // We use this flag to know how many connector scan nodes there are on the BE side; the connector
+    // framework will use this number to fairly share memory among those scan nodes.
+    public boolean isRunningAsConnectorOperator() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java
index fb9c70966a2f5..ed2d5950401b6 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java
@@ -58,7 +58,6 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -372,7 +371,6 @@ public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength)
         return beScanRanges;
     }
 
-
     @Override
     public boolean canUseRuntimeAdaptiveDop() {
         return true;
@@ -385,4 +383,9 @@ public String getCatalogName() {
     public void setCatalogName(String catalogName) {
         this.catalogName = catalogName;
     }
+
+    @Override
+    public boolean isRunningAsConnectorOperator() {
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
index 800788ee81eae..2e57ecd04241e 100644
--- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
@@ -113,7 +113,7 @@ public static JobSpec fromQuerySpec(ConnectContext context,
         if (context.getLastQueryId() != null) {
             queryGlobals.setLast_query_id(context.getLastQueryId().toString());
         }
-        queryGlobals.setConnector_scan_node_number(scanNodes.size());
+        queryGlobals.setConnector_scan_node_number(scanNodes.stream().filter(x -> x.isRunningAsConnectorOperator()).count());
 
         return new Builder()
                 .queryId(context.getExecutionId())
@@ -143,7 +143,7 @@ public static JobSpec fromMVMaintenanceJobSpec(ConnectContext context,
         if (context.getLastQueryId() != null) {
             queryGlobals.setLast_query_id(context.getLastQueryId().toString());
         }
-        queryGlobals.setConnector_scan_node_number(scanNodes.size());
+        queryGlobals.setConnector_scan_node_number(scanNodes.stream().filter(x -> x.isRunningAsConnectorOperator()).count());
 
         return new Builder()
                 .queryId(context.getExecutionId())

From 6392b33a8a314088d7212225b6875a03e247769d Mon Sep 17 00:00:00 2001
From: yanz
Date: Fri, 22 Nov 2024 20:31:54 +0800
Subject: [PATCH 4/4] fix for comment

---
 be/src/connector/connector.h                          | 7 ++++---
 be/src/exec/pipeline/scan/connector_scan_operator.cpp | 8 ++------
 2 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/be/src/connector/connector.h b/be/src/connector/connector.h
index 4cebb339d7042..7a3771f99c552 100644
--- a/be/src/connector/connector.h
+++ b/be/src/connector/connector.h
@@ -125,9 +125,10 @@ using DataSourcePtr = std::unique_ptr<DataSource>;
 
 class DataSourceProvider {
 public:
-    static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024;  // 16MB
-    static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024; // 256MB
-    static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024;         // 1MB
+    static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024;      // 16MB
+    static constexpr int64_t DEFAULT_DATA_SOURCE_MEM_BYTES = 64 * 1024 * 1024;  // 64MB
+    static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024;     // 256MB
+    static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024;             // 1MB
 
     virtual ~DataSourceProvider() = default;
diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.cpp b/be/src/exec/pipeline/scan/connector_scan_operator.cpp
index 1ae8658cb3397..1bb212a1eb2c5 100644
--- a/be/src/exec/pipeline/scan/connector_scan_operator.cpp
+++ b/be/src/exec/pipeline/scan/connector_scan_operator.cpp
@@ -28,7 +28,7 @@ ConnectorScanOperatorMemShareArbitrator::ConnectorScanOperatorMemShareArbitrator
         : query_mem_limit(query_mem_limit),
           scan_mem_limit(query_mem_limit),
           total_chunk_source_mem_bytes(connector_scan_node_number *
-                                       connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES) {}
+                                       connector::DataSourceProvider::DEFAULT_DATA_SOURCE_MEM_BYTES) {}
 
 int64_t ConnectorScanOperatorMemShareArbitrator::update_chunk_source_mem_bytes(int64_t old_value, int64_t new_value) {
     int64_t diff = new_value - old_value;
@@ -213,9 +213,6 @@ struct ConnectorScanOperatorAdaptiveProcessor {
     int try_add_io_tasks_fail_count = 0;
     int check_slow_io = 0;
     int32_t slow_io_latency_ms = config::connector_io_tasks_adjust_interval_ms;
-
-    // ------------------------
-    bool started_running = false;
 };
 
 // ==================== ConnectorScanOperator ====================
@@ -259,7 +256,6 @@ Status ConnectorScanOperator::do_prepare(RuntimeState* state) {
     _unique_metrics->add_info_string("AdaptiveIOTasks", _enable_adaptive_io_tasks ? "True" : "False");
     _adaptive_processor = state->obj_pool()->add(new ConnectorScanOperatorAdaptiveProcessor());
     _adaptive_processor->op_start_time = GetCurrentTimeMicros();
-    _adaptive_processor->started_running = false;
     if (options.__isset.connector_io_tasks_slow_io_latency_ms) {
         _adaptive_processor->slow_io_latency_ms = options.connector_io_tasks_slow_io_latency_ms;
     }
@@ -270,7 +266,7 @@ Status ConnectorScanOperator::do_prepare(RuntimeState* state) {
         ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
         int64_t c = L->update_open_scan_operator_count(1);
         if (c == 0) {
-            _adjust_scan_mem_limit(connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES,
+            _adjust_scan_mem_limit(connector::DataSourceProvider::DEFAULT_DATA_SOURCE_MEM_BYTES,
                                    L->get_arb_chunk_source_mem_bytes());
         }
     }
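
Taken together, the series has the FE report only the connector scan nodes in TQueryGlobals, has the BE adjust the scan memory limit on the first opened / last closed scan operator instead of on the first chunk source, and seeds each node's estimate with the new 64MB DEFAULT_DATA_SOURCE_MEM_BYTES rather than the 256MB MAX. The program below is a minimal, self-contained sketch of that fair-share idea, not the StarRocks implementation: the class FairShareArbitrator, the k* constants, and the simplified update_chunk_source_mem_bytes() signature are invented for illustration; only the 64MB/256MB values and the general proportional-share scheme come from the patches above.

// NOTE: illustrative sketch only; names other than the DEFAULT/MAX byte values are assumed.
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>

constexpr int64_t kDefaultDataSourceMemBytes = 64LL * 1024 * 1024;  // 64MB initial estimate per scan node
constexpr int64_t kMaxDataSourceMemBytes = 256LL * 1024 * 1024;     // 256MB upper bound per chunk source

// Splits a query's scan memory budget among connector scan nodes according to
// the chunk-source memory each node currently reports.
class FairShareArbitrator {
public:
    FairShareArbitrator(int64_t scan_mem_limit, int connector_scan_node_number)
            : _scan_mem_limit(scan_mem_limit),
              _total_chunk_source_mem_bytes(connector_scan_node_number * kDefaultDataSourceMemBytes) {}

    // A scan node reports a refined estimate; the pool total is updated atomically
    // and the node's proportional share of the scan budget is returned.
    int64_t update_chunk_source_mem_bytes(int64_t old_value, int64_t new_value) {
        int64_t diff = new_value - old_value;
        int64_t total = _total_chunk_source_mem_bytes.fetch_add(diff) + diff;
        total = std::max<int64_t>(total, 1);
        double share = static_cast<double>(new_value) / static_cast<double>(total);
        return static_cast<int64_t>(static_cast<double>(_scan_mem_limit) * share);
    }

private:
    int64_t _scan_mem_limit;
    std::atomic<int64_t> _total_chunk_source_mem_bytes;
};

int main() {
    // Two connector scan nodes share a 1GB scan budget; both start at the 64MB default.
    FairShareArbitrator arb(1LL << 30, /*connector_scan_node_number=*/2);

    // The first node refines its estimate up to the 256MB cap and now gets
    // 256 / (256 + 64) = 80% of the budget; the other node keeps the remainder.
    int64_t share = arb.update_chunk_source_mem_bytes(kDefaultDataSourceMemBytes, kMaxDataSourceMemBytes);
    std::cout << "first node's share: " << share / (1024 * 1024) << " MB" << std::endl;
    return 0;
}

Seeding every node with the default estimate instead of the maximum keeps a single query from reserving most of the scan budget for operators that never end up creating a chunk source, which is the scenario the series title points at.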