Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: revised TimerTaskManager and add some comments #2776

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ int DispatchThread::StartThread() {
}

// Adding timer tasks and run timertaskThread
timerTaskThread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
timer_task_thread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });
timerTaskThread_.set_thread_name("TimerTaskThread");
timerTaskThread_.StartThread();
timer_task_thread_.set_thread_name("DispacherTimerTaskThread");
timer_task_thread_.StartThread();
return ServerThread::StartThread();
}

Expand All @@ -88,7 +88,7 @@ int DispatchThread::StopThread() {
worker_thread_[i]->private_data_ = nullptr;
}
}
timerTaskThread_.StopThread();
timer_task_thread_.StopThread();
return ServerThread::StopThread();
}

Expand Down
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class DispatchThread : public ServerThread {
*/
std::shared_mutex block_mtx_;

TimerTaskThread timerTaskThread_;
TimerTaskThread timer_task_thread_;
}; // class DispatchThread

} // namespace net
Expand Down
46 changes: 18 additions & 28 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,31 @@ uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interv
int64_t next_expired_time = NowInMs() + interval_ms;
exec_queue_.insert({next_expired_time, new_task.task_id});

if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个逻辑不要删掉

min_interval_ms_ = interval_ms;
}
// return the id of this task
return new_task.task_id;
}

int64_t TimerTaskManager::NowInMs() {
auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
}
int TimerTaskManager::ExecTimerTask() {

int32_t TimerTaskManager::ExecTimerTask() {
std::vector<ExecTsWithId> fired_tasks_;
int64_t now_in_ms = NowInMs();
// traverse in ascending order
for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) {
if (pair->exec_ts <= now_in_ms) {
auto it = id_to_task_.find(pair->id);
// traverse in ascending order, and exec expired tasks
for (auto pair : exec_queue_) {
if (pair.exec_ts <= now_in_ms) {
auto it = id_to_task_.find(pair.id);
assert(it != id_to_task_.end());
it->second.fun();
fired_tasks_.push_back({pair->exec_ts, pair->id});
fired_tasks_.push_back({pair.exec_ts, pair.id});
now_in_ms = NowInMs();
} else {
break;
}
}

for (auto task : fired_tasks_) {
exec_queue_.erase(task);
auto it = id_to_task_.find(task.id);
Expand All @@ -69,15 +69,19 @@ int TimerTaskManager::ExecTimerTask() {
exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id});
} else {
// this task only need to be exec once, completely remove this task
int interval_del = it->second.interval_ms;
id_to_task_.erase(task.id);
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}
}
}
return min_interval_ms_;

if (exec_queue_.empty()) {
//no task to exec, epoll will use -1 as timeout value, and sink into endless wait
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好嘛,原始的 RenewMinIntervalMs() 函数避免使用 -1,你这干脆来个 -1

return -1;
}
int32_t gap_between_now_and_next_task = static_cast<int32_t>(exec_queue_.begin()->exec_ts - NowInMs());
gap_between_now_and_next_task = gap_between_now_and_next_task < 0 ? 0 : gap_between_now_and_next_task;
return gap_between_now_and_next_task;
}

bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
// remove the task
auto task_to_del = id_to_task_.find(task_id);
Expand All @@ -87,11 +91,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
int interval_del = task_to_del->second.interval_ms;
id_to_task_.erase(task_to_del);

// renew the min_interval_ms_
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}

// remove from exec queue
ExecTsWithId target_key = {-1, 0};
for (auto pair : exec_queue_) {
Expand All @@ -106,15 +105,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
return true;
}

void TimerTaskManager::RenewMinIntervalMs() {
min_interval_ms_ = -1;
for (auto pair : id_to_task_) {
if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) {
min_interval_ms_ = pair.second.interval_ms;
}
}
}

TimerTaskThread::~TimerTaskThread() {
if (!timer_task_manager_.Empty()) {
LOG(INFO) << "TimerTaskThread exit !!!";
Expand Down
14 changes: 8 additions & 6 deletions src/net/src/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,26 @@ class TimerTaskManager {
~TimerTaskManager() = default;

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
//return the newest min_minterval_ms
//return the time gap between now and next task-expired time, which can be used as the timeout value of epoll
int ExecTimerTask();
bool DelTimerTaskByTaskId(uint32_t task_id);
int GetMinIntervalMs() const { return min_interval_ms_; }
int64_t NowInMs();
void RenewMinIntervalMs();
bool Empty(){ return 0 == last_task_id_; }
bool Empty() const { return 0 == last_task_id_; }

private:
//items stored in std::set are ascending ordered, we regard it as an auto sorted queue
std::set<ExecTsWithId> exec_queue_;
std::unordered_map<uint32_t, TimedTask> id_to_task_;
uint32_t last_task_id_{0};
int min_interval_ms_{-1};
};



/*
* For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started,
* but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect
* the timer_task_manager_ and also a pipe to wake up the maybe being endless-wait epoll(if all task consumed, epoll will sink into
* endless wait) to implement the feature.
*/
class TimerTaskThread : public Thread {
public:
TimerTaskThread(){
Expand Down
Loading