Skip to content

Commit

Permalink
[Enhancement] fix connector mem scan limit adjustment when no chunk source (#53112)
Browse files Browse the repository at this point in the history

Signed-off-by: yanz <[email protected]>
(cherry picked from commit 14ccfcc)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
#	fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
#	fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java
  • Loading branch information
dirtysalt authored and mergify[bot] committed Nov 25, 2024
1 parent 7b672b9 commit 906f250
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 44 deletions.
7 changes: 4 additions & 3 deletions be/src/connector/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ using DataSourcePtr = std::unique_ptr<DataSource>;

class DataSourceProvider {
public:
static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024; // 16MB
static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024; // 256MB
static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024; // 1MB
static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024; // 16MB
static constexpr int64_t DEFAULT_DATA_SOURCE_MEM_BYTES = 64 * 1024 * 1024; // 64MB
static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024; // 256MB
static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024; // 1MB

virtual ~DataSourceProvider() = default;

Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,12 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
}
}

int scan_node_number = 1;
if (query_globals.__isset.scan_node_number) {
scan_node_number = query_globals.scan_node_number;
int connector_scan_node_number = 1;
if (query_globals.__isset.connector_scan_node_number) {
connector_scan_node_number = query_globals.connector_scan_node_number;
}
_query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_ratio,
wg.get(), runtime_state, scan_node_number);
wg.get(), runtime_state, connector_scan_node_number);

auto query_mem_tracker = _query_ctx->mem_tracker();
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(query_mem_tracker.get());
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void QueryContext::cancel(const Status& status) {

void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit,
std::optional<double> spill_mem_reserve_ratio, workgroup::WorkGroup* wg,
RuntimeState* runtime_state, int scan_node_number) {
RuntimeState* runtime_state, int connector_scan_node_number) {
std::call_once(_init_mem_tracker_once, [=]() {
_profile = std::make_shared<RuntimeProfile>("Query" + print_id(_query_id));
auto* mem_tracker_counter =
Expand Down Expand Up @@ -145,7 +145,7 @@ void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent,
_static_query_mem_limit = std::min(big_query_mem_limit, _static_query_mem_limit);
}
_connector_scan_operator_mem_share_arbitrator = _object_pool.add(
new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, scan_node_number));
new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, connector_scan_node_number));

{
MemTracker* connector_scan_parent = GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
/// that there is a big query memory limit of this resource group.
void init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit = -1,
std::optional<double> spill_mem_limit = std::nullopt, workgroup::WorkGroup* wg = nullptr,
RuntimeState* state = nullptr, int scan_node_number = 1);
RuntimeState* state = nullptr, int connector_scan_node_number = 1);
std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
MemTracker* connector_scan_mem_tracker() { return _connector_scan_mem_tracker.get(); }

Expand Down
37 changes: 11 additions & 26 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
#include "exec/connector_scan_node.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/workgroup/work_group.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"

namespace starrocks::pipeline {

// ==================== ConnectorScanOperatorFactory ====================
ConnectorScanOperatorMemShareArbitrator::ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit,
int scan_node_number)
int connector_scan_node_number)
: query_mem_limit(query_mem_limit),
scan_mem_limit(query_mem_limit),
total_chunk_source_mem_bytes(scan_node_number * connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES) {}
total_chunk_source_mem_bytes(connector_scan_node_number *
connector::DataSourceProvider::DEFAULT_DATA_SOURCE_MEM_BYTES) {}

int64_t ConnectorScanOperatorMemShareArbitrator::update_chunk_source_mem_bytes(int64_t old_value, int64_t new_value) {
int64_t diff = new_value - old_value;
Expand All @@ -54,9 +54,7 @@ class ConnectorScanOperatorIOTasksMemLimiter {
int64_t chunk_source_mem_bytes_update_count = 0;
int64_t arb_chunk_source_mem_bytes = 0;
mutable int64_t debug_output_timestamp = 0;

std::atomic<int64_t> open_scan_operator_count = 0;
std::atomic<int64_t> active_scan_operator_count = 0;

public:
ConnectorScanOperatorIOTasksMemLimiter(int64_t dop, bool shared_scan) : dop(dop), shared_scan(shared_scan) {}
Expand Down Expand Up @@ -127,9 +125,6 @@ class ConnectorScanOperatorIOTasksMemLimiter {
int64_t update_open_scan_operator_count(int delta) {
return open_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
}
int64_t update_active_scan_operator_count(int delta) {
return active_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
}
};

ConnectorScanOperatorFactory::ConnectorScanOperatorFactory(int32_t id, ScanNode* scan_node, RuntimeState* state,
Expand Down Expand Up @@ -218,9 +213,6 @@ struct ConnectorScanOperatorAdaptiveProcessor {
int try_add_io_tasks_fail_count = 0;
int check_slow_io = 0;
int32_t slow_io_latency_ms = config::connector_io_tasks_adjust_interval_ms;

// ------------------------
bool started_running = false;
};

// ==================== ConnectorScanOperator ====================
Expand Down Expand Up @@ -264,43 +256,36 @@ Status ConnectorScanOperator::do_prepare(RuntimeState* state) {
_unique_metrics->add_info_string("AdaptiveIOTasks", _enable_adaptive_io_tasks ? "True" : "False");
_adaptive_processor = state->obj_pool()->add(new ConnectorScanOperatorAdaptiveProcessor());
_adaptive_processor->op_start_time = GetCurrentTimeMicros();
_adaptive_processor->started_running = false;
if (options.__isset.connector_io_tasks_slow_io_latency_ms) {
_adaptive_processor->slow_io_latency_ms = options.connector_io_tasks_slow_io_latency_ms;
}

// As the first running scan operator, it will update the scan mem limit
{
auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
L->update_open_scan_operator_count(1);
int64_t c = L->update_open_scan_operator_count(1);
if (c == 0) {
_adjust_scan_mem_limit(connector::DataSourceProvider::DEFAULT_DATA_SOURCE_MEM_BYTES,
L->get_arb_chunk_source_mem_bytes());
}
}
return Status::OK();
}

void ConnectorScanOperator::do_close(RuntimeState* state) {
// As the last closing scan operator, it will update the scan mem limit.
auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
int64_t c = L->update_open_scan_operator_count(-1);
if (c == 1) {
if (L->update_active_scan_operator_count(0) > 0) {
_adjust_scan_mem_limit(L->get_arb_chunk_source_mem_bytes(), 0);
}
_adjust_scan_mem_limit(L->get_arb_chunk_source_mem_bytes(), 0);
}
}

ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
auto* scan_node = down_cast<ConnectorScanNode*>(_scan_node);
auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;

if (_adaptive_processor->started_running == false) {
_adaptive_processor->started_running = true;
int64_t c = L->update_active_scan_operator_count(1);
if (c == 0) {
_adjust_scan_mem_limit(connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES,
L->get_arb_chunk_source_mem_bytes());
}
}

// Only use one chunk source profile, so we can see metrics on scan operator level.
// Since there is adaptive io tasks feature, chunk sources will be used unevenly,
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/connector_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct ConnectorScanOperatorMemShareArbitrator {
int64_t scan_mem_limit = 0;
std::atomic<int64_t> total_chunk_source_mem_bytes = 0;

ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int scan_node_number);
ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int connector_scan_node_number);

int64_t set_scan_mem_ratio(double mem_ratio) {
scan_mem_limit = std::max<int64_t>(1, query_mem_limit * mem_ratio);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public void computeRangeLocations() {
boolean tabletIsNull = true;
for (Replica replica : allQueryableReplicas) {
ComputeNode node =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(replica.getBackendId());
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()
.getBackendOrComputeNode(replica.getBackendId());
if (node == null) {
LOG.debug("replica {} not exists", replica.getBackendId());
continue;
Expand Down Expand Up @@ -183,4 +184,9 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
public boolean canUseRuntimeAdaptiveDop() {
return true;
}

@Override
public boolean isRunningAsConnectorOperator() {
return false;
}
}
12 changes: 10 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ public List<TScanRangeLocations> updateScanRangeLocations(List<TScanRangeLocatio
return newLocations;
}


private void checkSomeAliveComputeNode() throws ErrorReportException {
// Note that it's theoretically possible that there were some living CN earlier in this query's execution, and then
// they all died, but in that case, the problem this will be surfaced later anyway.
Expand Down Expand Up @@ -764,7 +763,8 @@ public void checkIfScanRangeNumSafe(long scanRangeSize) {
long freeMemory = runtime.freeMemory();
if (totalScanRangeBytes > freeMemory / 2) {
LOG.warn(
"Try to allocate too many scan ranges for table {}, which may cause FE OOM, Partition Num:{}, tablet Num:{}, Scan Range Total Bytes:{}",
"Try to allocate too many scan ranges for table {}, which may cause FE OOM, Partition Num:{}, tablet " +
"Num:{}, Scan Range Total Bytes:{}",
olapTable.getName(), totalPartitionNum, totalTabletsNum, totalScanRangeBytes);
}
}
Expand Down Expand Up @@ -912,11 +912,14 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
return output.toString();
}

@Override
public int getNumInstances() {
return result.size();
}

private void assignOrderByHints(List<String> keyColumnNames) {
// assign order by hint
for (RuntimeFilterDescription probeRuntimeFilter : probeRuntimeFilters) {
Expand Down Expand Up @@ -1450,4 +1453,9 @@ public void clearScanNodeForThriftBuild() {
bucketColumns.clear();
rowStoreKeyLiterals = Lists.newArrayList();
}

@Override
public boolean isRunningAsConnectorOperator() {
return false;
}
}
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,18 @@ protected List<TColumnAccessPath> columnAccessPathToThrift() {
protected boolean supportTopNRuntimeFilter() {
return false;
}

@Override
public boolean needCollectExecStats() {
return true;
}

// We use this flag to know how many connector scan nodes at BE side, and connector framework
// will use this number to fair share memory usage between those scan nodes.
public boolean isRunningAsConnectorOperator() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -372,11 +371,14 @@ public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength)
return beScanRanges;
}

@Override
public int getNumInstances() {
return beScanRanges == null ? 1 : beScanRanges.size();
}

@Override
public boolean canUseRuntimeAdaptiveDop() {
return true;
Expand All @@ -389,4 +391,9 @@ public String getCatalogName() {
public void setCatalogName(String catalogName) {
this.catalogName = catalogName;
}

@Override
public boolean isRunningAsConnectorOperator() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static JobSpec fromQuerySpec(ConnectContext context,
if (context.getLastQueryId() != null) {
queryGlobals.setLast_query_id(context.getLastQueryId().toString());
}
queryGlobals.setScan_node_number(scanNodes.size());
queryGlobals.setConnector_scan_node_number(scanNodes.stream().filter(x -> x.isRunningAsConnectorOperator()).count());

return new Builder()
.queryId(context.getExecutionId())
Expand Down Expand Up @@ -140,7 +140,7 @@ public static JobSpec fromMVMaintenanceJobSpec(ConnectContext context,
if (context.getLastQueryId() != null) {
queryGlobals.setLast_query_id(context.getLastQueryId().toString());
}
queryGlobals.setScan_node_number(scanNodes.size());
queryGlobals.setConnector_scan_node_number(scanNodes.stream().filter(x -> x.isRunningAsConnectorOperator()).count());

return new Builder()
.queryId(context.getExecutionId())
Expand Down
2 changes: 1 addition & 1 deletion gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ struct TQueryGlobals {

31: optional i64 timestamp_us

32: optional i64 scan_node_number
32: optional i64 connector_scan_node_number
}


Expand Down

0 comments on commit 906f250

Please sign in to comment.