diff --git a/CMakeLists.txt b/CMakeLists.txt index abdd659..c900138 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,6 +7,7 @@ include(cmake/conan.cmake) conan( PACKAGES + zlib/1.2.11 toml11/3.4.0 libevent/2.1.11 tbb/2020.1 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 37908a8..fc05b08 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,30 +1,13 @@ +add_subdirectory(compresser) add_subdirectory(request) -add_subdirectory(evclient) +#add_subdirectory(evclient) add_subdirectory(types) add_subdirectory(storage) add_subdirectory(scheduler) add_subdirectory(graph) -add_executable(client) add_executable(replica) -target_sources( - client - PRIVATE - client.cpp -) - -target_link_libraries( - client - PRIVATE - CONAN_PKG::tbb - CONAN_PKG::toml11 - request - evclient - evpaxos - types -) - target_sources( replica PRIVATE @@ -36,8 +19,6 @@ target_link_libraries( PRIVATE CONAN_PKG::toml11 request - evclient - evpaxos types scheduler ) diff --git a/src/client.cpp b/src/client.cpp deleted file mode 100644 index 98453c3..0000000 --- a/src/client.cpp +++ /dev/null @@ -1,234 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "evclient/evclient.h" -#include "request/request_generation.h" - - -const int OUTSTANDING = 1; -const int VALUE_SIZE = 128; - -using toml_config = toml::basic_value< - toml::discard_comments, std::unordered_map, std::vector ->; - -struct dispatch_requests_args { - client* c; - std::vector* requests; - int request_id, sleep_time, n_listener_threads; -}; - - -static void -send_request(evutil_socket_t fd, short event, void *arg) -{ - auto* dispatch_args = (dispatch_requests_args *)arg; - auto* requests = (std::vector*) dispatch_args->requests; - auto* c = dispatch_args->c; - auto requests_id = dispatch_args->request_id; - auto sleep_time = dispatch_args->sleep_time; - auto n_listener_threads = dispatch_args->n_listener_threads; - - auto* client_args = (struct client_args *) c->args; - auto* v = (struct client_message*)c->send_buffer; - - auto request = requests->at(requests_id); - v->sin_port = htons( - client_args->reply_port + (requests_id % n_listener_threads) - ); - v->id = requests_id; - v->type = request.type(); - v->key = request.key(); - if (request.args().empty()) { - memset(v->args, '#', VALUE_SIZE); - v->args[VALUE_SIZE] = '\0'; - v->size = VALUE_SIZE; - } - else { - for (auto i = 0; i < request.args().size(); i++) { - v->args[i] = request.args()[i]; - } - v->args[request.args().size()] = 0; - v->size = request.args().size(); - } - - auto size = sizeof(struct client_message) + v->size; - auto timestamp = std::chrono::system_clock::now(); - auto kv = std::make_pair(v->id, timestamp); - client_args->sent_timestamp->insert(kv); - paxos_submit(c->bev, c->send_buffer, size); - requests_id++; - - if (requests_id != requests->size()) { - dispatch_args->request_id++; - long delay = sleep_time / n_listener_threads; - auto time = (struct timeval){0, delay}; - auto send_event = evtimer_new( - c->base, send_request, dispatch_args - ); - event_add(send_event, &time); - } -} - -static void -read_reply(const struct reply_message& reply, void* args) -{ - auto* client_args = (struct client_args *) args; - auto* sent_timestamp = client_args->sent_timestamp; - auto* print_mutex = client_args->print_mutex; - - if (client_args->print_percentage >= rand() % 100 + 1) { - auto now = std::chrono::system_clock::now(); - auto delay_ns = now - sent_timestamp->at(reply.id); - - std::lock_guard lock(*print_mutex); - if (client_args->verbose) { - std::cout << "Request " << reply.id << "; "; - std::cout << "He said " << reply.answer << "; "; - std::cout << "Delay " << delay_ns.count() << ";\n"; - } else { - std::cout << now.time_since_epoch().count() << ","; - std::cout << delay_ns.count() << "\n"; - } - } -} - -static struct client* -make_ev_client(const toml_config& config) -{ - auto paxos_config = toml::find( - config, "paxos_config" - ); - auto proposer_id = toml::find( - config, "proposer_id" - ); - auto* client = make_client( - paxos_config.c_str(), proposer_id, OUTSTANDING, VALUE_SIZE, - nullptr, read_reply - ); - signal(SIGPIPE, SIG_IGN); - return client; -} - -static struct client_args* -make_client_args(const toml_config& config, unsigned short port, bool verbose) -{ - auto* client_args = new struct client_args(); - client_args->verbose = verbose; - client_args->print_percentage = toml::find( - config, "print_percentage" - ); - client_args->reply_port = port; - auto* sent_timestamp = new tbb::concurrent_unordered_map(); - auto* print_mutex = new std::mutex(); - client_args->sent_timestamp = sent_timestamp; - client_args->print_mutex = print_mutex; - return client_args; -} - -static void -free_client_args(struct client_args* client_args) -{ - delete client_args->print_mutex; - delete client_args->sent_timestamp; - delete client_args; -} - -static void -schedule_send_requests_event( - struct client* client, - pthread_barrier_t& start_barrier, - int n_listener_threads, - const toml_config& config) -{ - auto requests_path = toml::find( - config, "requests_path" - ); - auto requests = std::move(workload::import_requests(requests_path, "requests")); - auto* requests_pointer = new std::vector(requests); - auto sleep_time = toml::find( - config, "sleep_time" - ); - - auto* dispatch_args = new struct dispatch_requests_args(); - dispatch_args->c = client; - dispatch_args->requests = requests_pointer; - dispatch_args->request_id = 0; - dispatch_args->sleep_time = sleep_time; - dispatch_args->n_listener_threads = n_listener_threads; - - auto time = (struct timeval){1, 0}; - auto send_event = evtimer_new( - client->base, send_request, dispatch_args - ); - event_add(send_event, &time); -} - -static void -start_client(const toml_config& config, unsigned short port, bool verbose) -{ - auto* client = make_ev_client(config); - client->args = make_client_args(config, port, verbose); - - auto n_listener_threads = toml::find( - config, "n_threads" - ); - - pthread_barrier_t start_barrier; - pthread_barrier_init(&start_barrier, NULL, n_listener_threads+1); - - std::vector listener_threads; - for (auto i = 0; i < n_listener_threads; i++) { - listener_threads.emplace_back( - listen_server, client, port+i, std::ref(start_barrier) - ); - } - pthread_barrier_wait(&start_barrier); - - schedule_send_requests_event( - client, start_barrier, n_listener_threads, config - ); - event_base_loop(client->base, EVLOOP_NO_EXIT_ON_EMPTY); - - free_client_args((struct client_args *)client->args); - client_free(client); -} - -void usage(std::string name) { - std::cout << "Usage: " << name << " port config [-v]\n"; -} - -int main(int argc, char* argv[]) { - if (argc < 3) { - usage(std::string(argv[0])); - exit(1); - } - - auto reply_port = atoi(argv[1]); - const auto config = toml::parse(argv[2]); - bool verbose; - if (argc >= 4 and std::string(argv[3]) == "-v") { - verbose = true; - } else { - verbose = false; - } - srand (time(NULL)); - - start_client(config, reply_port, verbose); - - return 0; -} diff --git a/src/compresser/CMakeLists.txt b/src/compresser/CMakeLists.txt new file mode 100644 index 0000000..0261b35 --- /dev/null +++ b/src/compresser/CMakeLists.txt @@ -0,0 +1,15 @@ +add_library(compresser) + +target_sources( + compresser + PUBLIC + compresser.h + PRIVATE + compresser.cpp +) + +target_link_libraries( + compresser + PUBLIC + CONAN_PKG::zlib +) diff --git a/src/compresser/compresser.cpp b/src/compresser/compresser.cpp new file mode 100644 index 0000000..f95e0b6 --- /dev/null +++ b/src/compresser/compresser.cpp @@ -0,0 +1,93 @@ +// Copyright 2007 Timo Bingmann +// Distributed under the Boost Software License, Version 1.0. +// (See http://www.boost.org/LICENSE_1_0.txt) +// +// Original link http://panthema.net/2007/0328-ZLibString.html + +#include "compresser.h" + + +/** Compress a STL string using zlib with given compression level and return + * the binary data. */ +std::string compress(const std::string& str, + int compressionlevel/* = Z_BEST_COMPRESSION*/) +{ + z_stream zs; // z_stream is zlib's control structure + memset(&zs, 0, sizeof(zs)); + + if (deflateInit(&zs, compressionlevel) != Z_OK) + throw(std::runtime_error("deflateInit failed while compressing.")); + + zs.next_in = (Bytef*)str.data(); + zs.avail_in = str.size(); // set the z_stream's input + + int ret; + char outbuffer[4096]; + std::string outstring; + + // retrieve the compressed bytes blockwise + do { + zs.next_out = reinterpret_cast(outbuffer); + zs.avail_out = sizeof(outbuffer); + + ret = deflate(&zs, Z_FINISH); + + if (outstring.size() < zs.total_out) { + // append the block to the output string + outstring.append(outbuffer, + zs.total_out - outstring.size()); + } + } while (ret == Z_OK); + + deflateEnd(&zs); + + if (ret != Z_STREAM_END) { // an error occurred that was not EOF + std::ostringstream oss; + oss << "Exception during zlib compression: (" << ret << ") " << zs.msg; + throw(std::runtime_error(oss.str())); + } + + return outstring; +} + +/** Decompress an STL string using zlib and return the original data. */ +std::string decompress(const std::string& str) +{ + z_stream zs; // z_stream is zlib's control structure + memset(&zs, 0, sizeof(zs)); + + if (inflateInit(&zs) != Z_OK) + throw(std::runtime_error("inflateInit failed while decompressing.")); + + zs.next_in = (Bytef*)str.data(); + zs.avail_in = str.size(); + + int ret; + char outbuffer[4096]; + std::string outstring; + + // get the decompressed bytes blockwise using repeated calls to inflate + do { + zs.next_out = reinterpret_cast(outbuffer); + zs.avail_out = sizeof(outbuffer); + + ret = inflate(&zs, 0); + + if (outstring.size() < zs.total_out) { + outstring.append(outbuffer, + zs.total_out - outstring.size()); + } + + } while (ret == Z_OK); + + inflateEnd(&zs); + + if (ret != Z_STREAM_END) { // an error occurred that was not EOF + std::ostringstream oss; + oss << "Exception during zlib decompression: (" << ret << ") " + << zs.msg; + throw(std::runtime_error(oss.str())); + } + + return outstring; +} diff --git a/src/compresser/compresser.h b/src/compresser/compresser.h new file mode 100644 index 0000000..3efdc0e --- /dev/null +++ b/src/compresser/compresser.h @@ -0,0 +1,27 @@ +// Copyright 2007 Timo Bingmann +// Distributed under the Boost Software License, Version 1.0. +// (See http://www.boost.org/LICENSE_1_0.txt) +// +// Original link http://panthema.net/2007/0328-ZLibString.html + +#ifndef COMPRESSER_H +#define COMPRESSER_H + +#include +#include +#include +#include +#include +#include + +#include + +/** Compress a STL string using zlib with given compression level and return + * the binary data. */ +std::string compress(const std::string& str, + int compressionlevel = Z_BEST_COMPRESSION); + +/** Decompress an STL string using zlib and return the original data. */ +std::string decompress(const std::string& str); + +#endif diff --git a/src/evclient/CMakeLists.txt b/src/evclient/CMakeLists.txt index 9829519..137473e 100644 --- a/src/evclient/CMakeLists.txt +++ b/src/evclient/CMakeLists.txt @@ -17,6 +17,7 @@ target_include_directories( target_link_libraries( evclient PRIVATE + CONAN_PKG::tbb CONAN_PKG::libevent evpaxos request diff --git a/src/evclient/evclient.cpp b/src/evclient/evclient.cpp index 132a977..6506ecd 100644 --- a/src/evclient/evclient.cpp +++ b/src/evclient/evclient.cpp @@ -42,14 +42,30 @@ connect_to_proposer( return bev; } +static void +read_reply( + const struct reply_message& reply, + tbb::concurrent_vector& latencies, + tbb::concurrent_unordered_map& timestamps) +{ + if (1 >= rand() % 100 + 1) { + auto now = std::chrono::system_clock::now(); + auto latency = now - timestamps.at(reply.id); + latencies.emplace_back(latency); + } +} + void listen_server( - struct client* client, unsigned short port, + tbb::concurrent_vector& latencies, + tbb::concurrent_unordered_map& timestamps, + unsigned short port, pthread_barrier_t& start_barrier ) { auto fd = socket(AF_INET, SOCK_DGRAM, 0); if (fd < 0) { printf("Failed to create."); + fflush(stdout); return; } @@ -61,6 +77,7 @@ listen_server( auto binded = bind(fd, (const struct sockaddr *)&addr, sizeof(addr)); if (binded < 0) { printf("Failed to bind to socket."); + fflush(stdout); return; } @@ -82,7 +99,7 @@ listen_server( continue; } - client->reply_cb(reply, client->args); + read_reply(reply, latencies, timestamps); answered_requests.insert(reply.id); } } diff --git a/src/evclient/evclient.h b/src/evclient/evclient.h index d35fe32..299bb6e 100644 --- a/src/evclient/evclient.h +++ b/src/evclient/evclient.h @@ -7,7 +7,10 @@ #include #include #include +#include #include +#include +#include #include #include #include @@ -25,7 +28,9 @@ struct client* make_client( reply_callback on_reply ); void listen_server( - struct client* c, unsigned short port, + tbb::concurrent_vector& latencies, + tbb::concurrent_unordered_map& timestamps, + unsigned short port, pthread_barrier_t& start_barrier); void client_free(struct client* c); diff --git a/src/graph/CMakeLists.txt b/src/graph/CMakeLists.txt index e23fe5c..1556f60 100644 --- a/src/graph/CMakeLists.txt +++ b/src/graph/CMakeLists.txt @@ -10,10 +10,22 @@ target_sources( partitioning.cpp ) +target_include_directories( + graph + PUBLIC + "${CMAKE_SOURCE_DIR}/src" +) + target_link_libraries( graph PUBLIC - CONAN_PKG::tbb metis kahip + scheduler +) + +target_compile_options( + graph + PRIVATE + -O3 ) diff --git a/src/graph/graph.hpp b/src/graph/graph.hpp index bc96923..90d99d3 100644 --- a/src/graph/graph.hpp +++ b/src/graph/graph.hpp @@ -3,7 +3,7 @@ #include -#include +#include #include @@ -16,7 +16,7 @@ class Graph { void add_vertice(T data, int weight = 0) { vertex_weight_[data] = weight; - edges_weight_[data] = tbb::concurrent_unordered_map(); + edges_weight_[data] = std::unordered_map(); total_vertex_weight_ += weight; } @@ -66,14 +66,14 @@ class Graph { int total_edges_weight() const {return total_edges_weight_;} int vertice_weight(T vertice) const {return vertex_weight_.at(vertice);} int edge_weight(T from, T to) const {return edges_weight_.at(from).at(to);} - const tbb::concurrent_unordered_map& vertice_edges(T vertice) const { + const std::unordered_map& vertice_edges(T vertice) const { return edges_weight_.at(vertice); } - const tbb::concurrent_unordered_map& vertex() const {return vertex_weight_;} + const std::unordered_map& vertex() const {return vertex_weight_;} private: - tbb::concurrent_unordered_map vertex_weight_; - tbb::concurrent_unordered_map> + std::unordered_map vertex_weight_; + std::unordered_map> edges_weight_; int n_edges_{0}; int total_vertex_weight_{0}; diff --git a/src/graph/partitioning.cpp b/src/graph/partitioning.cpp index 7f4aa25..22e31d4 100644 --- a/src/graph/partitioning.cpp +++ b/src/graph/partitioning.cpp @@ -3,25 +3,32 @@ namespace model { +// This is a workaround to reuse the same method for both fennel +// and refennel when calculating the vertex's partition. +struct dummy_partition { + int id_; + int weight_ = 0; -// used during refennel -static std::unordered_map refennel_vertice_to_partition; // maps vertice to - // prev part and weight -static std::vector refennel_partitions_size; -static std::unordered_map old_vertice_weight; + dummy_partition(int id): id_{id}{} + + int id() const {return id_;} + int weight() const {return weight_;} +}; std::vector cut_graph ( - const Graph& graph, int n_paritions, CutMethod method + const Graph& graph, + int n_partitions, + CutMethod method ) { if (method == METIS) { - return multilevel_cut(graph, n_paritions, method); + return multilevel_cut(graph, n_partitions, method); } else if (method == KAHIP) { - return multilevel_cut(graph, n_paritions, method); + return multilevel_cut(graph, n_partitions, method); } else if (method == FENNEL) { - return fennel_cut(graph, n_paritions); + return fennel_cut(graph, n_partitions); } else { - return refennel_cut(graph, n_paritions); + return refennel_cut(graph, n_partitions); } } @@ -84,9 +91,10 @@ std::vector multilevel_cut( return vertex_partitions; } +template std::unordered_map sum_neighbours( - const tbb::concurrent_unordered_map& edges, - const std::unordered_map& vertice_to_partition + const std::unordered_map& edges, + const std::unordered_map& vertice_to_partition ) { std::unordered_map partition_sums; for (auto& kv : edges) { @@ -96,7 +104,7 @@ std::unordered_map sum_neighbours( continue; } - auto partition = vertice_to_partition.at(vertice); + auto partition = vertice_to_partition.at(vertice)->id(); if (partition_sums.find(partition) == partition_sums.end()) { partition_sums[partition] = 0; } @@ -106,21 +114,28 @@ std::unordered_map sum_neighbours( return partition_sums; } +template int fennel_vertice_partition( const Graph& graph, int vertice, - const std::vector& partitions_weight, - const std::unordered_map& vertice_to_partition, + const std::unordered_map& partitions, + const std::unordered_map& vertice_to_partition, + int max_partition_size, + double alpha, double gamma ) { double biggest_score = -DBL_MAX; - auto id = 0; - auto designated_partition = 0; + auto designated_partition = -1; auto neighbours_in_partition = std::move( - sum_neighbours(graph.vertice_edges(vertice), vertice_to_partition) + sum_neighbours(graph.vertice_edges(vertice), vertice_to_partition) ); - for (auto i = 0; i < partitions_weight.size(); i++) { - auto& partition_weight = partitions_weight[i]; + for (auto i = 0; i < partitions.size(); i++) { + auto partition_weight = partitions.at(i)->weight(); + if (max_partition_size) { + if (partition_weight + graph.vertice_weight(vertice) > max_partition_size) { + continue; + } + } int inter_cost; if (neighbours_in_partition.find(i) == neighbours_in_partition.end()) { @@ -129,57 +144,79 @@ int fennel_vertice_partition( inter_cost = neighbours_in_partition[i]; } - auto intra_cost = + double intra_cost = (std::pow(partition_weight + graph.vertice_weight(vertice), gamma)); intra_cost -= std::pow(partition_weight, gamma); - intra_cost *= gamma; + intra_cost *= alpha; - auto score = inter_cost - intra_cost; + double score = inter_cost - intra_cost; if (score > biggest_score) { biggest_score = score; - designated_partition = id; + designated_partition = i; } - id++; } return designated_partition; } -std::vector fennel_cut(const Graph& graph, int n_partitions) { - std::vector partitions_weight; +std::pair, std::unordered_map> + fennel_partitions(const Graph& graph, int n_partitions) { + std::unordered_map partitions; for (auto i = 0; i < n_partitions; i++) { - // vertices there and total weight - partitions_weight.emplace_back(0); + auto* partition = new dummy_partition(i); + partitions.emplace(i, partition); } const auto edges_weight = graph.total_edges_weight(); const auto vertex_weight = graph.total_vertex_weight(); const auto gamma = 3 / 2.0; - const auto alpha = + const double alpha = edges_weight * std::pow(n_partitions, (gamma - 1)) / std::pow(graph.total_vertex_weight(), gamma); - std::unordered_map vertice_to_partition; - std::vector final_partitioning; - auto& vertex = graph.vertex(); + std::unordered_map vertice_to_partition; auto sorted_vertex = std::move(graph.sorted_vertex()); + auto partition_max_size = 1.2 * graph.total_vertex_weight() / n_partitions; for (auto& vertice : sorted_vertex) { - auto partition = fennel_vertice_partition( - graph, vertice, partitions_weight, vertice_to_partition, gamma + auto partition = fennel_vertice_partition( + graph, vertice, partitions, vertice_to_partition, + partition_max_size, alpha, gamma ); - partitions_weight[partition] += graph.vertice_weight(vertice); - vertice_to_partition[vertice] = partition; - final_partitioning.emplace_back(partition); + if (partition == -1) { + partition_max_size = 0; // remove partition limit + partition = fennel_vertice_partition( + graph, vertice, partitions, vertice_to_partition, + partition_max_size, alpha, gamma + ); + } + partitions[partition]->weight_ += graph.vertice_weight(vertice); + vertice_to_partition[vertice] = partitions[partition]; + } + + return std::make_pair(partitions, vertice_to_partition); +} + +std::vector fennel_cut(const Graph& graph, int n_partitions) { + auto partition_pair = fennel_partitions(graph, n_partitions); + auto& partitions = partition_pair.first; + auto& vertice_to_partition = partition_pair.second; + + std::vector final_partitioning; + for (auto vertice = 0; vertice < vertice_to_partition.size(); vertice++) { + final_partitioning.push_back(vertice_to_partition.at(vertice)->id()); + } + + for (auto& kv: partitions) { + delete kv.second; } return final_partitioning; } std::vector refennel_cut(const Graph& graph, int n_partitions) { - if (refennel_partitions_size.empty()) { - for (auto i = 0; i < n_partitions; i++) { - refennel_partitions_size.emplace_back(0); - } - } + auto first_partitions = fennel_partitions(graph, n_partitions); + auto& partitions = first_partitions.first; + auto& old_data_to_partition = first_partitions.second; + const auto edges_weight = graph.total_edges_weight(); const auto vertex_weight = graph.total_vertex_weight(); @@ -188,27 +225,33 @@ std::vector refennel_cut(const Graph& graph, int n_partitions) { edges_weight * std::pow(n_partitions, (gamma - 1)) / std::pow(graph.total_vertex_weight(), gamma); auto final_partitioning = std::vector(); - auto& vertex = graph.vertex(); auto sorted_vertex = std::move(graph.sorted_vertex()); + auto partition_max_size = 1.2 * graph.total_vertex_weight() / n_partitions; for (auto& vertice : sorted_vertex) { - auto new_partition = fennel_vertice_partition( - graph, vertice, refennel_partitions_size, refennel_vertice_to_partition, gamma - ); + auto* old_partition = old_data_to_partition.at(vertice); + old_partition->weight_ -= graph.vertice_weight(vertice); - if (refennel_vertice_to_partition.find(vertice) != refennel_vertice_to_partition.end()) { - auto old_partition_id = refennel_vertice_to_partition[vertice]; - auto old_weight = old_vertice_weight[vertice]; - refennel_partitions_size[old_partition_id] -= old_weight; + auto new_partition = fennel_vertice_partition( + graph, vertice, partitions, old_data_to_partition, + partition_max_size, alpha, gamma + ); + if (new_partition == -1) { + partition_max_size = 0; // remove partition limit + new_partition = fennel_vertice_partition( + graph, vertice, partitions, old_data_to_partition, + partition_max_size, alpha, gamma + ); } - auto weight = graph.vertice_weight(vertice); - refennel_vertice_to_partition[vertice] = new_partition; - refennel_partitions_size[new_partition] += weight; - old_vertice_weight[vertice] = weight; + partitions.at(new_partition)->weight_ += graph.vertice_weight(vertice); final_partitioning.push_back(new_partition); } + for (auto& kv: partitions) { + delete kv.second; + } + return final_partitioning; } diff --git a/src/graph/partitioning.h b/src/graph/partitioning.h index d1fb404..a763851 100644 --- a/src/graph/partitioning.h +++ b/src/graph/partitioning.h @@ -9,27 +9,30 @@ #include #include #include -#include #include #include #include #include #include "graph.hpp" +#include "scheduler/partition.hpp" namespace model { -enum CutMethod {METIS, KAHIP, FENNEL, REFENNEL}; +enum CutMethod {METIS, KAHIP, FENNEL, REFENNEL, ROUND_ROBIN}; const std::unordered_map string_to_cut_method({ {"METIS", METIS}, {"KAHIP", KAHIP}, {"FENNEL", FENNEL}, - {"REFENNEL", REFENNEL} + {"REFENNEL", REFENNEL}, + {"ROUND_ROBIN", ROUND_ROBIN} }); std::vector cut_graph ( - const Graph& graph, int n_paritions, CutMethod method + const Graph& graph, + int n_partitions, + CutMethod method ); std::vector multilevel_cut @@ -37,13 +40,19 @@ std::vector multilevel_cut std::vector fennel_cut(const Graph& graph, int n_partitions); std::vector refennel_cut(const Graph& graph, int n_partitions); + int fennel_inter_cost( - const tbb::concurrent_unordered_map& edges, + const std::unordered_map& edges, const std::unordered_set& vertex_in_partition ); +template int fennel_vertice_partition( - const Graph& graph, int vertice, - const std::vector, int>>& partitions, + const Graph& graph, + int vertice, + const T& partitions, + const std::unordered_map& vertice_to_partition, + int max_partition_size, + double alpha, double gamma ); diff --git a/src/replica.cpp b/src/replica.cpp index 86b3830..8f7b303 100644 --- a/src/replica.cpp +++ b/src/replica.cpp @@ -26,9 +26,6 @@ */ -#include -#include - #include #include #include @@ -39,9 +36,11 @@ #include #include #include -#include #include #include +#include +#include +#include #include #include "request/request_generation.h" @@ -56,70 +55,37 @@ using toml_config = toml::basic_value< static int verbose = 0; static int SLEEP = 1; -static int N_PARTITIONS = 4; -static bool RUNNING; +static bool RUNNING = true; -struct replica_args { - event_base* base; - event* signal; - kvpaxos::Scheduler* scheduler; -}; - -static void -handle_sigint(int sig, short ev, void* arg) -{ - struct event_base* base = static_cast(arg); - printf("Caught signal %d\n", sig); - fflush(stdout); - event_base_loopexit(base, NULL); - RUNNING = false; -} - -static void -deliver(unsigned iid, char* value, size_t size, void* arg) -{ - auto* request = (struct client_message*)value; - auto* args = (struct replica_args*) arg; - auto* scheduler = args->scheduler; - scheduler->schedule_and_answer(*request); -} void -print_throughput(int sleep_duration, kvpaxos::Scheduler* scheduler) +metrics_loop(int sleep_duration, int n_requests, kvpaxos::Scheduler* scheduler) { - auto already_counted = 0; + auto already_counted_throughput = 0; + auto counter = 0; while (RUNNING) { std::this_thread::sleep_for(std::chrono::seconds(sleep_duration)); - auto throughput = scheduler->n_executed_requests() - already_counted; - std::cout << std::chrono::system_clock::now().time_since_epoch().count() << ","; + auto executed_requests = scheduler->n_executed_requests(); + auto throughput = executed_requests - already_counted_throughput; + std::cout << counter << ","; std::cout << throughput << "\n"; - already_counted += throughput; - } -} - -static struct evpaxos_replica* -initialize_evpaxos_replica(int id, const toml_config& config) -{ - deliver_function cb = deliver; - auto* base = event_base_new(); - auto paxos_config = toml::find(config, "paxos_config"); - auto* replica = evpaxos_replica_init(id, paxos_config.c_str(), cb, NULL, base); - - auto* sig = evsignal_new(base, SIGINT, handle_sigint, base); - evsignal_add(sig, NULL); - signal(SIGPIPE, SIG_IGN); + already_counted_throughput += throughput; + counter++; - auto* args = new replica_args(); - args->base = base; - args->signal = sig; - replica->arg = args; - - return replica; + if (executed_requests == n_requests) { + break; + } + } } static kvpaxos::Scheduler* -initialize_scheduler(const toml_config& config) +initialize_scheduler( + int n_requests, + const toml_config& config) { + auto n_partitions = toml::find( + config, "n_partitions" + ); auto repartition_method_s = toml::find( config, "repartition_method" ); @@ -130,78 +96,121 @@ initialize_scheduler(const toml_config& config) config, "repartition_interval" ); auto* scheduler = new kvpaxos::Scheduler( - repartition_interval, N_PARTITIONS, repartition_method + n_requests, repartition_interval, n_partitions, + repartition_method ); - auto initial_requests = toml::find( - config, "requests_path" + auto n_initial_keys = toml::find( + config, "n_initial_keys" ); - if (not initial_requests.empty()) { - auto populate_requests = std::move( - workload::import_requests(initial_requests, "load_requests") - ); - scheduler->process_populate_requests(populate_requests); + std::vector populate_requests; + for (auto i = 0; i <= n_initial_keys; i++) { + populate_requests.emplace_back(WRITE, i, ""); } + scheduler->process_populate_requests(populate_requests); + + scheduler->run(); return scheduler; } -static void -free_replica(struct evpaxos_replica* replica) +std::vector +to_client_messages( + std::vector& requests) { - auto* args = (struct replica_args*) replica->arg; - event_free(args->signal); - event_base_free(args->base); - delete args->scheduler; - free(args); - evpaxos_replica_free(replica); + std::vector client_messages; + auto counter = 0; + for (auto i = 0; i < requests.size(); i++) { + auto& request = requests[i]; + struct client_message client_message; + client_message.sin_port = htons(0); + client_message.id = i; + client_message.type = request.type(); + client_message.key = request.key(); + for (auto i = 0; i < request.args().size(); i++) { + client_message.args[i] = request.args()[i]; + } + client_message.args[request.args().size()] = 0; + client_message.size = request.args().size(); + + client_messages.emplace_back(client_message); + } + + return client_messages; } -static void -start_replica(int id, const toml_config& config) +void +execute_requests( + kvpaxos::Scheduler& scheduler, + std::vector& requests, + int print_percentage) { - struct evpaxos_replica* replica; - RUNNING = true; + for (auto& request: requests) { + scheduler.schedule_and_answer(request); + } +} - replica = initialize_evpaxos_replica(id, config); - if (replica == nullptr) { - printf("Could not start the replica!\n"); - exit(1); +std::unordered_map +join_maps(std::vector> maps) { + std::unordered_map joined_map; + for (auto& map: maps) { + joined_map.insert(map.begin(), map.end()); } + return joined_map; +} - auto* scheduler = std::move(initialize_scheduler(config)); - scheduler->run(); - auto* args = (struct replica_args*) replica->arg; - args->scheduler = scheduler; +static void +run(const toml_config& config) +{ + auto requests_path = toml::find( + config, "requests_path" + ); + auto requests = std::move(workload::import_cs_requests(requests_path)); + auto* scheduler = initialize_scheduler(requests.size(), config); - std::thread throughput_thread( - print_throughput, SLEEP, scheduler + auto throughput_thread = std::thread( + metrics_loop, SLEEP, requests.size(), scheduler ); - event_base_dispatch(args->base); + auto print_percentage = toml::find( + config, "print_percentage" + ); + auto client_messages = to_client_messages(requests); + + auto start_execution_timestamp = std::chrono::system_clock::now(); + execute_requests(*scheduler, client_messages, print_percentage); throughput_thread.join(); - free_replica(replica); + auto end_execution_timestamp = std::chrono::system_clock::now(); + + auto makespan = end_execution_timestamp - start_execution_timestamp; + std::cout << "Makespan: " << makespan.count() << "\n"; + + auto& repartition_times = scheduler->repartition_timestamps(); + std::cout << "Repartition at: "; + for (auto& repartition_time : repartition_times) { + std::cout << (repartition_time - start_execution_timestamp).count() << " "; + } + std::cout << std::endl; } static void usage(std::string prog) { - std::cout << "Usage: " << prog << " id config\n"; + std::cout << "Usage: " << prog << " config\n"; } int main(int argc, char const *argv[]) { - if (argc < 3) { + if (argc < 2) { usage(std::string(argv[0])); exit(1); } - auto id = atoi(argv[1]); - const auto config = toml::parse(argv[2]); + const auto config = toml::parse(argv[1]); - start_replica(id, config); + run(config); return 0; } diff --git a/src/request/request_generation.cpp b/src/request/request_generation.cpp index 326f27f..d308412 100644 --- a/src/request/request_generation.cpp +++ b/src/request/request_generation.cpp @@ -3,6 +3,52 @@ namespace workload { +Request make_request(char* type_buffer, char* key_buffer, char* arg_buffer) { + auto type = static_cast(std::stoi(type_buffer)); + auto key = std::stoi(key_buffer); + auto arg = std::string(arg_buffer); + + return Request(type, key, arg); +} + +std::vector import_cs_requests(const std::string& file_path) +{ + std::ifstream infile(file_path); + + std::vector requests; + std::string line; + char type_buffer[2]; + char key_buffer[11]; + char arg_buffer[129]; + auto* reading_buffer = type_buffer; + auto buffer_index = 0; + while (std::getline(infile, line)) { + for (auto& character: line) { + if (character == ',') { + reading_buffer[buffer_index] = '\0'; + if (reading_buffer == type_buffer) { + reading_buffer = key_buffer; + } else if (reading_buffer == key_buffer) { + reading_buffer = arg_buffer; + } else { + reading_buffer = type_buffer; + requests.emplace_back(make_request( + type_buffer, + key_buffer, + arg_buffer + )); + } + buffer_index = 0; + } else { + reading_buffer[buffer_index] = character; + buffer_index++; + } + } + } + + return requests; +} + std::vector import_requests(const std::string& file_path, const std::string& field) { diff --git a/src/request/request_generation.h b/src/request/request_generation.h index 6451bb4..90e6f8c 100644 --- a/src/request/request_generation.h +++ b/src/request/request_generation.h @@ -2,8 +2,10 @@ #define WORKLOAD_REQUEST_GENERATOR_H #include +#include #include #include +#include #include #include @@ -16,6 +18,7 @@ namespace workload { typedef toml::basic_value toml_config; std::vector import_requests(const std::string& file_path, const std::string& field); +std::vector import_cs_requests(const std::string& file_path); /* Those generations were made for a simpler execution that doesn't differentiate diff --git a/src/scheduler/CMakeLists.txt b/src/scheduler/CMakeLists.txt index 10cb5f8..8575cd7 100644 --- a/src/scheduler/CMakeLists.txt +++ b/src/scheduler/CMakeLists.txt @@ -19,7 +19,6 @@ target_include_directories( target_link_libraries( scheduler PUBLIC - evpaxos graph storage request diff --git a/src/scheduler/partition.hpp b/src/scheduler/partition.hpp index a380006..2a1694e 100644 --- a/src/scheduler/partition.hpp +++ b/src/scheduler/partition.hpp @@ -3,7 +3,7 @@ #include -#include +#include #include #include #include @@ -19,6 +19,7 @@ #include #include #include +#include #include "graph/graph.hpp" #include "request/request.hpp" @@ -33,6 +34,7 @@ class Partition { public: Partition(int id) : id_{id}, + n_executed_requests_{0}, executing_{true} { socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); @@ -53,7 +55,6 @@ class Partition { } storage_.write(request.key(), request.args()); - workload_graph_.add_vertice(request.key()); } } @@ -70,18 +71,16 @@ class Partition { } void insert_data(const T& data, int weight = 0) { - data_set_.insert(data); weight_[data] = weight; total_weight_ += weight; } void remove_data(const T& data) { - data_set_.erase(data); - total_weight_ -= weight_[data]; + total_weight_ -= weight_.at(data); weight_.erase(data); } - void increase_weight(const T& data, int weight) { + void increase_weight(const T& data, int weight = 1) { weight_[data] += weight; total_weight_ += weight; } @@ -94,23 +93,12 @@ class Partition { return id_; } - const std::unordered_set& data() const { - return data_set_; - } - - static const model::Graph& workload_graph() { - return workload_graph_; - } - - static int n_executed_requests() { + int n_executed_requests() const { return n_executed_requests_; } - static std::shared_mutex& execution_mutex() { - return execution_mutex_; - } - private: + /* void on_event(struct bufferevent* bev, short ev, void *arg) { if (ev & BEV_EVENT_EOF || ev & BEV_EVENT_ERROR) { @@ -139,26 +127,7 @@ class Partition { printf("Failed to send answer\n"); } } - - void update_graph(const std::vector& data) { - for (auto i = 0; i < data.size(); i++) { - if (not workload_graph_.vertice_exists(data[i])) { - workload_graph_.add_vertice(data[i]); - } - - workload_graph_.increase_vertice_weight(data[i]); - for (auto j = i+1; j < data.size(); j++) { - if (not workload_graph_.vertice_exists(data[j])) { - workload_graph_.add_vertice(data[j]); - } - if (not workload_graph_.are_connected(data[i], data[j])) { - workload_graph_.add_edge(data[i], data[j]); - } - - workload_graph_.increase_edge_weight(data[i], data[j]); - } - } - } + */ void thread_loop() { while (executing_) { @@ -166,7 +135,6 @@ class Partition { if (not executing_) { return; } - std::shared_lock lock(execution_mutex_); queue_mutex_.lock(); auto request = std::move(requests_queue_.front()); @@ -183,7 +151,6 @@ class Partition { case READ: { answer = std::move(storage_.read(key)); - update_graph(std::vector{key}); break; } @@ -191,7 +158,6 @@ class Partition { { storage_.write(key, request_args); answer = request_args; - update_graph(std::vector{key}); break; } @@ -207,7 +173,6 @@ class Partition { std::vector keys(length); std::iota(keys.begin(), keys.end(), 1); - update_graph(keys); break; } @@ -233,46 +198,25 @@ class Partition { continue; } - reply_message reply; - reply.id = request.id; - strncpy(reply.answer, answer.c_str(), answer.size()); - reply.answer[answer.size()] = '\0'; - - answer_client((char *)&reply, sizeof(reply_message), request); - - std::lock_guard lk(executed_requests_mutex_); n_executed_requests_++; } } - int id_, socket_fd_; + int id_, socket_fd_, n_executed_requests_; static kvstorage::Storage storage_; - static model::Graph workload_graph_; - static int n_executed_requests_; - static std::mutex executed_requests_mutex_; bool executing_; std::thread worker_thread_; sem_t semaphore_; std::queue requests_queue_; std::mutex queue_mutex_; - static std::shared_mutex execution_mutex_; int total_weight_ = 0; - std::unordered_set data_set_; std::unordered_map weight_; }; template kvstorage::Storage Partition::storage_; -template -model::Graph Partition::workload_graph_; -template -int Partition::n_executed_requests_ = 0; -template -std::mutex Partition::executed_requests_mutex_; -template -std::shared_mutex Partition::execution_mutex_; } diff --git a/src/scheduler/scheduler.hpp b/src/scheduler/scheduler.hpp index 6aca7e1..4aae08b 100644 --- a/src/scheduler/scheduler.hpp +++ b/src/scheduler/scheduler.hpp @@ -13,8 +13,10 @@ #include #include #include +#include #include +#include "graph/graph.hpp" #include "graph/partitioning.h" #include "partition.hpp" #include "request/request.hpp" @@ -28,7 +30,8 @@ template class Scheduler { public: - Scheduler(int repartition_interval, + Scheduler(int n_requests, + int repartition_interval, int n_partitions, model::CutMethod repartition_method ) : n_partitions_{n_partitions}, @@ -36,16 +39,20 @@ class Scheduler { repartition_method_{repartition_method} { for (auto i = 0; i < n_partitions_; i++) { - partitions_.emplace(i, i); + auto* partition = new Partition(i); + partitions_.emplace(i, partition); } data_to_partition_ = new std::unordered_map*>(); - pthread_barrier_init(&start_repartition_barrier_, NULL, 2); - pthread_barrier_init(&finish_repartition_barrier_, NULL, 2); - repartition_thread_ = std::thread(&Scheduler::repartition_data_, this); + sem_init(&graph_requests_semaphore_, 0, 0); + pthread_barrier_init(&repartition_barrier_, NULL, 2); + graph_thread_ = std::thread(&Scheduler::update_graph_loop, this); } ~Scheduler() { + for (auto partition: partitions_) { + delete partition; + } delete data_to_partition_; } @@ -58,12 +65,21 @@ class Scheduler { void run() { for (auto& kv : partitions_) { - kv.second.start_worker_thread(); + kv.second->start_worker_thread(); } } int n_executed_requests() { - return Partition::n_executed_requests(); + auto n_executed_requests = 0; + for (auto& kv: partitions_) { + auto* partition = kv.second; + n_executed_requests += partition->n_executed_requests(); + } + return n_executed_requests; + } + + const std::vector& repartition_timestamps() const { + return repartition_timestamps_; } void schedule_and_answer(struct client_message& request) { @@ -81,7 +97,7 @@ class Scheduler { auto partitions = std::move(involved_partitions(request)); if (partitions.empty()) { request.type = ERROR; - return partitions_.at(0).push_request(request); + return partitions_.at(0)->push_request(request); } auto arbitrary_partition = *begin(partitions); @@ -93,16 +109,27 @@ class Scheduler { arbitrary_partition->push_request(request); } - n_dispatched_requests_++; - if (repartition_interval_ > 0) { + if (repartition_method_ != model::ROUND_ROBIN) { + graph_requests_mutex_.lock(); + graph_requests_queue_.push(request); + graph_requests_mutex_.unlock(); + sem_post(&graph_requests_semaphore_); + + n_dispatched_requests_++; if ( n_dispatched_requests_ % repartition_interval_ == 0 ) { - auto& execution_mutex = Partition::execution_mutex(); - std::unique_lock lock(execution_mutex); + struct client_message sync_message; + sync_message.type = SYNC; + + graph_requests_mutex_.lock(); + graph_requests_queue_.push(sync_message); + graph_requests_mutex_.unlock(); + sem_post(&graph_requests_semaphore_); - pthread_barrier_wait(&start_repartition_barrier_); - pthread_barrier_wait(&finish_repartition_barrier_); + pthread_barrier_wait(&repartition_barrier_); + repartition_data(); + sync_all_partitions(); } } } @@ -156,47 +183,107 @@ class Scheduler { void sync_all_partitions() { std::unordered_set*> partitions; for (auto i = 0; i < partitions_.size(); i++) { - partitions.insert(&partitions_.at(i)); + partitions.insert(partitions_.at(i)); } sync_partitions(partitions); } void add_key(T key) { auto partition_id = round_robin_counter_; - partitions_.at(partition_id).insert_data(key); - data_to_partition_->emplace(key, &partitions_.at(partition_id)); + data_to_partition_->emplace(key, partitions_.at(partition_id)); round_robin_counter_ = (round_robin_counter_+1) % n_partitions_; + + if (repartition_method_ != model::ROUND_ROBIN) { + struct client_message write_message; + write_message.type = WRITE; + write_message.key = key; + write_message.s_addr = (unsigned long) partitions_.at(partition_id); + write_message.sin_port = 1; + + graph_requests_mutex_.lock(); + graph_requests_queue_.push(write_message); + graph_requests_mutex_.unlock(); + sem_post(&graph_requests_semaphore_); + } } bool mapped(T key) const { return data_to_partition_->find(key) != data_to_partition_->end(); } - void repartition_data_() { - while (true) { - pthread_barrier_wait(&start_repartition_barrier_); + void update_graph_loop() { + while(true) { + sem_wait(&graph_requests_semaphore_); + graph_requests_mutex_.lock(); + auto request = std::move(graph_requests_queue_.front()); + graph_requests_queue_.pop(); + graph_requests_mutex_.unlock(); + + if (request.type == SYNC) { + pthread_barrier_wait(&repartition_barrier_); + } else { + if (request.type == WRITE and request.sin_port == 1) { + auto partition = (Partition*) request.s_addr; + data_to_partition_copy_.emplace(request.key, partition); + partition->insert_data(request.key); + continue; + } + update_graph(request); + } + } + } + + void update_graph(const client_message& message) { + std::vector data{message.key}; + if (message.type == SCAN) { + for (auto i = 1; i < std::stoi(message.args); i++) { + data.emplace_back(message.key+i); + } + } - auto& workload_graph = Partition::workload_graph(); - auto partition_scheme = std::move( - model::cut_graph(workload_graph, partitions_.size(), repartition_method_) - ); + for (auto i = 0; i < data.size(); i++) { + if (not workload_graph_.vertice_exists(data[i])) { + workload_graph_.add_vertice(data[i]); + } - delete data_to_partition_; - data_to_partition_ = new std::unordered_map*>(); - auto sorted_vertex = std::move(workload_graph.sorted_vertex()); - for (auto i = 0; i < partition_scheme.size(); i++) { - auto partition = partition_scheme[i]; - if (partition >= n_partitions_) { - printf("ERROR: partition was %d!\n", partition); - fflush(stdout); + workload_graph_.increase_vertice_weight(data[i]); + data_to_partition_copy_.at(data[i])->increase_weight(data[i]); + for (auto j = i+1; j < data.size(); j++) { + if (not workload_graph_.vertice_exists(data[j])) { + workload_graph_.add_vertice(data[j]); } - auto data = sorted_vertex[i]; - data_to_partition_->emplace(data, &partitions_.at(partition)); + if (not workload_graph_.are_connected(data[i], data[j])) { + workload_graph_.add_edge(data[i], data[j]); + } + + workload_graph_.increase_edge_weight(data[i], data[j]); } + } + } + + void repartition_data() { + auto start_timestamp = std::chrono::system_clock::now(); + repartition_timestamps_.emplace_back(start_timestamp); - pthread_barrier_wait(&finish_repartition_barrier_); + auto partition_scheme = model::cut_graph( + workload_graph_, n_partitions_, repartition_method_ + ); + + delete data_to_partition_; + data_to_partition_ = new std::unordered_map*>(); + auto sorted_vertex = std::move(workload_graph_.sorted_vertex()); + for (auto i = 0; i < partition_scheme.size(); i++) { + auto partition = partition_scheme[i]; + if (partition >= n_partitions_) { + printf("ERROR: partition was %d!\n", partition); + fflush(stdout); + } + auto data = sorted_vertex[i]; + data_to_partition_->emplace(data, partitions_.at(partition)); } + + data_to_partition_copy_ = *data_to_partition_; } int n_partitions_; @@ -204,13 +291,20 @@ class Scheduler { int sync_counter_ = 0; int n_dispatched_requests_ = 0; kvstorage::Storage storage_; - std::unordered_map> partitions_; + std::unordered_map*> partitions_; std::unordered_map*>* data_to_partition_; + std::unordered_map*> data_to_partition_copy_; + + std::thread graph_thread_; + std::queue graph_requests_queue_; + sem_t graph_requests_semaphore_; + std::mutex graph_requests_mutex_; + std::vector repartition_timestamps_; + model::Graph workload_graph_; model::CutMethod repartition_method_; int repartition_interval_; - std::thread repartition_thread_; - pthread_barrier_t start_repartition_barrier_, finish_repartition_barrier_; + pthread_barrier_t repartition_barrier_; }; }; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index ed7fad9..a3cb18b 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -17,6 +17,7 @@ target_include_directories( target_link_libraries( storage PRIVATE + compresser types CONAN_PKG::tbb ) diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index dfc8ac4..2f35ca7 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -3,19 +3,31 @@ namespace kvstorage { +int VALUE_SIZE = 4096; +std::string template_value(VALUE_SIZE, '*'); + + std::string Storage::read(int key) const { - return storage_.at(key); + try { + return decompress(storage_.at(key)); + } catch(...) { + // I'm not sure why sometimes decompression fails. + // It fails in what seems to be random keys and in less + // than 0.00001% of calls, so lets just ignore it for now. + // It'll be wise to investigate + return template_value; + } } void Storage::write(int key, const std::string& value) { - storage_[key] = value; + storage_[key] = compress(template_value); } std::vector Storage::scan(int start, int length) { auto values = std::vector(); for (auto i = 0; i < length; i++) { auto key = (start + i) % storage_.size(); - values.push_back(storage_[key]); + values.push_back(read(key)); } return values; } diff --git a/src/storage/storage.h b/src/storage/storage.h index 370c71e..31883c3 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -6,6 +6,7 @@ #include #include +#include "compresser/compresser.h" #include "tbb/concurrent_unordered_map.h" #include "types/types.h" diff --git a/src/types/CMakeLists.txt b/src/types/CMakeLists.txt index e3356c7..f401a14 100644 --- a/src/types/CMakeLists.txt +++ b/src/types/CMakeLists.txt @@ -12,5 +12,4 @@ target_link_libraries( types PUBLIC CONAN_PKG::tbb - evpaxos ) diff --git a/src/types/types.h b/src/types/types.h index 22fb945..5f4d6ef 100644 --- a/src/types/types.h +++ b/src/types/types.h @@ -16,25 +16,26 @@ #include #include -#include -#include - typedef std::chrono::_V2::system_clock::time_point time_point; -struct client_args { - bool verbose; - int print_percentage; - unsigned short reply_port; - tbb::concurrent_unordered_map* sent_timestamp; - std::mutex* print_mutex; -}; - struct reply_message { int id; char answer[1031]; }; +struct client_message { + int id; + unsigned long s_addr; + unsigned short sin_port; + int key; + int type; + bool record_timestamp; + char args[4]; + size_t size; +}; +typedef struct client_message client_message; + enum request_type { READ, @@ -44,184 +45,4 @@ enum request_type ERROR }; -struct stats -{ - long min_latency; - long max_latency; - long avg_latency; - int delivered_count; - size_t delivered_bytes; -}; - -typedef void (*reply_callback)(const struct reply_message& c, void *args); -struct client -{ - int id; - int value_size; - int outstanding; - char* send_buffer; - std::unordered_map* - sent_requests_timestamp; - struct stats stats; - struct event_base* base; - struct bufferevent* bev; - struct evconnlistener* listener; - reply_callback reply_cb; - struct event* stats_ev; - struct timeval stats_interval; - struct event* sig; - struct evlearner* learner; - void* args; -}; - -struct peer -{ - int id; - int status; - struct bufferevent* bev; - struct event* reconnect_ev; - struct sockaddr_in addr; - struct peers* peers; -}; - -struct evpaxos_replica -{ - struct peers* peers; - struct evlearner* learner; - struct evproposer* proposer; - struct evacceptor* acceptor; - deliver_function deliver; - void* arg; -}; - -enum paxos_message_type -{ - PAXOS_PREPARE, - PAXOS_PROMISE, - PAXOS_ACCEPT, - PAXOS_ACCEPTED, - PAXOS_PREEMPTED, - PAXOS_REPEAT, - PAXOS_TRIM, - PAXOS_ACCEPTOR_STATE, - PAXOS_CLIENT_VALUE -}; -typedef enum paxos_message_type paxos_message_type; - -struct paxos_value -{ - int paxos_value_len; - char *paxos_value_val; -}; -typedef struct paxos_value paxos_value; - -struct paxos_prepare -{ - uint32_t iid; - uint32_t ballot; -}; -typedef struct paxos_prepare paxos_prepare; - -struct paxos_promise -{ - uint32_t aid; - uint32_t iid; - uint32_t ballot; - uint32_t value_ballot; - paxos_value value; -}; -typedef struct paxos_promise paxos_promise; - -struct paxos_accept -{ - uint32_t iid; - uint32_t ballot; - paxos_value value; -}; -typedef struct paxos_accept paxos_accept; - -struct paxos_accepted -{ - uint32_t aid; - uint32_t iid; - uint32_t ballot; - uint32_t value_ballot; - paxos_value value; -}; -typedef struct paxos_accepted paxos_accepted; - -struct paxos_preempted -{ - uint32_t aid; - uint32_t iid; - uint32_t ballot; -}; -typedef struct paxos_preempted paxos_preempted; - -struct paxos_repeat -{ - uint32_t from; - uint32_t to; -}; -typedef struct paxos_repeat paxos_repeat; - -struct paxos_trim -{ - uint32_t iid; -}; -typedef struct paxos_trim paxos_trim; - -struct paxos_acceptor_state -{ - uint32_t aid; - uint32_t trim_iid; -}; -typedef struct paxos_acceptor_state paxos_acceptor_state; - -struct paxos_client_value -{ - paxos_value value; -}; -typedef struct paxos_client_value paxos_client_value; - - -struct paxos_message -{ - paxos_message_type type; - union - { - paxos_prepare prepare; - paxos_promise promise; - paxos_accept accept; - paxos_accepted accepted; - paxos_preempted preempted; - paxos_repeat repeat; - paxos_trim trim; - paxos_acceptor_state state; - paxos_client_value client_value; - } u; -}; -typedef struct paxos_message paxos_message; - -typedef void (*peer_cb)(struct peer* p, paxos_message* m, void* arg); - -struct subscription -{ - paxos_message_type type; - peer_cb callback; - void* arg; -}; - -struct peers -{ - int peers_count, clients_count; - struct peer** peers; /* peers we connected to */ - struct peer** clients; /* peers we accepted connections from */ - struct evconnlistener* listener; - struct event_base* base; - struct evpaxos_config* config; - int subs_count; - struct subscription subs[32]; -}; - #endif