Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] fix connector mem scan limit adjustment when no chunk source #53112

Merged
merged 4 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions be/src/connector/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ using DataSourcePtr = std::unique_ptr<DataSource>;

class DataSourceProvider {
public:
static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024; // 16MB
static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024; // 256MB
static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024; // 1MB
static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024; // 16MB
static constexpr int64_t DEFAULT_DATA_SOURCE_MEM_BYTES = 64 * 1024 * 1024; // 64MB
static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024; // 256MB
static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024; // 1MB

virtual ~DataSourceProvider() = default;

Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,12 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
}
}

int scan_node_number = 1;
if (query_globals.__isset.scan_node_number) {
scan_node_number = query_globals.scan_node_number;
int connector_scan_node_number = 1;
if (query_globals.__isset.connector_scan_node_number) {
connector_scan_node_number = query_globals.connector_scan_node_number;
}
_query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_ratio,
wg.get(), runtime_state, scan_node_number);
wg.get(), runtime_state, connector_scan_node_number);

auto query_mem_tracker = _query_ctx->mem_tracker();
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(query_mem_tracker.get());
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void QueryContext::cancel(const Status& status) {

void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit,
std::optional<double> spill_mem_reserve_ratio, workgroup::WorkGroup* wg,
RuntimeState* runtime_state, int scan_node_number) {
RuntimeState* runtime_state, int connector_scan_node_number) {
std::call_once(_init_mem_tracker_once, [=]() {
_profile = std::make_shared<RuntimeProfile>("Query" + print_id(_query_id));
auto* mem_tracker_counter =
Expand Down Expand Up @@ -142,7 +142,7 @@ void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent,
_static_query_mem_limit = std::min(big_query_mem_limit, _static_query_mem_limit);
}
_connector_scan_operator_mem_share_arbitrator = _object_pool.add(
new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, scan_node_number));
new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, connector_scan_node_number));

{
MemTracker* connector_scan_parent = GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
/// that there is a big query memory limit of this resource group.
void init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit = -1,
std::optional<double> spill_mem_limit = std::nullopt, workgroup::WorkGroup* wg = nullptr,
RuntimeState* state = nullptr, int scan_node_number = 1);
RuntimeState* state = nullptr, int connector_scan_node_number = 1);
std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
MemTracker* connector_scan_mem_tracker() { return _connector_scan_mem_tracker.get(); }

Expand Down
37 changes: 11 additions & 26 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
#include "exec/connector_scan_node.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/workgroup/work_group.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"

namespace starrocks::pipeline {

// ==================== ConnectorScanOperatorFactory ====================
ConnectorScanOperatorMemShareArbitrator::ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit,
int scan_node_number)
int connector_scan_node_number)
: query_mem_limit(query_mem_limit),
scan_mem_limit(query_mem_limit),
total_chunk_source_mem_bytes(scan_node_number * connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES) {}
total_chunk_source_mem_bytes(connector_scan_node_number *
connector::DataSourceProvider::DEFAULT_DATA_SOURCE_MEM_BYTES) {}

int64_t ConnectorScanOperatorMemShareArbitrator::update_chunk_source_mem_bytes(int64_t old_value, int64_t new_value) {
int64_t diff = new_value - old_value;
Expand All @@ -54,9 +54,7 @@ class ConnectorScanOperatorIOTasksMemLimiter {
int64_t chunk_source_mem_bytes_update_count = 0;
int64_t arb_chunk_source_mem_bytes = 0;
mutable int64_t debug_output_timestamp = 0;

std::atomic<int64_t> open_scan_operator_count = 0;
std::atomic<int64_t> active_scan_operator_count = 0;

public:
ConnectorScanOperatorIOTasksMemLimiter(int64_t dop, bool shared_scan) : dop(dop), shared_scan(shared_scan) {}
Expand Down Expand Up @@ -127,9 +125,6 @@ class ConnectorScanOperatorIOTasksMemLimiter {
// Atomically adds `delta` to the open scan operator counter.
// Returns the counter value from BEFORE the addition (std::atomic::fetch_add semantics),
// so a caller passing +1 sees 0 when it is the first opener, and a caller passing -1
// sees 1 when it is the last one to close.
int64_t update_open_scan_operator_count(int delta) {
return open_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
}
int64_t update_active_scan_operator_count(int delta) {
return active_scan_operator_count.fetch_add(delta, std::memory_order_seq_cst);
}
};

ConnectorScanOperatorFactory::ConnectorScanOperatorFactory(int32_t id, ScanNode* scan_node, RuntimeState* state,
Expand Down Expand Up @@ -218,9 +213,6 @@ struct ConnectorScanOperatorAdaptiveProcessor {
int try_add_io_tasks_fail_count = 0;
int check_slow_io = 0;
int32_t slow_io_latency_ms = config::connector_io_tasks_adjust_interval_ms;

// ------------------------
bool started_running = false;
};

// ==================== ConnectorScanOperator ====================
Expand Down Expand Up @@ -264,43 +256,36 @@ Status ConnectorScanOperator::do_prepare(RuntimeState* state) {
_unique_metrics->add_info_string("AdaptiveIOTasks", _enable_adaptive_io_tasks ? "True" : "False");
_adaptive_processor = state->obj_pool()->add(new ConnectorScanOperatorAdaptiveProcessor());
_adaptive_processor->op_start_time = GetCurrentTimeMicros();
_adaptive_processor->started_running = false;
if (options.__isset.connector_io_tasks_slow_io_latency_ms) {
_adaptive_processor->slow_io_latency_ms = options.connector_io_tasks_slow_io_latency_ms;
}

// As the first running scan operator, it will update the scan mem limit
{
auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
L->update_open_scan_operator_count(1);
int64_t c = L->update_open_scan_operator_count(1);
if (c == 0) {
_adjust_scan_mem_limit(connector::DataSourceProvider::DEFAULT_DATA_SOURCE_MEM_BYTES,
L->get_arb_chunk_source_mem_bytes());
}
}
return Status::OK();
}

void ConnectorScanOperator::do_close(RuntimeState* state) {
// As the last closing scan operator, it will update the scan mem limit.
auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;
int64_t c = L->update_open_scan_operator_count(-1);
if (c == 1) {
if (L->update_active_scan_operator_count(0) > 0) {
_adjust_scan_mem_limit(L->get_arb_chunk_source_mem_bytes(), 0);
}
_adjust_scan_mem_limit(L->get_arb_chunk_source_mem_bytes(), 0);
}
}

ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
auto* scan_node = down_cast<ConnectorScanNode*>(_scan_node);
auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
ConnectorScanOperatorIOTasksMemLimiter* L = factory->_io_tasks_mem_limiter;

if (_adaptive_processor->started_running == false) {
_adaptive_processor->started_running = true;
int64_t c = L->update_active_scan_operator_count(1);
if (c == 0) {
_adjust_scan_mem_limit(connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES,
L->get_arb_chunk_source_mem_bytes());
}
}

// Only use one chunk source profile, so we can see metrics on scan operator level.
// Since there is adaptive io tasks feature, chunk sources will be used unevenly,
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/connector_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct ConnectorScanOperatorMemShareArbitrator {
int64_t scan_mem_limit = 0;
std::atomic<int64_t> total_chunk_source_mem_bytes = 0;

ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int scan_node_number);
ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int connector_scan_node_number);

int64_t set_scan_mem_ratio(double mem_ratio) {
scan_mem_limit = std::max<int64_t>(1, query_mem_limit * mem_ratio);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public void computeRangeLocations() {
boolean tabletIsNull = true;
for (Replica replica : allQueryableReplicas) {
ComputeNode node =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(replica.getBackendId());
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()
.getBackendOrComputeNode(replica.getBackendId());
if (node == null) {
LOG.debug("replica {} not exists", replica.getBackendId());
continue;
Expand Down Expand Up @@ -197,4 +198,9 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
public boolean canUseRuntimeAdaptiveDop() {
return true;
}

// Returns false: this scan node is not counted as a connector scan node when the
// job spec computes connector_scan_node_number for the query globals.
@Override
public boolean isRunningAsConnectorOperator() {
return false;
}
}
10 changes: 7 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ public List<TScanRangeLocations> updateScanRangeLocations(List<TScanRangeLocatio
return newLocations;
}


private void checkSomeAliveComputeNode() throws ErrorReportException {
// Note that it's theoretically possible that there were some living CN earlier in this query's execution, and then
// they all died, but in that case, the problem will be surfaced later anyway.
Expand Down Expand Up @@ -778,7 +777,8 @@ public void checkIfScanRangeNumSafe(long scanRangeSize) {
long freeMemory = runtime.freeMemory();
if (totalScanRangeBytes > freeMemory / 2) {
LOG.warn(
"Try to allocate too many scan ranges for table {}, which may cause FE OOM, Partition Num:{}, tablet Num:{}, Scan Range Total Bytes:{}",
"Try to allocate too many scan ranges for table {}, which may cause FE OOM, Partition Num:{}, tablet " +
"Num:{}, Scan Range Total Bytes:{}",
olapTable.getName(), totalPartitionNum, totalTabletsNum, totalScanRangeBytes);
}
}
Expand Down Expand Up @@ -942,7 +942,6 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
return output.toString();
}


private void assignOrderByHints(List<String> keyColumnNames) {
// assign order by hint
for (RuntimeFilterDescription probeRuntimeFilter : probeRuntimeFilters) {
Expand Down Expand Up @@ -1514,4 +1513,9 @@ public void clearScanNodeForThriftBuild() {
bucketColumns.clear();
rowStoreKeyLiterals = Lists.newArrayList();
}

// Returns false: an OlapScanNode is not counted as a connector scan node when the
// job spec computes connector_scan_node_number for the query globals.
@Override
public boolean isRunningAsConnectorOperator() {
return false;
}
}
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,10 @@ protected boolean supportTopNRuntimeFilter() {
public boolean needCollectExecStats() {
return true;
}

// Whether this scan node runs as a connector scan operator on the BE side.
// The FE counts the scan nodes for which this returns true and sends that count to the BE
// (as connector_scan_node_number), where the connector framework uses it to share memory
// fairly between those scan nodes. The default is true; scan nodes that should not be
// counted override this method to return false.
public boolean isRunningAsConnectorOperator() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -372,7 +371,6 @@ public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength)
return beScanRanges;
}


@Override
public boolean canUseRuntimeAdaptiveDop() {
return true;
Expand All @@ -385,4 +383,9 @@ public String getCatalogName() {
// Stores the given catalog name in this node's catalogName field.
public void setCatalogName(String catalogName) {
this.catalogName = catalogName;
}

// Returns false: this scan node is not counted as a connector scan node when the
// job spec computes connector_scan_node_number for the query globals.
@Override
public boolean isRunningAsConnectorOperator() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static JobSpec fromQuerySpec(ConnectContext context,
if (context.getLastQueryId() != null) {
queryGlobals.setLast_query_id(context.getLastQueryId().toString());
}
queryGlobals.setScan_node_number(scanNodes.size());
queryGlobals.setConnector_scan_node_number(scanNodes.stream().filter(x -> x.isRunningAsConnectorOperator()).count());

return new Builder()
.queryId(context.getExecutionId())
Expand Down Expand Up @@ -143,7 +143,7 @@ public static JobSpec fromMVMaintenanceJobSpec(ConnectContext context,
if (context.getLastQueryId() != null) {
queryGlobals.setLast_query_id(context.getLastQueryId().toString());
}
queryGlobals.setScan_node_number(scanNodes.size());
queryGlobals.setConnector_scan_node_number(scanNodes.stream().filter(x -> x.isRunningAsConnectorOperator()).count());

return new Builder()
.queryId(context.getExecutionId())
Expand Down
2 changes: 1 addition & 1 deletion gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ struct TQueryGlobals {

31: optional i64 timestamp_us

32: optional i64 scan_node_number
32: optional i64 connector_scan_node_number
dirtysalt marked this conversation as resolved.
Show resolved Hide resolved
}


Expand Down
Loading