Skip to content

Commit

Permalink
Switch to thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
OFFTKP committed Aug 17, 2024
1 parent 5c21bcc commit 5987e95
Showing 1 changed file with 134 additions and 71 deletions.
205 changes: 134 additions & 71 deletions src/https.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#include "https.hpp"

#ifndef EMSCRIPTEN
#include <atomic>
#include <curl/curl.h>
#include <condition_variable>
#include <openssl/err.h>
#include <openssl/ssl.h>
#include <thread>
#include <queue>
extern "C" {
#include "res.h"
}
Expand All @@ -13,6 +16,64 @@ extern "C" {
#endif

#ifndef EMSCRIPTEN
struct job {
http_request_e type;
std::string url;
std::string body;
std::vector<std::pair<std::string, std::string>> headers;
std::function<void(const std::vector<uint8_t>&)> callback;
};

struct thread_pool {
thread_pool(uint8_t n) {
for (uint8_t i = 0; i < n; i++) {
threads.push_back(std::thread(&thread_pool::main_loop, this));
}
}

~thread_pool() {
terminate_all = true;
jobs_cv.notify_all();
for (auto& t : threads) {
t.join();
}
}

void push_job(const job& j) {
{
std::unique_lock<std::mutex> lock(jobs_mutex);
jobs.push(j);
}
jobs_cv.notify_one();
}

private:
void main_loop() {
while (!terminate_all) {
job j;
{
std::unique_lock<std::mutex> lock(jobs_mutex);
jobs_cv.wait(lock, [this] { return !jobs.empty() || terminate_all; });
if (terminate_all) {
return;
}
j = jobs.front();
jobs.pop();
}
handle_job(j);
}
}

void handle_job(job& j);

std::atomic_bool terminate_all = { false };

std::vector<std::thread> threads;
std::mutex jobs_mutex;
std::condition_variable jobs_cv;
std::queue<job> jobs;
};

size_t curl_write_data(void* buffer, size_t size, size_t nmemb, void* d)
{
std::vector<uint8_t>* data = (std::vector<uint8_t>*)d;
Expand Down Expand Up @@ -126,88 +187,90 @@ CURLcode sslctx_function(CURL *curl, void *sslctx, void *parm)
rv = CURLE_OK;
return rv;
}
#endif

// Abstraction layer for http requests
void https_request(http_request_e type, const std::string& url, const std::string& body,
const std::vector<std::pair<std::string, std::string>>& headers,
std::function<void(const std::vector<uint8_t>&)> callback)
void thread_pool::handle_job(job& j)
{
#ifndef EMSCRIPTEN
std::thread request_thread([=] {
CURL* curl = curl_easy_init();
if (!curl)
{
printf("[cloud] failed to initialize curl\n");
return;
}
CURL* curl = curl_easy_init();
if (!curl)
{
printf("[cloud] failed to initialize curl\n");
return;
}

CURLcode res;
CURLcode res;
#define se_validate() if (res != CURLE_OK) { printf("curl failed, line: %d, error: %s\n", __LINE__, curl_easy_strerror(res)); return; }

std::vector<uint8_t> result;
res = curl_easy_setopt(curl, CURLOPT_FAILONERROR, 0L); se_validate();
res = curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); se_validate();
res = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_data); se_validate();
res = curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void*)&result); se_validate();
std::vector<uint8_t> result;
res = curl_easy_setopt(curl, CURLOPT_FAILONERROR, 0L); se_validate();
res = curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); se_validate();
res = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_data); se_validate();
res = curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void*)&result); se_validate();

/* Turn off the default CA locations, otherwise libcurl will load CA
* certificates from the locations that were detected/specified at
* build-time
*/
res = curl_easy_setopt(curl, CURLOPT_CAINFO, NULL); se_validate();
res = curl_easy_setopt(curl, CURLOPT_CAPATH, NULL); se_validate();
/* Turn off the default CA locations, otherwise libcurl will load CA
* certificates from the locations that were detected/specified at
* build-time
*/
res = curl_easy_setopt(curl, CURLOPT_CAINFO, NULL); se_validate();
res = curl_easy_setopt(curl, CURLOPT_CAPATH, NULL); se_validate();

res = curl_easy_setopt(curl, CURLOPT_SSL_CTX_FUNCTION, sslctx_function); se_validate();
res = curl_easy_setopt(curl, CURLOPT_TIMEOUT, 5); se_validate(); // 5 second timeout
res = curl_easy_setopt(curl, CURLOPT_SSL_CTX_FUNCTION, sslctx_function); se_validate();
res = curl_easy_setopt(curl, CURLOPT_TIMEOUT, 5); se_validate(); // 5 second timeout

switch (type)
{
case http_request_e::GET:
res = curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); se_validate();
break;
case http_request_e::POST:
res = curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POST, 1L); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, body.size()); se_validate();
break;
case http_request_e::PATCH:
res = curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PATCH"); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, body.size()); se_validate();
break;
default:
printf("[cloud] invalid request type\n");
return;
}
switch (j.type)
{
case http_request_e::GET:
res = curl_easy_setopt(curl, CURLOPT_URL, j.url.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); se_validate();
break;
case http_request_e::POST:
res = curl_easy_setopt(curl, CURLOPT_URL, j.url.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POST, 1L); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, j.body.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, j.body.size()); se_validate();
break;
case http_request_e::PATCH:
res = curl_easy_setopt(curl, CURLOPT_URL, j.url.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PATCH"); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, j.body.c_str()); se_validate();
res = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, j.body.size()); se_validate();
break;
default:
printf("[cloud] invalid request type\n");
return;
}

struct curl_slist* chunk = NULL;
for (auto& header : headers)
{
std::string header_string = header.first + ": " + header.second;
chunk = curl_slist_append(chunk, header_string.c_str());
}
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
struct curl_slist* chunk = NULL;
for (auto& header : j.headers)
{
std::string header_string = header.first + ": " + header.second;
chunk = curl_slist_append(chunk, header_string.c_str());
}
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);

res = curl_easy_perform(curl);
if (res != CURLE_OK)
{
printf("[cloud] curl failed: %s\n", curl_easy_strerror(res));
callback({});
}
else
{
callback(result);
}
res = curl_easy_perform(curl);
if (res != CURLE_OK)
{
printf("[cloud] curl failed: %s\n", curl_easy_strerror(res));
j.callback({});
}
else
{
j.callback(result);
}

curl_slist_free_all(chunk);
curl_easy_cleanup(curl);
#undef se_validate
});
request_thread.detach();
curl_slist_free_all(chunk);
curl_easy_cleanup(curl);
}
#endif

// Abstraction layer for http requests
void https_request(http_request_e type, const std::string& url, const std::string& body,
const std::vector<std::pair<std::string, std::string>>& headers,
std::function<void(const std::vector<uint8_t>&)> callback)
{
#ifndef EMSCRIPTEN
static thread_pool pool(32);
pool.push_job({type, url, body, headers, callback});
#else
std::string method;
switch (type)
Expand Down

0 comments on commit 5987e95

Please sign in to comment.