Skip to content

Commit

Permalink
[BugFix] Fix running fragment count during exec env exit (#52048)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Xiaohua Cai <[email protected]>
(cherry picked from commit e259267)

# Conflicts:
#	be/src/testutil/init_test_env.h
  • Loading branch information
kevincai authored and mergify[bot] committed Oct 18, 2024
1 parent 05a5ad5 commit cfcc0f7
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 11 deletions.
24 changes: 15 additions & 9 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,22 +684,28 @@ void ExecEnv::destroy() {
}

void ExecEnv::_wait_for_fragments_finish() {
size_t max_loop_cnt_cfg = config::loop_count_wait_fragments_finish;
if (max_loop_cnt_cfg == 0) {
size_t max_loop_secs = config::loop_count_wait_fragments_finish * 10;
if (max_loop_secs == 0) {
return;
}

size_t running_fragments = _fragment_mgr->running_fragment_count();
size_t loop_cnt = 0;
size_t running_fragments = _get_running_fragments_count();
size_t loop_secs = 0;

while (running_fragments && loop_cnt < max_loop_cnt_cfg) {
DLOG(INFO) << running_fragments << " fragment(s) are still running...";
sleep(10);
running_fragments = _fragment_mgr->running_fragment_count();
loop_cnt++;
while (running_fragments > 0 && loop_secs < max_loop_secs) {
LOG(INFO) << running_fragments << " fragment(s) are still running...";
sleep(1);
running_fragments = _get_running_fragments_count();
loop_secs++;
}
}

size_t ExecEnv::_get_running_fragments_count() const {
// fragment is registered in _fragment_mgr in non-pipeline env
// while _query_context_mgr is used in pipeline engine.
return _fragment_mgr->running_fragment_count() + _query_context_mgr->size();
}

void ExecEnv::wait_for_finish() {
_wait_for_fragments_finish();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ class ExecEnv {

private:
void _wait_for_fragments_finish();
size_t _get_running_fragments_count() const;

std::vector<StorePath> _store_paths;
// Leave protected so that subclasses can override
Expand Down
16 changes: 14 additions & 2 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ extern std::atomic<bool> k_starrocks_exit_quick;
using PromiseStatus = std::promise<Status>;
using PromiseStatusSharedPtr = std::shared_ptr<PromiseStatus>;

namespace {
static bool is_shutdown_in_progress() {
return k_starrocks_exit.load(std::memory_order_relaxed) || k_starrocks_exit_quick.load(std::memory_order_relaxed);
}
} // namespace

template <typename T>
PInternalServiceImplBase<T>::PInternalServiceImplBase(ExecEnv* exec_env) : _exec_env(exec_env) {}

Expand Down Expand Up @@ -296,7 +302,7 @@ void PInternalServiceImplBase<T>::_exec_plan_fragment(google::protobuf::RpcContr
google::protobuf::Closure* done) {
ClosureGuard closure_guard(done);
auto* cntl = static_cast<brpc::Controller*>(cntl_base);
if (k_starrocks_exit.load(std::memory_order_relaxed) || k_starrocks_exit_quick.load(std::memory_order_relaxed)) {
if (is_shutdown_in_progress()) {
cntl->SetFailed(brpc::EINTERNAL, "BE is shutting down");
LOG(WARNING) << "reject exec plan fragment because of exit";
return;
Expand Down Expand Up @@ -328,6 +334,12 @@ void PInternalServiceImplBase<T>::_exec_batch_plan_fragments(google::protobuf::R
google::protobuf::Closure* done) {
ClosureGuard closure_guard(done);
auto* cntl = static_cast<brpc::Controller*>(cntl_base);
if (is_shutdown_in_progress()) {
cntl->SetFailed(brpc::EINTERNAL, "BE is shutting down");
LOG(WARNING) << "reject exec plan fragment because of exit";
return;
}

auto ser_request = cntl->request_attachment().to_string();
std::shared_ptr<TExecBatchPlanFragmentsParams> t_batch_requests = std::make_shared<TExecBatchPlanFragmentsParams>();
{
Expand Down Expand Up @@ -1161,7 +1173,7 @@ void PInternalServiceImplBase<T>::exec_short_circuit(google::protobuf::RpcContro
watch.start();

auto* cntl = static_cast<brpc::Controller*>(cntl_base);
if (k_starrocks_exit.load(std::memory_order_relaxed)) {
if (is_shutdown_in_progress()) {
cntl->SetFailed(brpc::EINTERNAL, "BE is shutting down");
return;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/testutil/init_test_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ int init_test_env(int argc, char** argv) {
// clear some trash objects kept in tablet_manager so mem_tracker checks will not fail
CHECK(StorageEngine::instance()->tablet_manager()->start_trash_sweep().ok());
(void)butil::DeleteFile(storage_root, true);
<<<<<<< HEAD
TEST_clear_all_columns_this_thread();
=======
exec_env->wait_for_finish();
>>>>>>> e259267bb8 ([BugFix] Fix running fragment count during exec env exit (#52048))
// delete engine
StorageEngine::instance()->stop();
// destroy exec env
Expand Down

0 comments on commit cfcc0f7

Please sign in to comment.