Skip to content

Commit

Permalink
Merge pull request ipkn#289 from belugum/master
Browse files Browse the repository at this point in the history
Add basic load balancing
  • Loading branch information
The-EDev authored Dec 7, 2021
2 parents 120e1f3 + a997c2b commit 5037891
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
11 changes: 8 additions & 3 deletions include/crow/http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,17 @@ namespace crow
std::tuple<Middlewares...>* middlewares,
std::function<std::string()>& get_cached_date_str_f,
detail::task_timer& task_timer,
typename Adaptor::context* adaptor_ctx_):
typename Adaptor::context* adaptor_ctx_,
std::atomic<unsigned int>& queue_length):
adaptor_(io_service, adaptor_ctx_),
handler_(handler),
parser_(this),
server_name_(server_name),
middlewares_(middlewares),
get_cached_date_str(get_cached_date_str_f),
task_timer_(task_timer),
res_stream_threshold_(handler->stream_threshold())
res_stream_threshold_(handler->stream_threshold()),
queue_length_(queue_length)
{
#ifdef CROW_ENABLE_DEBUG
connectionCount++;
Expand Down Expand Up @@ -698,7 +700,8 @@ namespace crow
CROW_LOG_DEBUG << this << " is_reading " << is_reading << " is_writing " << is_writing;
if (!is_reading && !is_writing)
{
CROW_LOG_DEBUG << this << " delete (idle) ";
queue_length_--;
CROW_LOG_DEBUG << this << " delete (idle) (queue length: " << queue_length_ << ')';
delete this;
}
}
Expand Down Expand Up @@ -758,6 +761,8 @@ namespace crow
detail::task_timer& task_timer_;

size_t res_stream_threshold_;

std::atomic<unsigned int>& queue_length_;
};

} // namespace crow
34 changes: 24 additions & 10 deletions include/crow/http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace crow
server_name_(server_name),
port_(port),
bindaddr_(bindaddr),
task_queue_length_pool_(concurrency_),
middlewares_(middlewares),
adaptor_ctx_(adaptor_ctx)
{}
Expand Down Expand Up @@ -101,6 +102,7 @@ namespace crow
detail::task_timer task_timer(*io_service_pool_[i]);
task_timer.set_default_timeout(timeout_);
task_timer_pool_[i] = &task_timer;
task_queue_length_pool_[i] = 0;

init_count++;
while (1)
Expand Down Expand Up @@ -175,24 +177,34 @@ namespace crow
}

private:
asio::io_service& pick_io_service()
uint16_t pick_io_service_idx()
{
// TODO load balancing
roundrobin_index_++;
if (roundrobin_index_ >= io_service_pool_.size())
roundrobin_index_ = 0;
return *io_service_pool_[roundrobin_index_];
uint16_t min_queue_idx = 0;

// TODO improve load balancing
for (uint16_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
// No need to check other io_services if the current one has no tasks
{
if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
min_queue_idx = i;
}
return min_queue_idx;
}

void do_accept()
{
asio::io_service& is = pick_io_service();
uint16_t service_idx = pick_io_service_idx();
asio::io_service& is = *io_service_pool_[service_idx];
task_queue_length_pool_[service_idx]++;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];

auto p = new Connection<Adaptor, Handler, Middlewares...>(
is, handler_, server_name_, middlewares_,
get_cached_date_str_pool_[roundrobin_index_], *task_timer_pool_[roundrobin_index_], adaptor_ctx_);
get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);

acceptor_.async_accept(
p->socket(),
[this, p, &is](boost::system::error_code ec) {
[this, p, &is, service_idx](boost::system::error_code ec) {
if (!ec)
{
is.post(
Expand All @@ -202,6 +214,8 @@ namespace crow
}
else
{
task_queue_length_pool_[service_idx]--;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
delete p;
}
do_accept();
Expand All @@ -223,7 +237,7 @@ namespace crow
std::string server_name_;
uint16_t port_;
std::string bindaddr_;
unsigned int roundrobin_index_{};
std::vector<std::atomic<unsigned int>> task_queue_length_pool_;

std::chrono::milliseconds tick_interval_;
std::function<void()> tick_function_;
Expand Down

0 comments on commit 5037891

Please sign in to comment.