From db849b97e62887040ee558d3013a6b890f4b6037 Mon Sep 17 00:00:00 2001 From: Daniel Salwasser Date: Wed, 29 May 2024 17:29:55 +0200 Subject: [PATCH 1/6] feat(compressed-graph): overcommit less memory during parallel compression --- apps/io/parhip_parser.cc | 38 ++++++++++--------- .../compressed_graph_builder.cc | 24 ++++++++++-- .../datastructures/compressed_graph_builder.h | 19 ++++++++++ 3 files changed, 61 insertions(+), 20 deletions(-) diff --git a/apps/io/parhip_parser.cc b/apps/io/parhip_parser.cc index 5dc7c51a..2a872648 100644 --- a/apps/io/parhip_parser.cc +++ b/apps/io/parhip_parser.cc @@ -393,22 +393,6 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool return (nodes[node] - nodes_offset_base) / sizeof(NodeID); }; - // Initializes the data structures used to build the compressed graph in parallel. - ParallelCompressedGraphBuilder builder( - header.num_nodes, header.num_edges, header.has_node_weights, header.has_edge_weights, sorted - ); - - tbb::enumerable_thread_specific> offsets_ets; - tbb::enumerable_thread_specific>> neighbourhood_ets; - tbb::enumerable_thread_specific neighbourhood_builder_ets([&] { - return CompressedEdgesBuilder( - num_nodes, num_edges, header.has_edge_weights, builder.edge_weights() - ); - }); - - const std::size_t num_threads = tbb::this_task_arena::max_concurrency(); - ConcurrentCircularVectorMutex buffer(num_threads); - // To compress the graph in parallel the nodes are split into chunks. Each parallel task fetches // a chunk and compresses the neighbourhoods of the corresponding nodes. The compressed // neighborhoods are meanwhile temporarily stored in a buffer. 
They are moved into the @@ -418,13 +402,17 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool const EdgeID max_chunk_size = num_edges / kNumChunks; std::vector> chunks; + NodeID max_degree = 0; TIMED_SCOPE("Compute chunks") { NodeID cur_chunk_start = 0; EdgeID cur_chunk_size = 0; for (NodeID node = 0; node < num_nodes; ++node) { const auto degree = static_cast((nodes[node + 1] - nodes[node]) / sizeof(NodeID)); - cur_chunk_size += degree; + if (degree > max_degree) { + max_degree = degree; + } + cur_chunk_size += degree; if (cur_chunk_size >= max_chunk_size) { if (cur_chunk_start == node) { chunks.emplace_back(node, node + 1); @@ -443,6 +431,22 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool } }; + // Initializes the data structures used to build the compressed graph in parallel. + ParallelCompressedGraphBuilder builder( + header.num_nodes, header.num_edges, header.has_node_weights, header.has_edge_weights, sorted + ); + + tbb::enumerable_thread_specific> offsets_ets; + tbb::enumerable_thread_specific>> neighbourhood_ets; + tbb::enumerable_thread_specific neighbourhood_builder_ets([&] { + return CompressedEdgesBuilder( + num_nodes, num_edges, max_degree, header.has_edge_weights, builder.edge_weights() + ); + }); + + const std::size_t num_threads = tbb::this_task_arena::max_concurrency(); + ConcurrentCircularVectorMutex buffer(num_threads); + tbb::enumerable_thread_specific dbg_ets; tbb::parallel_for(0, chunks.size(), [&](const auto) { auto &dbg = dbg_ets.local(); diff --git a/kaminpar-shm/datastructures/compressed_graph_builder.cc b/kaminpar-shm/datastructures/compressed_graph_builder.cc index 7ec77a56..729ca1d3 100644 --- a/kaminpar-shm/datastructures/compressed_graph_builder.cc +++ b/kaminpar-shm/datastructures/compressed_graph_builder.cc @@ -25,13 +25,18 @@ namespace kaminpar::shm { namespace { +template [[nodiscard]] std::size_t compressed_edge_array_max_size(const NodeID num_nodes, const 
EdgeID num_edges) { std::size_t edge_id_width; - if constexpr (CompressedGraph::kIntervalEncoding) { - edge_id_width = marked_varint_length(num_edges); + if constexpr (kActualNumEdges) { + if constexpr (CompressedGraph::kIntervalEncoding) { + edge_id_width = marked_varint_length(num_edges); + } else { + edge_id_width = varint_length(num_edges); + } } else { - edge_id_width = varint_length(num_edges); + edge_id_width = varint_max_length(); } std::size_t max_size = num_nodes * edge_id_width + num_edges * varint_length(num_nodes); @@ -63,6 +68,19 @@ CompressedEdgesBuilder::CompressedEdgesBuilder( _compressed_data_start = heap_profiler::overcommit_memory(max_size); } +CompressedEdgesBuilder::CompressedEdgesBuilder( + const NodeID num_nodes, + const EdgeID num_edges, + const NodeID max_degree, + bool has_edge_weights, + StaticArray &edge_weights +) + : _has_edge_weights(has_edge_weights), + _edge_weights(edge_weights) { + const std::size_t max_size = compressed_edge_array_max_size(num_nodes, max_degree); + _compressed_data_start = heap_profiler::overcommit_memory(max_size); +} + void CompressedEdgesBuilder::init(const EdgeID first_edge) { _compressed_data = _compressed_data_start.get(); diff --git a/kaminpar-shm/datastructures/compressed_graph_builder.h b/kaminpar-shm/datastructures/compressed_graph_builder.h index 22e23977..32c31be7 100644 --- a/kaminpar-shm/datastructures/compressed_graph_builder.h +++ b/kaminpar-shm/datastructures/compressed_graph_builder.h @@ -34,6 +34,24 @@ class CompressedEdgesBuilder { StaticArray &edge_weights ); + /*! + * Constructs a new CompressedEdgesBuilder where the maximum degree specifies the number of edges + * that are compressed at once. + * + * @param num_nodes The number of nodes of the graph to compress. + * @param num_edges The number of edges of the graph to compress. + * @param max_degree The maximum degree of the graph to compress. + * @param has_edge_weights Whether the graph to compress has edge weights. 
 + * @param edge_weights A reference to the edge weights of the compressed graph. + */ + CompressedEdgesBuilder( + const NodeID num_nodes, + const EdgeID num_edges, + const NodeID max_degree, + bool has_edge_weights, + StaticArray &edge_weights + ); + CompressedEdgesBuilder(const CompressedEdgesBuilder &) = delete; CompressedEdgesBuilder &operator=(const CompressedEdgesBuilder &) = delete; From c1a1d4dbd2f3ae763ce15fe9a072d0a861a8ca64 Mon Sep 17 00:00:00 2001 From: Daniel Salwasser Date: Sat, 1 Jun 2024 12:37:08 +0200 Subject: [PATCH 2/6] feat(kaminpar-shm): add option to compress graph in memory for the input benchmark --- apps/benchmarks/shm_input_benchmark.cc | 48 +++++++++++++---- apps/io/shm_io.cc | 21 +++++--- apps/io/shm_io.h | 17 ++++-- .../compressed_graph_builder.cc | 52 +++++++++++++------ 4 files changed, 103 insertions(+), 35 deletions(-) diff --git a/apps/benchmarks/shm_input_benchmark.cc b/apps/benchmarks/shm_input_benchmark.cc index f446de77..05dbf55f 100644 --- a/apps/benchmarks/shm_input_benchmark.cc +++ b/apps/benchmarks/shm_input_benchmark.cc @@ -12,6 +12,7 @@ #include #include "kaminpar-shm/context_io.h" +#include "kaminpar-shm/datastructures/compressed_graph_builder.h" #include "kaminpar-common/console_io.h" #include "kaminpar-common/logger.h" @@ -31,6 +32,7 @@ int main(int argc, char *argv[]) { // Parse CLI arguments std::string graph_filename; io::GraphFileFormat graph_file_format = io::GraphFileFormat::METIS; + bool compress_in_memory = false; int seed = 0; CLI::App app("Shared-memory input benchmark"); @@ -41,9 +43,15 @@ int main(int argc, char *argv[]) { - metis - parhip)") ->capture_default_str(); + app.add_flag( + "--compress-in-memory", + compress_in_memory, + "Whether to compress the input graph in memory when graph compression is enabled" + ) + ->capture_default_str(); app.add_option("-t,--threads", ctx.parallel.num_threads, "Number of threads") 
->capture_default_str(); - app.add_option("-s,--seed", seed, "Seed for random number generation.")->capture_default_str(); + app.add_option("-s,--seed", seed, "Seed for random number generation")->capture_default_str(); app.add_option("-k,--k", ctx.partition.k); app.add_option("-e,--epsilon", ctx.partition.epsilon); create_graph_compression_options(&app, ctx); @@ -59,14 +67,36 @@ int main(int argc, char *argv[]) { SCOPED_HEAP_PROFILER("Read Input Graph"); SCOPED_TIMER("Read Input Graph"); - Graph graph = io::read( - graph_filename, - graph_file_format, - ctx.compression.enabled, - ctx.compression.may_dismiss, - false - ); - ctx.setup(graph); + if (ctx.compression.enabled && compress_in_memory) { + CSRGraph csr_graph = TIMED_SCOPE("Read CSR Graph") { + SCOPED_HEAP_PROFILER("Read CSR Graph"); + return io::csr_read(graph_filename, graph_file_format, false); + }; + + SCOPED_TIMER("Compress CSR Graph"); + SCOPED_HEAP_PROFILER("Compress CSR Graph"); + + const bool sequential_compression = ctx.parallel.num_threads <= 1; + if (sequential_compression) { + Graph graph = + Graph(std::make_unique(CompressedGraphBuilder::compress(csr_graph))); + ctx.setup(graph); + } else { + Graph graph = Graph( + std::make_unique(ParallelCompressedGraphBuilder::compress(csr_graph)) + ); + ctx.setup(graph); + } + } else { + Graph graph = io::read( + graph_filename, + graph_file_format, + ctx.compression.enabled, + ctx.compression.may_dismiss, + false + ); + ctx.setup(graph); + } } DISABLE_HEAP_PROFILER(); diff --git a/apps/io/shm_io.cc b/apps/io/shm_io.cc index f4cce9d2..ec8064ef 100644 --- a/apps/io/shm_io.cc +++ b/apps/io/shm_io.cc @@ -25,6 +25,18 @@ std::unordered_map get_graph_file_formats() { }; } +CSRGraph +csr_read(const std::string &filename, const GraphFileFormat file_format, const bool sorted) { + switch (file_format) { + case GraphFileFormat::METIS: + return metis::csr_read(filename, sorted); + case GraphFileFormat::PARHIP: + return parhip::csr_read(filename, sorted); + default: + 
__builtin_unreachable(); + } +} + Graph read( const std::string &filename, const GraphFileFormat file_format, @@ -61,14 +73,7 @@ Graph read( } } - switch (file_format) { - case GraphFileFormat::METIS: - return Graph(std::make_unique(metis::csr_read(filename, sorted))); - case GraphFileFormat::PARHIP: - return Graph(std::make_unique(parhip::csr_read(filename, sorted))); - default: - __builtin_unreachable(); - } + return Graph(std::make_unique(csr_read(filename, file_format, sorted))); } namespace partition { diff --git a/apps/io/shm_io.h b/apps/io/shm_io.h index 517e0286..3df3885a 100644 --- a/apps/io/shm_io.h +++ b/apps/io/shm_io.h @@ -11,6 +11,7 @@ #include #include +#include "kaminpar-shm/datastructures/csr_graph.h" #include "kaminpar-shm/datastructures/graph.h" namespace kaminpar::shm::io { @@ -30,16 +31,26 @@ enum class GraphFileFormat { */ [[nodiscard]] std::unordered_map get_graph_file_formats(); +/** + * Reads a graph that is stored in METIS or ParHip format. + * + * @param filename The name of the file to read. + * @param file_format The format of the file used to store the graph. + * @param sorted Whether the nodes of the graph to read are stored in degree-buckets order. + * @return The graph to read in CSR format. + */ +CSRGraph +csr_read(const std::string &filename, const GraphFileFormat file_format, const bool sorted); + /*! * Reads a graph that is either stored in METIS, ParHiP or compressed format. * * @param filename The name of the file to read. * @param file_format The format of the file used to store the graph. * @param compress Whether to compress the graph. - * @param may_dismiss Whether the compressed graph is only returned when it uses less memory than - * the uncompressed graph. + * @param may_dismiss Whether to only return the compressed graph if it uses less memory than the + * uncompressed graph. * @param sorted Whether the nodes of the graph to read are stored in degree-buckets order. - * @param validate Whether to validate the graph. 
* @return The graph to read. */ Graph read( diff --git a/kaminpar-shm/datastructures/compressed_graph_builder.cc b/kaminpar-shm/datastructures/compressed_graph_builder.cc index 729ca1d3..c1cf9cee 100644 --- a/kaminpar-shm/datastructures/compressed_graph_builder.cc +++ b/kaminpar-shm/datastructures/compressed_graph_builder.cc @@ -520,9 +520,38 @@ std::int64_t CompressedGraphBuilder::total_edge_weight() const { } CompressedGraph ParallelCompressedGraphBuilder::compress(const CSRGraph &graph) { + const NodeID num_nodes = graph.n(); + const EdgeID num_edges = graph.m(); const bool has_node_weights = graph.is_node_weighted(); const bool has_edge_weights = graph.is_edge_weighted(); + constexpr std::size_t kNumChunks = 5000; + const EdgeID max_chunk_size = num_edges / kNumChunks; + std::vector> chunks; + + NodeID cur_chunk_start = 0; + EdgeID cur_chunk_size = 0; + for (NodeID node = 0; node < num_nodes; ++node) { + const NodeID degree = graph.degree(node); + + cur_chunk_size += degree; + if (cur_chunk_size >= max_chunk_size) { + if (cur_chunk_start == node) { + chunks.emplace_back(node, node + 1); + cur_chunk_start = node + 1; + } else { + chunks.emplace_back(cur_chunk_start, node); + cur_chunk_start = node; + } + + cur_chunk_size = 0; + } + } + + if (cur_chunk_start != num_nodes) { + chunks.emplace_back(cur_chunk_start, num_nodes); + } + ParallelCompressedGraphBuilder builder( graph.n(), graph.m(), has_node_weights, has_edge_weights, graph.sorted() ); @@ -530,33 +559,26 @@ CompressedGraph ParallelCompressedGraphBuilder::compress(const CSRGraph &graph) tbb::enumerable_thread_specific> offsets_ets; tbb::enumerable_thread_specific>> neighbourhood_ets; tbb::enumerable_thread_specific neighbourhood_builder_ets([&] { - return CompressedEdgesBuilder(graph.n(), graph.m(), has_edge_weights, builder.edge_weights()); + return CompressedEdgesBuilder( + graph.n(), graph.m(), graph.max_degree(), has_edge_weights, builder.edge_weights() + ); }); - ConcurrentCircularVectorSpinlock 
buffer(tbb::this_task_arena::max_concurrency()); - - constexpr NodeID chunk_size = 4096; - const NodeID num_chunks = math::div_ceil(graph.n(), chunk_size); - const NodeID last_chunk_size = - ((graph.n() % chunk_size) != 0) ? (graph.n() % chunk_size) : chunk_size; + const std::size_t num_threads = tbb::this_task_arena::max_concurrency(); + ConcurrentCircularVectorMutex buffer(num_threads); - tbb::parallel_for(0, num_chunks, [&](const auto) { + tbb::parallel_for(0, chunks.size(), [&](const auto) { std::vector &offsets = offsets_ets.local(); std::vector> &neighbourhood = neighbourhood_ets.local(); CompressedEdgesBuilder &neighbourhood_builder = neighbourhood_builder_ets.local(); - NodeWeight local_node_weight = 0; - EdgeWeight local_edge_weight = 0; - const NodeID chunk = buffer.next(); - const NodeID start_node = chunk * chunk_size; - - const NodeID chunk_length = (chunk + 1 == num_chunks) ? last_chunk_size : chunk_size; - const NodeID end_node = start_node + chunk_length; + const auto [start_node, end_node] = chunks[chunk]; const EdgeID first_edge = graph.first_edge(start_node); neighbourhood_builder.init(first_edge); + NodeWeight local_node_weight = 0; for (NodeID node = start_node; node < end_node; ++node) { for (const auto [incident_edge, adjacent_node] : graph.neighbors(node)) { neighbourhood.emplace_back(adjacent_node, graph.edge_weight(incident_edge)); From 4ea738e0c87fe988aa88970425e791bd0fa257f1 Mon Sep 17 00:00:00 2001 From: Daniel Salwasser Date: Sat, 1 Jun 2024 13:19:14 +0200 Subject: [PATCH 3/6] feat(kaminpar-common): use static array as the container for the fast reset array --- kaminpar-common/datastructures/fast_reset_array.h | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/kaminpar-common/datastructures/fast_reset_array.h b/kaminpar-common/datastructures/fast_reset_array.h index 39c85241..ff4e56e1 100644 --- a/kaminpar-common/datastructures/fast_reset_array.h +++ b/kaminpar-common/datastructures/fast_reset_array.h @@ 
-11,6 +11,7 @@ #include "kaminpar-common/assert.h" #include "kaminpar-common/datastructures/scalable_vector.h" +#include "kaminpar-common/datastructures/static_array.h" #include "kaminpar-common/heap_profiler.h" #include "kaminpar-common/ranges.h" @@ -22,7 +23,7 @@ template class FastResetArray { using const_reference = const Value &; using size_type = Size; - explicit FastResetArray(const std::size_t capacity = 0) : _data(capacity) { + explicit FastResetArray(const std::size_t capacity = 0) : _data(capacity, static_array::seq) { RECORD_DATA_STRUCT(capacity * sizeof(value_type), _struct); } @@ -40,7 +41,7 @@ template class FastResetArray { IF_HEAP_PROFILING( _struct->size = std::max( _struct->size, - _data.capacity() * sizeof(value_type) + _used_entries.capacity() * sizeof(size_type) + _data.size() * sizeof(value_type) + _used_entries.capacity() * sizeof(size_type) ) ); } @@ -101,12 +102,12 @@ template class FastResetArray { return _data.size(); } void resize(const std::size_t capacity) { - _data.resize(capacity); + _data.resize(capacity, static_array::seq); IF_HEAP_PROFILING( _struct->size = std::max( _struct->size, - _data.capacity() * sizeof(value_type) + _used_entries.capacity() * sizeof(size_type) + _data.size() * sizeof(value_type) + _used_entries.capacity() * sizeof(size_type) ) ); } @@ -116,7 +117,7 @@ template class FastResetArray { } private: - ScalableVector _data; + StaticArray _data; ScalableVector _used_entries{}; IF_HEAP_PROFILING(heap_profiler::DataStructure *_struct); From 506a66a9daf5c4b3ab5ceaad30cdbdaaf946e8c6 Mon Sep 17 00:00:00 2001 From: Daniel Salwasser Date: Sat, 1 Jun 2024 13:20:00 +0200 Subject: [PATCH 4/6] feat(kaminpar-common): use static array as the container for the concurrent fast reset array --- .../concurrent_fast_reset_array.h | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/kaminpar-common/datastructures/concurrent_fast_reset_array.h 
b/kaminpar-common/datastructures/concurrent_fast_reset_array.h index 8bdf4a60..c0d97292 100644 --- a/kaminpar-common/datastructures/concurrent_fast_reset_array.h +++ b/kaminpar-common/datastructures/concurrent_fast_reset_array.h @@ -12,8 +12,10 @@ #include #include +#include #include +#include "kaminpar-common/datastructures/static_array.h" #include "kaminpar-common/heap_profiler.h" #include "kaminpar-common/parallel/aligned_element.h" @@ -23,7 +25,7 @@ namespace kaminpar { * A static array that can reset used elements in O(# of used elements). * * @tparam Value The type of value to store. - * @tparam Size The type of index to use to access and save values. + * @tparam Size The type of index to use to access and store values. */ template class ConcurrentFastResetArray { public: @@ -34,7 +36,7 @@ template class ConcurrentFastReset /*! * Constructs a new ConcurrentFastResetArray. * - * @param capacity The capacity of the map, i.e. the amount of values to possibly save. + * @param capacity The capacity of the map, i.e., the number of values that can be stored. */ explicit ConcurrentFastResetArray(const std::size_t capacity = 0) : _data(capacity) { RECORD_DATA_STRUCT(capacity * sizeof(value_type), _struct); @@ -46,8 +48,8 @@ template class ConcurrentFastReset * * @return The capacity of this array. */ - std::size_t capacity() const { - return _data.capacity(); + [[nodiscard]] std::size_t capacity() const { + return _data.size(); } /*! @@ -72,9 +74,9 @@ template class ConcurrentFastReset } /*! - * Resized the array. + * Resizes the array. * - * @param capacity The new capacity of the map, i.e. the amount of values to possibly save. + * @param capacity The new capacity of the map, i.e., the number of values that can be stored. */ void resize(const size_type capacity) { IF_HEAP_PROFILING(_struct->size = std::max(_struct->size, capacity * sizeof(value_type))); @@ -86,18 +88,17 @@ template class ConcurrentFastReset * Frees the memory used by this data structure. 
*/ void free() { - _data.clear(); - _data.shrink_to_fit(); + _data.free(); _used_entries_tls.clear(); _used_entries_tls.shrink_to_fit(); } /*! - * Iterates over all thread-local vector of used entries and clears them afterwards. + * Iterates over all thread-local vectors of used entries and clears them afterwards. * * @param l The function object that is invoked with a thread-local vector of used entries before - they are cleared. + it is cleared. */ template void iterate_and_reset(Lambda &&l) { tbb::parallel_for(0, _used_entries_tls.size(), [&](const auto i) { @@ -112,7 +113,7 @@ template class ConcurrentFastReset } private: - std::vector _data; + StaticArray _data; std::vector>> _used_entries_tls; IF_HEAP_PROFILING(heap_profiler::DataStructure *_struct); From a03b1eeccd301c3494755a35cd7d3e77aec33014 Mon Sep 17 00:00:00 2001 From: Daniel Salwasser Date: Sat, 1 Jun 2024 19:30:39 +0200 Subject: [PATCH 5/6] feat(compressed-graph): enable deg-bucket reordering of the compressed graph while reading --- apps/KaMinPar.cc | 5 +- apps/benchmarks/shm_input_benchmark.cc | 9 +- .../shm_label_propagation_benchmark.cc | 2 +- apps/io/parhip_parser.cc | 151 ++++++++++++++---- apps/io/parhip_parser.h | 3 +- apps/io/shm_io.cc | 6 +- apps/io/shm_io.h | 4 +- apps/tools/shm_graph_properties_tool.cc | 7 +- kaminpar-shm/kaminpar.cc | 2 +- 9 files changed, 148 insertions(+), 41 deletions(-) diff --git a/apps/KaMinPar.cc b/apps/KaMinPar.cc index f32a71ee..6002c5e5 100644 --- a/apps/KaMinPar.cc +++ b/apps/KaMinPar.cc @@ -192,7 +192,8 @@ int main(int argc, char *argv[]) { std::exit(0); } - if (ctx.compression.enabled && ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS) { + if (ctx.compression.enabled && app.graph_file_format == io::GraphFileFormat::METIS && + ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS) { std::cout << "The nodes of the compressed graph cannot be rearranged by degree buckets!" 
<< std::endl; std::exit(0); @@ -236,7 +237,7 @@ int main(int argc, char *argv[]) { app.graph_file_format, ctx.compression.enabled, ctx.compression.may_dismiss, - ctx.node_ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS + ctx.node_ordering ); }; diff --git a/apps/benchmarks/shm_input_benchmark.cc b/apps/benchmarks/shm_input_benchmark.cc index 05dbf55f..2e75f458 100644 --- a/apps/benchmarks/shm_input_benchmark.cc +++ b/apps/benchmarks/shm_input_benchmark.cc @@ -43,6 +43,13 @@ int main(int argc, char *argv[]) { - metis - parhip)") ->capture_default_str(); + app.add_option("--node-order", ctx.node_ordering) + ->transform(CLI::CheckedTransformer(get_node_orderings()).description("")) + ->description(R"(Criteria by which the nodes of the graph are sorted and rearranged: + - natural: keep node order of the graph (do not rearrange) + - deg-buckets: sort nodes by degree bucket and rearrange accordingly + - implicit-deg-buckets: nodes of the input graph are sorted by deg-buckets order)") + ->capture_default_str(); app.add_flag( "--compress-in-memory", compress_in_memory, @@ -93,7 +100,7 @@ int main(int argc, char *argv[]) { graph_file_format, ctx.compression.enabled, ctx.compression.may_dismiss, - false + ctx.node_ordering ); ctx.setup(graph); } diff --git a/apps/benchmarks/shm_label_propagation_benchmark.cc b/apps/benchmarks/shm_label_propagation_benchmark.cc index 92c740ee..53d73098 100644 --- a/apps/benchmarks/shm_label_propagation_benchmark.cc +++ b/apps/benchmarks/shm_label_propagation_benchmark.cc @@ -65,7 +65,7 @@ int main(int argc, char *argv[]) { graph_file_format, ctx.compression.enabled, ctx.compression.may_dismiss, - ctx.node_ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS + ctx.node_ordering ); ctx.setup(graph); diff --git a/apps/io/parhip_parser.cc b/apps/io/parhip_parser.cc index 2a872648..1a3be4ef 100644 --- a/apps/io/parhip_parser.cc +++ b/apps/io/parhip_parser.cc @@ -20,9 +20,12 @@ #include #include 
"kaminpar-shm/datastructures/compressed_graph_builder.h" +#include "kaminpar-shm/kaminpar.h" #include "kaminpar-common/datastructures/concurrent_circular_vector.h" +#include "kaminpar-common/datastructures/static_array.h" #include "kaminpar-common/logger.h" +#include "kaminpar-common/parallel/loops.h" #include "kaminpar-common/timer.h" namespace kaminpar::shm::io::parhip { @@ -357,9 +360,66 @@ void print_stats(const auto &stats_ets) { } } // namespace debug + +std::pair, StaticArray> +sort_by_degree_buckets(const NodeID n, const StaticArray °rees) { + auto find_bucket = [&](const NodeID deg) { + return deg == 0 ? (kNumberOfDegreeBuckets - 1) : degree_bucket(deg); + }; + + const int cpus = std::min(tbb::this_task_arena::max_concurrency(), n); + RECORD("permutation") StaticArray permutation(n, static_array::noinit); + RECORD("inverse_permutation") StaticArray inverse_permutation(n, static_array::noinit); + + using Buckets = std::array + 1>; + std::vector> local_buckets(cpus + 1); + + parallel::deterministic_for(0, n, [&](const NodeID from, const NodeID to, const int cpu) { + KASSERT(cpu < cpus); + + for (NodeID u = from; u < to; ++u) { + const auto bucket = find_bucket(degrees[u]); + permutation[u] = local_buckets[cpu + 1][bucket]++; + } + }); + + // Build a table of prefix numbers to correct the position of each node in the + // final permutation After the previous loop, permutation[u] contains the + // position of u in the thread-local bucket. 
(i) account for smaller buckets + // --> add prefix computed in global_buckets (ii) account for the same bucket + // in smaller processor IDs --> add prefix computed in local_buckets + Buckets global_buckets{}; + for (int id = 1; id < cpus + 1; ++id) { + for (std::size_t i = 0; i + 1 < global_buckets.size(); ++i) { + global_buckets[i + 1] += local_buckets[id][i]; + } + } + parallel::prefix_sum(global_buckets.begin(), global_buckets.end(), global_buckets.begin()); + for (std::size_t i = 0; i < global_buckets.size(); ++i) { + for (int id = 0; id + 1 < cpus; ++id) { + local_buckets[id + 1][i] += local_buckets[id][i]; + } + } + + // Apply offsets to obtain global permutation + parallel::deterministic_for(0, n, [&](const NodeID from, const NodeID to, const int cpu) { + KASSERT(cpu < cpus); + + for (NodeID u = from; u < to; ++u) { + const NodeID bucket = find_bucket(degrees[u]); + permutation[u] += global_buckets[bucket] + local_buckets[cpu][bucket]; + } + }); + + // Compute inverse permutation + tbb::parallel_for(0, n, [&](const NodeID u) { inverse_permutation[permutation[u]] = u; }); + + return {std::move(permutation), std::move(inverse_permutation)}; +} + } // namespace -CompressedGraph compressed_read_parallel(const std::string &filename, const bool sorted) { +CompressedGraph compressed_read_parallel(const std::string &filename, const NodeOrdering ordering) { try { BinaryReader reader(filename); @@ -392,6 +452,28 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool const auto map_edge_offset = [&](const NodeID node) { return (nodes[node] - nodes_offset_base) / sizeof(NodeID); }; + const auto fetch_degree = [&](const NodeID node) { + return static_cast((nodes[node + 1] - nodes[node]) / sizeof(NodeID)); + }; + + RECORD("degrees") StaticArray degrees(num_nodes, static_array::noinit); + TIMED_SCOPE("Read degrees") { + tbb::parallel_for(tbb::blocked_range(0, num_nodes), [&](const auto &r) { + for (NodeID u = r.begin(); u != r.end(); ++u) { + 
degrees[u] = fetch_degree(u); + } + }); + }; + + const bool sort_by_degree_bucket = ordering == NodeOrdering::DEGREE_BUCKETS; + StaticArray permutation; + StaticArray inverse_permutation; + if (sort_by_degree_bucket) { + SCOPED_TIMER("Compute permutation"); + auto [perm, inv_perm] = sort_by_degree_buckets(num_nodes, degrees); + permutation = std::move(perm); + inverse_permutation = std::move(inv_perm); + } // To compress the graph in parallel the nodes are split into chunks. Each parallel task fetches // a chunk and compresses the neighbourhoods of the corresponding nodes. The compressed @@ -400,38 +482,46 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool // chunks is determined. constexpr std::size_t kNumChunks = 5000; const EdgeID max_chunk_size = num_edges / kNumChunks; - std::vector> chunks; + std::vector> chunks; NodeID max_degree = 0; TIMED_SCOPE("Compute chunks") { NodeID cur_chunk_start = 0; EdgeID cur_chunk_size = 0; - for (NodeID node = 0; node < num_nodes; ++node) { - const auto degree = static_cast((nodes[node + 1] - nodes[node]) / sizeof(NodeID)); - if (degree > max_degree) { - max_degree = degree; - } + EdgeID cur_first_edge = 0; + for (NodeID i = 0; i < num_nodes; ++i) { + NodeID node = sort_by_degree_bucket ? 
inverse_permutation[i] : i; + + const NodeID degree = degrees[node]; + max_degree = std::max(max_degree, degree); cur_chunk_size += degree; if (cur_chunk_size >= max_chunk_size) { - if (cur_chunk_start == node) { - chunks.emplace_back(node, node + 1); - cur_chunk_start = node + 1; + if (cur_chunk_start == i) { + chunks.emplace_back(cur_chunk_start, i + 1, cur_first_edge); + + cur_chunk_start = i + 1; + cur_first_edge += degree; + cur_chunk_size = 0; } else { - chunks.emplace_back(cur_chunk_start, node); - cur_chunk_start = node; - } + chunks.emplace_back(cur_chunk_start, i, cur_first_edge); - cur_chunk_size = 0; + cur_chunk_start = i; + cur_first_edge += cur_chunk_size - degree; + cur_chunk_size = degree; + } } } if (cur_chunk_start != num_nodes) { - chunks.emplace_back(cur_chunk_start, num_nodes); + chunks.emplace_back(cur_chunk_start, num_nodes, cur_first_edge); } }; + degrees.free(); + // Initializes the data structures used to build the compressed graph in parallel. + const bool sorted = ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS || sort_by_degree_bucket; ParallelCompressedGraphBuilder builder( header.num_nodes, header.num_edges, header.has_node_weights, header.has_edge_weights, sorted ); @@ -457,28 +547,29 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool CompressedEdgesBuilder &neighbourhood_builder = neighbourhood_builder_ets.local(); const NodeID chunk = buffer.next(); - const auto [start_node, end_node] = chunks[chunk]; - - EdgeID edge = map_edge_offset(start_node); - neighbourhood_builder.init(edge); + const auto [start, end, first_edge] = chunks[chunk]; NodeWeight local_node_weight = 0; + neighbourhood_builder.init(first_edge); // Compress the neighborhoods of the nodes in the fetched chunk. 
debug::scoped_time(dbg.compression_time, [&] { - for (NodeID node = start_node; node < end_node; ++node) { - const auto degree = static_cast((nodes[node + 1] - nodes[node]) / sizeof(NodeID)); + for (NodeID i = start; i < end; ++i) { + const NodeID node = sort_by_degree_bucket ? inverse_permutation[i] : i; + const NodeID degree = fetch_degree(node); IF_DBG dbg.num_edges += degree; - for (NodeID i = 0; i < degree; ++i) { - const NodeID adjacent_node = edges[edge]; + EdgeID edge = map_edge_offset(node); + for (NodeID j = 0; j < degree; ++j) { + const NodeID adjacent_node = + sort_by_degree_bucket ? permutation[edges[edge]] : edges[edge]; const EdgeWeight edge_weight = header.has_edge_weights ? edge_weights[edge] : 1; neighbourhood.emplace_back(adjacent_node, edge_weight); edge += 1; } - const EdgeID local_offset = neighbourhood_builder.add(node, neighbourhood); + const EdgeID local_offset = neighbourhood_builder.add(i, neighbourhood); offsets.push_back(local_offset); neighbourhood.clear(); @@ -494,18 +585,18 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool // Store the edge offset and node weight for each node in the chunk and copy the compressed // neighborhoods into the actual compressed edge array. debug::scoped_time(dbg.copy_time, [&] { - NodeID node = start_node; - for (EdgeID local_offset : offsets) { - builder.add_node(node, offset + local_offset); + for (NodeID i = start; i < end; ++i) { + const EdgeID local_offset = offsets[i - start]; + + builder.add_node(i, offset + local_offset); if (header.has_node_weights) { + const NodeID node = sort_by_degree_bucket ? 
inverse_permutation[i] : i; const NodeWeight node_weight = node_weights[node]; local_node_weight += node_weight; - builder.add_node_weight(node, node_weight); + builder.add_node_weight(i, node_weight); } - - node += 1; } offsets.clear(); diff --git a/apps/io/parhip_parser.h b/apps/io/parhip_parser.h index 08459e20..79ddbb96 100644 --- a/apps/io/parhip_parser.h +++ b/apps/io/parhip_parser.h @@ -11,6 +11,7 @@ #include "kaminpar-shm/datastructures/compressed_graph.h" #include "kaminpar-shm/datastructures/csr_graph.h" +#include "kaminpar-shm/kaminpar.h" namespace kaminpar::shm::io::parhip { @@ -39,6 +40,6 @@ CompressedGraph compressed_read(const std::string &filename, const bool sorted); * @param sorted Whether the nodes of the graph to read are stored in degree-buckets order. * @return The graph that is stored in the file. */ -CompressedGraph compressed_read_parallel(const std::string &filename, const bool sorted); +CompressedGraph compressed_read_parallel(const std::string &filename, const NodeOrdering ordering); } // namespace kaminpar::shm::io::parhip diff --git a/apps/io/shm_io.cc b/apps/io/shm_io.cc index ec8064ef..7a3be04b 100644 --- a/apps/io/shm_io.cc +++ b/apps/io/shm_io.cc @@ -42,8 +42,10 @@ Graph read( const GraphFileFormat file_format, const bool compress, const bool may_dismiss, - const bool sorted + const NodeOrdering ordering ) { + const bool sorted = ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS; + if (compressed_binary::is_compressed(filename)) { if (!compress) { LOG_ERROR << "The input graph is stored in a compressed format but graph compression is" @@ -61,7 +63,7 @@ Graph read( return metis::compress_read(filename, sorted, may_dismiss); } case GraphFileFormat::PARHIP: { - return std::optional(parhip::compressed_read_parallel(filename, sorted)); + return std::optional(parhip::compressed_read_parallel(filename, ordering)); } default: __builtin_unreachable(); diff --git a/apps/io/shm_io.h b/apps/io/shm_io.h index 3df3885a..2468ea13 100644 --- 
a/apps/io/shm_io.h +++ b/apps/io/shm_io.h @@ -50,7 +50,7 @@ csr_read(const std::string &filename, const GraphFileFormat file_format, const b * @param compress Whether to compress the graph. * @param may_dismiss Whether to only return the compressed graph if it uses less memory than the * uncompressed graph. - * @param sorted Whether the nodes of the graph to read are stored in degree-buckets order. + * @param ordering The node ordering of the graph to read. * @return The graph to read. */ Graph read( @@ -58,7 +58,7 @@ Graph read( const GraphFileFormat file_format, const bool compress, const bool may_dismiss, - const bool sorted + const NodeOrdering ordering ); namespace partition { diff --git a/apps/tools/shm_graph_properties_tool.cc b/apps/tools/shm_graph_properties_tool.cc index b47e3f81..aae0078f 100644 --- a/apps/tools/shm_graph_properties_tool.cc +++ b/apps/tools/shm_graph_properties_tool.cc @@ -12,6 +12,7 @@ #include #include "kaminpar-shm/context_io.h" +#include "kaminpar-shm/kaminpar.h" #include "kaminpar-common/console_io.h" #include "kaminpar-common/logger.h" @@ -92,7 +93,11 @@ int main(int argc, char *argv[]) { tbb::global_control gc(tbb::global_control::max_allowed_parallelism, ctx.parallel.num_threads); Graph graph = io::read( - graph_filename, graph_file_format, ctx.compression.enabled, ctx.compression.may_dismiss, false + graph_filename, + graph_file_format, + ctx.compression.enabled, + ctx.compression.may_dismiss, + NodeOrdering::NATURAL ); ctx.debug.graph_name = str::extract_basename(graph_filename); diff --git a/kaminpar-shm/kaminpar.cc b/kaminpar-shm/kaminpar.cc index ad6b6c7e..7673bf69 100644 --- a/kaminpar-shm/kaminpar.cc +++ b/kaminpar-shm/kaminpar.cc @@ -189,7 +189,7 @@ EdgeWeight KaMinPar::compute_partition(const BlockID k, BlockID *partition) { START_TIMER("Partitioning"); if (!_was_rearranged) { - if (_ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS) { + if (_ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS && !_graph_ptr->sorted()) { 
CSRGraph &csr_graph = *dynamic_cast(_graph_ptr->underlying_graph()); _graph_ptr = std::make_unique(graph::rearrange_by_degree_buckets(csr_graph)); } From 01707834a6b78356de3bd9c619723dd39db6ea23 Mon Sep 17 00:00:00 2001 From: Daniel Salwasser Date: Mon, 3 Jun 2024 08:35:55 +0200 Subject: [PATCH 6/6] feat(kaminpar-shm): update graph compression tool to support new io --- apps/tools/shm_graph_compression_tool.cc | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/apps/tools/shm_graph_compression_tool.cc b/apps/tools/shm_graph_compression_tool.cc index 8298f4c7..856195fb 100644 --- a/apps/tools/shm_graph_compression_tool.cc +++ b/apps/tools/shm_graph_compression_tool.cc @@ -11,6 +11,9 @@ #include +#include "kaminpar-shm/context_io.h" +#include "kaminpar-shm/kaminpar.h" + #include "kaminpar-common/logger.h" #include "apps/io/metis_parser.h" @@ -27,6 +30,7 @@ int main(int argc, char *argv[]) { std::string graph_filename; std::string compressed_graph_filename; GraphFileFormat graph_file_format = io::GraphFileFormat::METIS; + NodeOrdering node_ordering = NodeOrdering::NATURAL; int num_threads = 1; CLI::App app("Shared-memory graph compression tool"); @@ -38,6 +42,13 @@ int main(int argc, char *argv[]) { ->description(R"(Graph file formats: - metis - parhip)"); + app.add_option("--node-order", node_ordering) + ->transform(CLI::CheckedTransformer(get_node_orderings()).description("")) + ->description(R"(Criteria by which the nodes of the graph are sorted and rearranged: + - natural: keep node order of the graph (do not rearrange) + - deg-buckets: sort nodes by degree bucket and rearrange accordingly + - implicit-deg-buckets: nodes of the input graph are sorted by deg-buckets order)") + ->capture_default_str(); app.add_option("-t,--threads", num_threads, "Number of threads"); CLI11_PARSE(app, argc, argv); @@ -48,9 +59,11 @@ int main(int argc, char *argv[]) { CompressedGraph graph = [&] { switch (graph_file_format) { case 
GraphFileFormat::METIS: - return *metis::compress_read(graph_filename, false, false); + return *metis::compress_read( + graph_filename, node_ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS, false + ); case GraphFileFormat::PARHIP: - return parhip::compressed_read(graph_filename, false); + return parhip::compressed_read_parallel(graph_filename, node_ordering); default: __builtin_unreachable(); }