From cc5dd0a9c96838d82dfedc387f1b41d4c220b0b6 Mon Sep 17 00:00:00 2001 From: Amber Ehrlich Date: Fri, 29 Mar 2024 16:40:34 -0400 Subject: [PATCH] fix: fix memory leak on libstdc++ ditched the std::multimap in queues.cpp for a sorted vector, and established clearer ownership of the request data yes the requests are still kept alive for a minute --- include/dpp/queues.h | 61 ++++++++++-- src/dpp/cluster.cpp | 6 +- src/dpp/queues.cpp | 218 ++++++++++++++++++++++++++----------------- 3 files changed, 188 insertions(+), 97 deletions(-) diff --git a/include/dpp/queues.h b/include/dpp/queues.h index b3e9b8ec97..3097d5bcb6 100644 --- a/include/dpp/queues.h +++ b/include/dpp/queues.h @@ -434,9 +434,9 @@ class DPP_EXPORT in_thread { std::map buckets; /** - * @brief Queue of requests to be made. + * @brief Queue of requests to be made. Sorted by http_request::endpoint. */ - std::map> requests_in; + std::vector> requests_in; /** * @brief Inbound queue thread loop. @@ -465,7 +465,7 @@ class DPP_EXPORT in_thread { * @param req http_request to post. The pointer will be freed when it has * been executed. */ - void post_request(http_request* req); + void post_request(std::unique_ptr req); }; /** @@ -516,10 +516,25 @@ class DPP_EXPORT request_queue { */ std::condition_variable out_ready; + /** + * @brief A completed request. Contains both the request and the response + */ + struct completed_request { + /** + * @brief Request sent + */ + std::unique_ptr request; + + /** + * @brief Response to the request + */ + std::unique_ptr response; + }; + /** * @brief Completed requests queue */ - std::queue> responses_out; + std::queue responses_out; /** * @brief A vector of inbound request threads forming a pool. @@ -533,9 +548,38 @@ class DPP_EXPORT request_queue { std::vector requests_in; /** - * @brief Completed requests to delete + * @brief A request queued for deletion in the queue. + */ + struct queued_deleting_request { + /** + * @brief Time to delete the request + */ + time_t time_to_delete; + + /** + * @brief The request to delete + */ + completed_request request; + + /** + * @brief Comparator for sorting purposes + * @param other Other queued request to compare the deletion time with + * @return bool Whether this request comes before another in strict ordering + */ + bool operator<(const queued_deleting_request& other) const noexcept; + + /** + * @brief Comparator for sorting purposes + * @param time Time to compare with + * @return bool Whether this request's deletion time is lower than the time given, for strict ordering + */ + bool operator<(time_t time) const noexcept; + }; + + /** + * @brief Completed requests to delete. Sorted by deletion time */ - std::multimap> responses_to_delete; + std::vector responses_to_delete; /** * @brief Set to true if the threads should terminate @@ -597,14 +641,13 @@ class DPP_EXPORT request_queue { ~request_queue(); /** - * @brief Put a http_request into the request queue. You should ALWAYS "new" an object - * to pass to here -- don't submit an object that's on the stack! + * @brief Put a http_request into the request queue. * @note Will use a simple hash function to determine which of the 'in queues' to place * this request onto. * @param req request to add * @return reference to self */ - request_queue& post_request(http_request *req); + request_queue& post_request(std::unique_ptr req); /** * @brief Returns true if the bot is currently globally rate limited diff --git a/src/dpp/cluster.cpp b/src/dpp/cluster.cpp index e8a13a9bab..ffc08dccdb 100644 --- a/src/dpp/cluster.cpp +++ b/src/dpp/cluster.cpp @@ -323,7 +323,7 @@ json error_response(const std::string& message, http_request_completion_t& rv) void cluster::post_rest(const std::string &endpoint, const std::string &major_parameters, const std::string ¶meters, http_method method, const std::string &postdata, json_encode_t callback, const std::string &filename, const std::string &filecontent, const std::string &filemimetype, const std::string &protocol) { /* NOTE: This is not a memory leak! The request_queue will free the http_request once it reaches the end of its lifecycle */ - rest->post_request(new http_request(endpoint + (!major_parameters.empty() ? "/" : "") + major_parameters, parameters, [endpoint, callback](http_request_completion_t rv) { + rest->post_request(std::make_unique(endpoint + (!major_parameters.empty() ? "/" : "") + major_parameters, parameters, [endpoint, callback](http_request_completion_t rv) { json j; if (rv.error == h_success && !rv.body.empty()) { try { @@ -351,7 +351,7 @@ void cluster::post_rest_multipart(const std::string &endpoint, const std::string } /* NOTE: This is not a memory leak! The request_queue will free the http_request once it reaches the end of its lifecycle */ - rest->post_request(new http_request(endpoint + (!major_parameters.empty() ? "/" : "") + major_parameters, parameters, [endpoint, callback](http_request_completion_t rv) { + rest->post_request(std::make_unique(endpoint + (!major_parameters.empty() ? "/" : "") + major_parameters, parameters, [endpoint, callback](http_request_completion_t rv) { json j; if (rv.error == h_success && !rv.body.empty()) { try { @@ -370,7 +370,7 @@ void cluster::post_rest_multipart(const std::string &endpoint, const std::string void cluster::request(const std::string &url, http_method method, http_completion_event callback, const std::string &postdata, const std::string &mimetype, const std::multimap &headers, const std::string &protocol) { /* NOTE: This is not a memory leak! The request_queue will free the http_request once it reaches the end of its lifecycle */ - raw_rest->post_request(new http_request(url, callback, method, postdata, mimetype, headers, protocol)); + raw_rest->post_request(std::make_unique(url, callback, method, postdata, mimetype, headers, protocol)); } gateway::gateway() : shards(0), session_start_total(0), session_start_remaining(0), session_start_reset_after(0), session_start_max_concurrency(0) { diff --git a/src/dpp/queues.cpp b/src/dpp/queues.cpp index 0d28de6b88..3625fadd2b 100644 --- a/src/dpp/queues.cpp +++ b/src/dpp/queues.cpp @@ -236,9 +236,46 @@ request_queue::~request_queue() out_ready.notify_one(); out_thread->join(); delete out_thread; - for (auto& ri : requests_in) { - delete ri; - } +} + +namespace +{ + +/** + * @brief Comparator for sorting a request container + */ +struct compare_request { + /** + * @brief Less_than comparator for sorting + * @param lhs Left-hand side + * @param rhs Right-hand side + * @return Whether lhs comes before rhs in strict ordering + */ + bool operator()(const std::unique_ptr& lhs, const std::unique_ptr& rhs) const noexcept { + return std::less{}(lhs->endpoint, rhs->endpoint); + }; + + /** + * @brief Less_than comparator for sorting + * @param lhs Left-hand side + * @param rhs Right-hand side + * @return Whether lhs comes before rhs in strict ordering + */ + bool operator()(const std::unique_ptr& lhs, std::string_view rhs) const noexcept { + return std::less{}(lhs->endpoint, rhs); + }; + + /** + * @brief Less_than comparator for sorting + * @param lhs Left-hand side + * @param rhs Right-hand side + * @return Whether lhs comes before rhs in strict ordering + */ + bool operator()(std::string_view lhs, const std::unique_ptr& rhs) const noexcept { + return std::less{}(lhs, rhs->endpoint); + }; +}; + } void in_thread::in_loop(uint32_t index) @@ -246,93 +283,92 @@ void in_thread::in_loop(uint32_t index) utility::set_thread_name(std::string("http_req/") + std::to_string(index)); while (!terminating) { std::mutex mtx; - std::unique_lock lock{ mtx }; + std::unique_lock lock{ mtx }; in_ready.wait_for(lock, std::chrono::seconds(1)); /* New request to be sent! */ if (!requests->globally_ratelimited) { - std::map> requests_in_copy; + std::vector requests_view; { - /* Make a safe copy within a mutex */ + /* Gather all the requests first within a mutex */ std::shared_lock lock(in_mutex); if (requests_in.empty()) { /* Nothing to copy, wait again */ continue; } - requests_in_copy = requests_in; + requests_view.reserve(requests_in.size()); + std::transform(requests_in.begin(), requests_in.end(), std::back_inserter(requests_view), [](const std::unique_ptr &r) { + return r.get(); + }); } - for (auto & bucket : requests_in_copy) { - for (auto req : bucket.second) { - - http_request_completion_t rv; - auto currbucket = buckets.find(bucket.first); - - if (currbucket != buckets.end()) { - /* There's a bucket for this request. Check its status. If the bucket says to wait, - * skip all requests in this bucket till its ok. - */ - if (currbucket->second.remaining < 1) { - uint64_t wait = (currbucket->second.retry_after ? currbucket->second.retry_after : currbucket->second.reset_after); - if ((uint64_t)time(nullptr) > currbucket->second.timestamp + wait) { - /* Time has passed, we can process this bucket again. send its request. */ - rv = req->run(creator); - } else { - if (!req->waiting) { - req->waiting = true; - } - /* Time not up yet, wait more */ - break; - } + for (auto& request_view : requests_view) { + const std::string &key = request_view->endpoint; + http_request_completion_t rv; + auto currbucket = buckets.find(key); + + if (currbucket != buckets.end()) { + /* There's a bucket for this request. Check its status. If the bucket says to wait, + * skip all requests in this bucket till its ok. + */ + if (currbucket->second.remaining < 1) { + uint64_t wait = (currbucket->second.retry_after ? currbucket->second.retry_after : currbucket->second.reset_after); + if ((uint64_t)time(nullptr) > currbucket->second.timestamp + wait) { + /* Time has passed, we can process this bucket again. send its request. */ + rv = request_view->run(creator); } else { - /* There's limit remaining, we can just run the request */ - rv = req->run(creator); + if (!request_view->waiting) { + request_view->waiting = true; + } + /* Time not up yet, wait more */ + break; } } else { - /* No bucket for this endpoint yet. Just send it, and make one from its reply */ - rv = req->run(creator); + /* There's limit remaining, we can just run the request */ + rv = request_view->run(creator); } - - bucket_t newbucket; - newbucket.limit = rv.ratelimit_limit; - newbucket.remaining = rv.ratelimit_remaining; - newbucket.reset_after = rv.ratelimit_reset_after; - newbucket.retry_after = rv.ratelimit_retry_after; - newbucket.timestamp = time(nullptr); - requests->globally_ratelimited = rv.ratelimit_global; - if (requests->globally_ratelimited) { - requests->globally_limited_for = (newbucket.retry_after ? newbucket.retry_after : newbucket.reset_after); - } - buckets[req->endpoint] = newbucket; - - /* Make a new entry in the completion list and notify */ - http_request_completion_t* hrc = new http_request_completion_t(); - *hrc = rv; - { - std::unique_lock lock(requests->out_mutex); - requests->responses_out.push(std::make_pair(hrc, req)); - } - requests->out_ready.notify_one(); + } else { + /* No bucket for this endpoint yet. Just send it, and make one from its reply */ + rv = request_view->run(creator); } - } - { - std::unique_lock lock(in_mutex); - bool again = false; - do { - again = false; - for (auto & bucket : requests_in) { - for (auto req = bucket.second.begin(); req != bucket.second.end(); ++req) { - if ((*req)->is_completed()) { - requests_in[bucket.first].erase(req); - again = true; - goto out; /* Only clean way out of a nested loop */ - } + bucket_t newbucket; + newbucket.limit = rv.ratelimit_limit; + newbucket.remaining = rv.ratelimit_remaining; + newbucket.reset_after = rv.ratelimit_reset_after; + newbucket.retry_after = rv.ratelimit_retry_after; + newbucket.timestamp = time(nullptr); + requests->globally_ratelimited = rv.ratelimit_global; + if (requests->globally_ratelimited) { + requests->globally_limited_for = (newbucket.retry_after ? newbucket.retry_after : newbucket.reset_after); + } + buckets[request_view->endpoint] = newbucket; + + /* Remove the request from the incoming requests to transfer it to completed requests */ + std::unique_ptr request; + { + /* Find the owned pointer in requests_in */ + std::scoped_lock lock1{in_mutex}; + + auto [begin, end] = std::equal_range(requests_in.begin(), requests_in.end(), key, compare_request{}); + for (auto it = begin; it != end; ++it) { + if (it->get() == request_view) { + /* Grab and remove */ + request = std::move(*it); + requests_in.erase(it); + break; } } - out:; - } while (again); + } + /* Make a new entry in the completion list and notify */ + auto hrc = std::make_unique(); + *hrc = rv; + { + std::scoped_lock lock1(requests->out_mutex); + requests->responses_out.push({std::move(request), std::move(hrc)}); + } + requests->out_ready.notify_one(); } } else { @@ -346,47 +382,59 @@ void in_thread::in_loop(uint32_t index) } } +bool request_queue::queued_deleting_request::operator<(const queued_deleting_request& other) const noexcept { + return time_to_delete < other.time_to_delete; +} + +bool request_queue::queued_deleting_request::operator<(time_t time) const noexcept { + return time_to_delete < time; +} + + void request_queue::out_loop() { utility::set_thread_name("req_callback"); while (!terminating) { std::mutex mtx; - std::unique_lock lock{ mtx }; + std::unique_lock lock{ mtx }; out_ready.wait_for(lock, std::chrono::seconds(1)); time_t now = time(nullptr); /* A request has been completed! */ - std::pair queue_head = {}; + completed_request queue_head = {}; { - std::unique_lock lock(out_mutex); + std::scoped_lock lock1(out_mutex); if (responses_out.size()) { - queue_head = responses_out.front(); + queue_head = std::move(responses_out.front()); responses_out.pop(); } } - if (queue_head.first && queue_head.second) { - queue_head.second->complete(*queue_head.first); + if (queue_head.request && queue_head.response) { + queue_head.request->complete(*queue_head.response); /* Queue deletions for 60 seconds from now */ - responses_to_delete.insert(std::make_pair(now + 60, queue_head)); + auto when = now + 60; + auto where = std::lower_bound(responses_to_delete.begin(), responses_to_delete.end(), when); + responses_to_delete.insert(where, {when, std::move(queue_head)}); } /* Check for deletable items every second regardless of select status */ - while (responses_to_delete.size() && now >= responses_to_delete.begin()->first) { - delete responses_to_delete.begin()->second.first; - delete responses_to_delete.begin()->second.second; - responses_to_delete.erase(responses_to_delete.begin()); + auto end = std::lower_bound(responses_to_delete.begin(), responses_to_delete.end(), now); + if (end != responses_to_delete.begin()) { + responses_to_delete.erase(responses_to_delete.begin(), end); } } } /* Post a http_request into the queue */ -void in_thread::post_request(http_request* req) +void in_thread::post_request(std::unique_ptr req) { { - std::unique_lock lock(in_mutex); - requests_in[req->endpoint].push_back(req); + std::scoped_lock lock(in_mutex); + + auto where = std::lower_bound(requests_in.begin(), requests_in.end(), req->endpoint, compare_request{}); + requests_in.emplace(where, std::move(req)); } in_ready.notify_one(); } @@ -410,9 +458,9 @@ inline uint32_t hash(const char *s) } /* Post a http_request into a request queue */ -request_queue& request_queue::post_request(http_request* req) +request_queue& request_queue::post_request(std::unique_ptr req) { - requests_in[hash(req->endpoint.c_str()) % in_thread_pool_size]->post_request(req); + requests_in[hash(req->endpoint.c_str()) % in_thread_pool_size]->post_request(std::move(req)); return *this; }