diff --git a/be/src/connector/connector.h b/be/src/connector/connector.h
index e3f901c517b00..4fd4a64073add 100644
--- a/be/src/connector/connector.h
+++ b/be/src/connector/connector.h
@@ -125,9 +125,10 @@ using DataSourcePtr = std::unique_ptr<DataSource>;
 
 class DataSourceProvider {
 public:
-    static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024;  // 16MB
-    static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024; // 256MB
-    static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024;         // 1MB
+    static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024;      // 16MB
+    static constexpr int64_t DEFAULT_DATA_SOURCE_MEM_BYTES = 64 * 1024 * 1024;  // 64MB
+    static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024;     // 256MB
+    static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024;             // 1MB
 
     virtual ~DataSourceProvider() = default;
 
diff --git a/be/src/exec/pipeline/fragment_executor.cpp b/be/src/exec/pipeline/fragment_executor.cpp
index c6ce17ceab77a..29a435b400dba 100644
--- a/be/src/exec/pipeline/fragment_executor.cpp
+++ b/be/src/exec/pipeline/fragment_executor.cpp
@@ -226,12 +226,12 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
         }
     }
 
-    int scan_node_number = 1;
-    if (query_globals.__isset.scan_node_number) {
-        scan_node_number = query_globals.scan_node_number;
+    int connector_scan_node_number = 1;
+    if (query_globals.__isset.connector_scan_node_number) {
+        connector_scan_node_number = query_globals.connector_scan_node_number;
     }
     _query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_ratio,
-                                 wg.get(), runtime_state, scan_node_number);
+                                 wg.get(), runtime_state, connector_scan_node_number);
 
     auto query_mem_tracker = _query_ctx->mem_tracker();
     SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(query_mem_tracker.get());
diff --git a/be/src/exec/pipeline/query_context.cpp b/be/src/exec/pipeline/query_context.cpp
index d7be3cc686101..7d395b44b3296 100644
--- a/be/src/exec/pipeline/query_context.cpp
+++ b/be/src/exec/pipeline/query_context.cpp
@@ -113,7 +113,7 @@ void QueryContext::cancel(const Status& status) {
 
 void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit,
                                     std::optional<double> spill_mem_reserve_ratio, workgroup::WorkGroup* wg,
-                                    RuntimeState* runtime_state, int scan_node_number) {
+                                    RuntimeState* runtime_state, int connector_scan_node_number) {
     std::call_once(_init_mem_tracker_once, [=]() {
         _profile = std::make_shared<RuntimeProfile>("Query" + print_id(_query_id));
         auto* mem_tracker_counter =
@@ -145,7 +145,7 @@ void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent,
             _static_query_mem_limit = std::min(big_query_mem_limit, _static_query_mem_limit);
         }
         _connector_scan_operator_mem_share_arbitrator = _object_pool.add(
-                new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, scan_node_number));
+                new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, connector_scan_node_number));
 
         {
             MemTracker* connector_scan_parent = GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker();
diff --git a/be/src/exec/pipeline/query_context.h b/be/src/exec/pipeline/query_context.h
index 236694cd6e849..027135936406d 100644
--- a/be/src/exec/pipeline/query_context.h
+++ b/be/src/exec/pipeline/query_context.h
@@ -164,7 +164,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
     /// that there is a big query memory limit of this resource group.
     void init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit = -1,
                           std::optional<double> spill_mem_limit = std::nullopt, workgroup::WorkGroup* wg = nullptr,
-                          RuntimeState* state = nullptr, int scan_node_number = 1);
+                          RuntimeState* state = nullptr, int connector_scan_node_number = 1);
 
     std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
     MemTracker* connector_scan_mem_tracker() { return _connector_scan_mem_tracker.get(); }
diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.cpp b/be/src/exec/pipeline/scan/connector_scan_operator.cpp
index ef50bb44624c5..abec83ce6ced7 100644
--- a/be/src/exec/pipeline/scan/connector_scan_operator.cpp
+++ b/be/src/exec/pipeline/scan/connector_scan_operator.cpp
@@ -17,7 +17,6 @@
 #include "exec/connector_scan_node.h"
 #include "exec/pipeline/pipeline_driver.h"
 #include "exec/pipeline/scan/balanced_chunk_buffer.h"
-#include "exec/workgroup/work_group.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 
@@ -25,10 +24,11 @@ namespace starrocks::pipeline {
 
 // ==================== ConnectorScanOperatorFactory ====================
 ConnectorScanOperatorMemShareArbitrator::ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit,
-                                                                                 int scan_node_number)
+                                                                                 int connector_scan_node_number)
         : query_mem_limit(query_mem_limit),
           scan_mem_limit(query_mem_limit),
-          total_chunk_source_mem_bytes(scan_node_number * connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES) {}
+          total_chunk_source_mem_bytes(connector_scan_node_number *
+                                       connector::DataSourceProvider::DEFAULT_DATA_SOURCE_MEM_BYTES) {}
 
 int64_t ConnectorScanOperatorMemShareArbitrator::update_chunk_source_mem_bytes(int64_t old_value, int64_t new_value) {
     int64_t diff = new_value - old_value;
@@ -54,9 +54,7 @@ class ConnectorScanOperatorIOTasksMemLimiter {
     int64_t chunk_source_mem_bytes_update_count = 0;
     int64_t arb_chunk_source_mem_bytes = 0;
     mutable int64_t debug_output_timestamp = 0;
-
     std::atomic<int64_t> open_scan_operator_count = 0;
-    std::atomic<int64_t> active_scan_operator_count = 0;
 
 public:
     ConnectorScanOperatorIOTasksMemLimiter(int64_t dop, bool shared_scan) : dop(dop), shared_scan(shared_scan) {}
@@ -127,9 +125,6 @@ class ConnectorScanOperatorIOTasksMemLimiter {
     int64_t update_open_scan_operator_count(int delta) {
         return open_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
     }
-
-    int64_t update_active_scan_operator_count(int delta) {
-        return active_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
-    }
 };
 
 ConnectorScanOperatorFactory::ConnectorScanOperatorFactory(int32_t id, ScanNode* scan_node, RuntimeState* state,
@@ -218,9 +213,6 @@ struct ConnectorScanOperatorAdaptiveProcessor {
     int try_add_io_tasks_fail_count = 0;
     int check_slow_io = 0;
     int32_t slow_io_latency_ms = config::connector_io_tasks_adjust_interval_ms;
-
-    // ------------------------
-    bool started_running = false;
 };
 
 // ==================== ConnectorScanOperator ====================
@@ -264,43 +256,36 @@ Status ConnectorScanOperator::do_prepare(RuntimeState* state) {
     _unique_metrics->add_info_string("AdaptiveIOTasks", _enable_adaptive_io_tasks ? "True" : "False");
     _adaptive_processor = state->obj_pool()->add(new ConnectorScanOperatorAdaptiveProcessor());
     _adaptive_processor->op_start_time = GetCurrentTimeMicros();
-    _adaptive_processor->started_running = false;
     if (options.__isset.connector_io_tasks_slow_io_latency_ms) {
         _adaptive_processor->slow_io_latency_ms = options.connector_io_tasks_slow_io_latency_ms;
     }
 
+    // As the first running scan operator, it will update the scan mem limit
     {
         auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
         ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
-        L->update_open_scan_operator_count(1);
+        int64_t c = L->update_open_scan_operator_count(1);
+        if (c == 0) {
+            _adjust_scan_mem_limit(connector::DataSourceProvider::DEFAULT_DATA_SOURCE_MEM_BYTES,
+                                   L->get_arb_chunk_source_mem_bytes());
+        }
     }
     return Status::OK();
 }
 
 void ConnectorScanOperator::do_close(RuntimeState* state) {
+    // As the last closing scan operator, it will update the scan mem limit.
     auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
     ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
     int64_t c = L->update_open_scan_operator_count(-1);
     if (c == 1) {
-        if (L->update_active_scan_operator_count(0) > 0) {
-            _adjust_scan_mem_limit(L->get_arb_chunk_source_mem_bytes(), 0);
-        }
+        _adjust_scan_mem_limit(L->get_arb_chunk_source_mem_bytes(), 0);
     }
 }
 
 ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
     auto* scan_node = down_cast<ConnectorScanNode*>(_scan_node);
     auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
-    ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
-
-    if (_adaptive_processor->started_running == false) {
-        _adaptive_processor->started_running = true;
-        int64_t c = L->update_active_scan_operator_count(1);
-        if (c == 0) {
-            _adjust_scan_mem_limit(connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES,
-                                   L->get_arb_chunk_source_mem_bytes());
-        }
-    }
 
     // Only use one chunk source profile, so we can see metrics on scan operator level.
     // Since there is adaptive io tasks feature, chunk sources will be used unevenly,
diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.h b/be/src/exec/pipeline/scan/connector_scan_operator.h
index f9351ffbca3b3..943555b058d78 100644
--- a/be/src/exec/pipeline/scan/connector_scan_operator.h
+++ b/be/src/exec/pipeline/scan/connector_scan_operator.h
@@ -35,7 +35,7 @@ struct ConnectorScanOperatorMemShareArbitrator {
     int64_t scan_mem_limit = 0;
     std::atomic<int64_t> total_chunk_source_mem_bytes = 0;
 
-    ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int scan_node_number);
+    ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int connector_scan_node_number);
 
     int64_t set_scan_mem_ratio(double mem_ratio) {
         scan_mem_limit = std::max<int64_t>(1, query_mem_limit * mem_ratio);
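Review note on the BE-side changes above: before this patch the scan mem limit was only adjusted when an operator created its first chunk source (the block removed from create_chunk_source()), and the close-side release was skipped when no chunk source had ever been created; with this patch do_prepare()/do_close() drive both sides, keyed only on the open-operator count. Below is a minimal, self-contained sketch of that first-opener/last-closer pattern. The names are hypothetical illustrations, not the actual ConnectorScanOperator or ConnectorScanOperatorIOTasksMemLimiter code.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

// Stand-in for the open/close bookkeeping introduced in do_prepare()/do_close() above.
struct OpenCountSketch {
    std::atomic<int64_t> open_scan_operator_count{0};

    // Returns the value *before* the update, matching fetch_add semantics.
    int64_t update_open_scan_operator_count(int delta) {
        return open_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
    }

    void on_prepare() {
        // The first operator to open (previous count == 0) raises the shared
        // scan mem limit from the default per-node estimate.
        if (update_open_scan_operator_count(1) == 0) {
            std::printf("first opener: grow scan mem share\n");
        }
    }

    void on_close() {
        // The last operator to close (previous count == 1) gives its share back,
        // even if it never created a chunk source.
        if (update_open_scan_operator_count(-1) == 1) {
            std::printf("last closer: release scan mem share\n");
        }
    }
};

int main() {
    OpenCountSketch s;
    s.on_prepare(); // first opener adjusts the limit
    s.on_prepare(); // second opener does not
    s.on_close();
    s.on_close();   // last closer releases the share
    return 0;
}
```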
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/MetaScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/MetaScanNode.java
index e16bda728cbfc..60aa4fcd1d5fb 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/MetaScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/MetaScanNode.java
@@ -117,7 +117,8 @@ public void computeRangeLocations() {
             boolean tabletIsNull = true;
             for (Replica replica : allQueryableReplicas) {
                 ComputeNode node =
-                        GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(replica.getBackendId());
+                        GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()
+                                .getBackendOrComputeNode(replica.getBackendId());
                 if (node == null) {
                     LOG.debug("replica {} not exists", replica.getBackendId());
                     continue;
@@ -183,4 +184,9 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
     public boolean canUseRuntimeAdaptiveDop() {
         return true;
     }
+
+    @Override
+    public boolean isRunningAsConnectorOperator() {
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
index cbc0246f702c0..16499c72be0ba 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
@@ -504,7 +504,6 @@ public List updateScanRangeLocations(List
             if (totalScanRangeBytes > freeMemory / 2) {
                 LOG.warn(
-                        "Try to allocate too many scan ranges for table {}, which may cause FE OOM, Partition Num:{}, tablet Num:{}, Scan Range Total Bytes:{}",
+                        "Try to allocate too many scan ranges for table {}, which may cause FE OOM, Partition Num:{}, tablet " +
+                                "Num:{}, Scan Range Total Bytes:{}",
                         olapTable.getName(), totalPartitionNum, totalTabletsNum, totalScanRangeBytes);
             }
         }
@@ -1450,4 +1450,9 @@ public void clearScanNodeForThriftBuild() {
         bucketColumns.clear();
         rowStoreKeyLiterals = Lists.newArrayList();
     }
+
+    @Override
+    public boolean isRunningAsConnectorOperator() {
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
index e86214c837fd9..d82bb80716548 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
@@ -158,4 +158,15 @@ protected List<TColumnAccessPath> columnAccessPathToThrift() {
     protected boolean supportTopNRuntimeFilter() {
         return false;
     }
+
+    @Override
+    public boolean needCollectExecStats() {
+        return true;
+    }
+
+    // We use this flag to know how many connector scan nodes there are on the BE side; the connector
+    // framework uses this number to share memory fairly among those scan nodes.
+    public boolean isRunningAsConnectorOperator() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java
index f9170848e46ae..a799278431e6d 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java
@@ -58,7 +58,6 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -389,4 +388,9 @@ public String getCatalogName() {
     public void setCatalogName(String catalogName) {
         this.catalogName = catalogName;
     }
+
+    @Override
+    public boolean isRunningAsConnectorOperator() {
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
index c24b9604127ef..d93201e698669 100644
--- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
@@ -110,7 +110,7 @@ public static JobSpec fromQuerySpec(ConnectContext context,
         if (context.getLastQueryId() != null) {
             queryGlobals.setLast_query_id(context.getLastQueryId().toString());
         }
-        queryGlobals.setScan_node_number(scanNodes.size());
+        queryGlobals.setConnector_scan_node_number(scanNodes.stream().filter(x -> x.isRunningAsConnectorOperator()).count());
 
         return new Builder()
                 .queryId(context.getExecutionId())
@@ -140,7 +140,7 @@ public static JobSpec fromMVMaintenanceJobSpec(ConnectContext context,
         if (context.getLastQueryId() != null) {
             queryGlobals.setLast_query_id(context.getLastQueryId().toString());
         }
-        queryGlobals.setScan_node_number(scanNodes.size());
+        queryGlobals.setConnector_scan_node_number(scanNodes.stream().filter(x -> x.isRunningAsConnectorOperator()).count());
 
         return new Builder()
                 .queryId(context.getExecutionId())
diff --git a/gensrc/thrift/InternalService.thrift b/gensrc/thrift/InternalService.thrift
index 29e9044409580..f0c2e56c74302 100644
--- a/gensrc/thrift/InternalService.thrift
+++ b/gensrc/thrift/InternalService.thrift
@@ -398,7 +398,7 @@ struct TQueryGlobals {
 
     31: optional i64 timestamp_us
 
-    32: optional i64 scan_node_number
+    32: optional i64 connector_scan_node_number
 
 }
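Closing note on scale (illustration only, not part of the patch): the FE now reports only connector scan nodes through connector_scan_node_number, and the arbitrator seeds each of them with DEFAULT_DATA_SOURCE_MEM_BYTES (64MB) instead of MAX_DATA_SOURCE_MEM_BYTES (256MB). A standalone sketch of that arithmetic, assuming a hypothetical plan with three scan nodes of which one is a connector scan:

```cpp
#include <cstdint>
#include <cstdio>

// Constants mirroring be/src/connector/connector.h above.
constexpr int64_t DEFAULT_DATA_SOURCE_MEM_BYTES = 64LL * 1024 * 1024;  // 64MB
constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256LL * 1024 * 1024;     // 256MB

int main() {
    // Hypothetical plan shape: 3 scan nodes in total, only 1 of them is a connector scan.
    const int64_t scan_node_number = 3;            // what the FE used to report
    const int64_t connector_scan_node_number = 1;  // what the FE reports after this change

    // Initial total_chunk_source_mem_bytes estimate handed to the arbitrator.
    const int64_t old_seed = scan_node_number * MAX_DATA_SOURCE_MEM_BYTES;               // 3 x 256MB = 768MB
    const int64_t new_seed = connector_scan_node_number * DEFAULT_DATA_SOURCE_MEM_BYTES; // 1 x  64MB =  64MB

    std::printf("old estimate: %lld MB, new estimate: %lld MB\n",
                static_cast<long long>(old_seed >> 20), static_cast<long long>(new_seed >> 20));
    return 0;
}
```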