From 8507cbf63db2f349136b266d3e6e787b189f45a0 Mon Sep 17 00:00:00 2001 From: Alex Barghi <105237337+alexbarghi-nv@users.noreply.github.com> Date: Mon, 13 Jan 2025 13:16:10 -0500 Subject: [PATCH] [FEA] Heterogeneous Distributed Sampling (#4795) Adds support for heterogeneous distributed sampling to the cuGraph distributed sampler. Prerequisite for exposing this functionality to cuGraph-PyG. Has been initially tested with cuGraph-PyG. Updates the distributed sampler to use the new sampling API. Merge after #4775, #4827, #4820 Closes #4773 Closes #4401 Authors: - Alex Barghi (https://github.com/alexbarghi-nv) - Joseph Nke (https://github.com/jnke2016) - Ralph Liu (https://github.com/nv-rliu) Approvers: - Rick Ratzel (https://github.com/rlratzel) URL: https://github.com/rapidsai/cugraph/pull/4795 --- .../gnn/data_loading/dist_io/writer.py | 79 ++-- .../cugraph/gnn/data_loading/dist_sampler.py | 369 +++++++----------- .../tests/sampling/test_bulk_sampler.py | 6 +- .../tests/sampling/test_bulk_sampler_mg.py | 6 +- .../tests/sampling/test_dist_sampler.py | 143 ++++++- .../tests/sampling/test_dist_sampler_mg.py | 8 +- .../heterogeneous_biased_neighbor_sample.pyx | 3 +- .../heterogeneous_uniform_neighbor_sample.pyx | 3 +- .../homogeneous_biased_neighbor_sample.pyx | 3 +- .../homogeneous_uniform_neighbor_sample.pyx | 3 +- 10 files changed, 349 insertions(+), 274 deletions(-) diff --git a/python/cugraph/cugraph/gnn/data_loading/dist_io/writer.py b/python/cugraph/cugraph/gnn/data_loading/dist_io/writer.py index f8ad4719a76..fe5d169731e 100644 --- a/python/cugraph/cugraph/gnn/data_loading/dist_io/writer.py +++ b/python/cugraph/cugraph/gnn/data_loading/dist_io/writer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -79,9 +79,15 @@ def get_reader( return DistSampleReader(self._directory, format=self._format, rank=rank) def __write_minibatches_coo(self, minibatch_dict): - has_edge_ids = minibatch_dict["edge_id"] is not None - has_edge_types = minibatch_dict["edge_type"] is not None - has_weights = minibatch_dict["weight"] is not None + has_edge_ids = ( + "edge_id" in minibatch_dict and minibatch_dict["edge_id"] is not None + ) + has_edge_types = ( + "edge_type" in minibatch_dict and minibatch_dict["edge_type"] is not None + ) + has_weights = ( + "weight" in minibatch_dict and minibatch_dict["weight"] is not None + ) if minibatch_dict["renumber_map"] is None: raise ValueError( @@ -92,13 +98,12 @@ def __write_minibatches_coo(self, minibatch_dict): if len(minibatch_dict["batch_id"]) == 0: return - fanout_length = (len(minibatch_dict["label_hop_offsets"]) - 1) // len( - minibatch_dict["batch_id"] - ) + fanout_length = len(minibatch_dict["fanout"]) + total_num_batches = ( + len(minibatch_dict["label_hop_offsets"]) - 1 + ) / fanout_length - for p in range( - 0, int(ceil(len(minibatch_dict["batch_id"]) / self.__batches_per_partition)) - ): + for p in range(0, int(ceil(total_num_batches / self.__batches_per_partition))): partition_start = p * (self.__batches_per_partition) partition_end = (p + 1) * (self.__batches_per_partition) @@ -106,8 +111,9 @@ def __write_minibatches_coo(self, minibatch_dict): partition_start * fanout_length : partition_end * fanout_length + 1 ] - batch_id_array_p = minibatch_dict["batch_id"][partition_start:partition_end] - start_batch_id = batch_id_array_p[0] + num_batches_p = len(label_hop_offsets_array_p) - 1 + + start_batch_id = minibatch_dict["batch_start"] input_offsets_p = minibatch_dict["input_offsets"][ partition_start : (partition_end + 1) @@ -171,7 +177,7 @@ def __write_minibatches_coo(self, minibatch_dict): } ) - end_batch_id = start_batch_id + len(batch_id_array_p) - 1 + end_batch_id = start_batch_id + 
num_batches_p - 1 rank = minibatch_dict["rank"] if "rank" in minibatch_dict else 0 full_output_path = os.path.join( @@ -188,9 +194,15 @@ def __write_minibatches_coo(self, minibatch_dict): ) def __write_minibatches_csr(self, minibatch_dict): - has_edge_ids = minibatch_dict["edge_id"] is not None - has_edge_types = minibatch_dict["edge_type"] is not None - has_weights = minibatch_dict["weight"] is not None + has_edge_ids = ( + "edge_id" in minibatch_dict and minibatch_dict["edge_id"] is not None + ) + has_edge_types = ( + "edge_type" in minibatch_dict and minibatch_dict["edge_type"] is not None + ) + has_weights = ( + "weight" in minibatch_dict and minibatch_dict["weight"] is not None + ) if minibatch_dict["renumber_map"] is None: raise ValueError( @@ -201,13 +213,12 @@ def __write_minibatches_csr(self, minibatch_dict): if len(minibatch_dict["batch_id"]) == 0: return - fanout_length = (len(minibatch_dict["label_hop_offsets"]) - 1) // len( - minibatch_dict["batch_id"] - ) + fanout_length = len(minibatch_dict["fanout"]) + total_num_batches = ( + len(minibatch_dict["label_hop_offsets"]) - 1 + ) / fanout_length - for p in range( - 0, int(ceil(len(minibatch_dict["batch_id"]) / self.__batches_per_partition)) - ): + for p in range(0, int(ceil(total_num_batches / self.__batches_per_partition))): partition_start = p * (self.__batches_per_partition) partition_end = (p + 1) * (self.__batches_per_partition) @@ -215,8 +226,9 @@ def __write_minibatches_csr(self, minibatch_dict): partition_start * fanout_length : partition_end * fanout_length + 1 ] - batch_id_array_p = minibatch_dict["batch_id"][partition_start:partition_end] - start_batch_id = batch_id_array_p[0] + num_batches_p = len(label_hop_offsets_array_p) - 1 + + start_batch_id = minibatch_dict["batch_start"] input_offsets_p = minibatch_dict["input_offsets"][ partition_start : (partition_end + 1) @@ -292,7 +304,7 @@ def __write_minibatches_csr(self, minibatch_dict): } ) - end_batch_id = start_batch_id + len(batch_id_array_p) 
- 1 + end_batch_id = start_batch_id + num_batches_p - 1 rank = minibatch_dict["rank"] if "rank" in minibatch_dict else 0 full_output_path = os.path.join( @@ -309,12 +321,19 @@ def __write_minibatches_csr(self, minibatch_dict): ) def write_minibatches(self, minibatch_dict): - if (minibatch_dict["majors"] is not None) and ( - minibatch_dict["minors"] is not None - ): + if "minors" not in minibatch_dict: + raise ValueError("invalid columns") + + # PLC API specifies this behavior for empty input + # This needs to be handled here to avoid causing a hang + if len(minibatch_dict["minors"]) == 0: + return + + if "majors" in minibatch_dict and minibatch_dict["majors"] is not None: self.__write_minibatches_coo(minibatch_dict) - elif (minibatch_dict["major_offsets"] is not None) and ( - minibatch_dict["minors"] is not None + elif ( + "major_offsets" in minibatch_dict + and minibatch_dict["major_offsets"] is not None ): self.__write_minibatches_csr(minibatch_dict) else: diff --git a/python/cugraph/cugraph/gnn/data_loading/dist_sampler.py b/python/cugraph/cugraph/gnn/data_loading/dist_sampler.py index 0ff38741e1a..2edafe95716 100644 --- a/python/cugraph/cugraph/gnn/data_loading/dist_sampler.py +++ b/python/cugraph/cugraph/gnn/data_loading/dist_sampler.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -75,7 +75,7 @@ def __init__( def sample_batches( self, seeds: TensorType, - batch_ids: TensorType, + batch_id_offsets: TensorType, random_state: int = 0, assume_equal_input_size: bool = False, ) -> Dict[str, TensorType]: @@ -87,8 +87,10 @@ def sample_batches( ---------- seeds: TensorType Input seeds for a single call group (node ids). - batch_ids: TensorType - The batch id for each seed. 
+ batch_id_offsets: TensorType + Offsets (start/end) of each batch. i.e. 0, 5, 10 + corresponds to 2 batches, the first from index 0-4, + inclusive, and the second from index 5-9, inclusive. random_state: int The random seed to use for sampling. assume_equal_input_size: bool @@ -102,78 +104,6 @@ def sample_batches( """ raise NotImplementedError("Must be implemented by subclass") - def get_label_list_and_output_rank( - self, local_label_list: TensorType, assume_equal_input_size: bool = False - ): - """ - Computes the label list and output rank mapping for - the list of labels (batch ids). - Subclasses may override this as needed depending on their - memory and compute constraints. - - Parameters - ---------- - local_label_list: TensorType - The list of unique labels on this rank. - assume_equal_input_size: bool - If True, assumes that all ranks have the same number of inputs (batches) - and skips some synchronization/gathering accordingly. - - Returns - ------- - label_list: TensorType - The global label list containing all labels used across ranks. - label_to_output_comm_rank: TensorType - The global mapping of labels to ranks. 
- """ - torch = import_optional("torch") - - world_size = torch.distributed.get_world_size() - - if assume_equal_input_size: - num_batches = len(local_label_list) * world_size - label_list = torch.empty((num_batches,), dtype=torch.int32, device="cuda") - w = torch.distributed.all_gather_into_tensor( - label_list, local_label_list, async_op=True - ) - - label_to_output_comm_rank = torch.concat( - [ - torch.full( - (len(local_label_list),), r, dtype=torch.int32, device="cuda" - ) - for r in range(world_size) - ] - ) - else: - num_batches = torch.tensor( - [len(local_label_list)], device="cuda", dtype=torch.int64 - ) - num_batches_all_ranks = torch.empty( - (world_size,), device="cuda", dtype=torch.int64 - ) - torch.distributed.all_gather_into_tensor(num_batches_all_ranks, num_batches) - - label_list = [ - torch.empty((n,), dtype=torch.int32, device="cuda") - for n in num_batches_all_ranks - ] - w = torch.distributed.all_gather( - label_list, local_label_list, async_op=True - ) - - label_to_output_comm_rank = torch.concat( - [ - torch.full((num_batches_r,), r, device="cuda", dtype=torch.int32) - for r, num_batches_r in enumerate(num_batches_all_ranks) - ] - ) - - w.wait() - if isinstance(label_list, list): - label_list = torch.concat(label_list) - return label_list, label_to_output_comm_rank - def get_start_batch_offset( self, local_num_batches: int, assume_equal_input_size: bool = False ) -> Tuple[int, bool]: @@ -240,23 +170,10 @@ def __sample_from_nodes_func( current_seeds, current_ix = current_seeds_and_ix - current_batches = torch.arange( - batch_id_start + call_id * batches_per_call, - batch_id_start - + call_id * batches_per_call - + int(ceil(len(current_seeds))) - + 1, - device="cuda", - dtype=torch.int32, - ) - - current_batches = current_batches.repeat_interleave(batch_size)[ - : len(current_seeds) - ] - # do qr division to get the number of batch_size batches and the # size of the last batch num_full, last_count = divmod(len(current_seeds), batch_size) + 
input_offsets = torch.concatenate( [ torch.tensor([0], device="cuda", dtype=torch.int64), @@ -269,10 +186,10 @@ def __sample_from_nodes_func( minibatch_dict = self.sample_batches( seeds=current_seeds, - batch_ids=current_batches, + batch_id_offsets=input_offsets, random_state=random_state, - assume_equal_input_size=assume_equal_input_size, ) + minibatch_dict["input_index"] = current_ix.cuda() minibatch_dict["input_offsets"] = input_offsets @@ -286,10 +203,19 @@ def __sample_from_nodes_func( if v is not None } - return iter([(minibatch_dict, current_batches[0], current_batches[-1])]) + return iter( + [ + ( + minibatch_dict, + batch_id_start, + batch_id_start + input_offsets.numel() - 2, + ) + ] + ) else: + minibatch_dict["batch_start"] = batch_id_start self.__writer.write_minibatches(minibatch_dict) - return None + return batch_id_start + input_offsets.numel() - 1 def __get_call_groups( self, @@ -370,6 +296,8 @@ def sample_from_nodes( if input_id is None: input_id = torch.arange(num_seeds, dtype=torch.int64, device="cpu") + else: + input_id = torch.as_tensor(input_id, device="cpu") local_num_batches = int(ceil(num_seeds / batch_size)) batch_id_start, input_size_is_equal = self.get_start_batch_offset( @@ -383,13 +311,13 @@ def sample_from_nodes( assume_equal_input_size=input_size_is_equal, ) - sample_args = ( + sample_args = [ batch_id_start, batch_size, batches_per_call, random_state, input_size_is_equal, - ) + ] if self.__writer is None: # Buffered sampling @@ -403,7 +331,7 @@ def sample_from_nodes( for i, current_seeds_and_ix in enumerate( zip(nodes_call_groups, index_call_groups) ): - self.__sample_from_nodes_func( + sample_args[0] = self.__sample_from_nodes_func( i, current_seeds_and_ix, *sample_args, @@ -493,21 +421,13 @@ def __sample_from_edges_func( if len(u) > 0: current_seeds = torch.concat([a[0] for a, _ in u]) current_inv = torch.concat([a[1][i] for a, i in u]) - current_batches = torch.concat( - [ - torch.full( - (a[0].numel(),), - i + batch_id_start + 
(call_id * batches_per_call), - device="cuda", - dtype=torch.int32, - ) - for i, (a, _) in enumerate(u) - ] + current_batch_offsets = torch.tensor( + [a[0].numel() for (a, _) in u], device="cuda", dtype=torch.int64 ) else: current_seeds = torch.tensor([], device="cuda", dtype=torch.int64) current_inv = torch.tensor([], device="cuda", dtype=torch.int64) - current_batches = torch.tensor([], device="cuda", dtype=torch.int32) + current_batch_offsets = torch.tensor([], device="cuda", dtype=torch.int64) del u # Join with the leftovers @@ -519,28 +439,33 @@ def __sample_from_edges_func( leftover_seeds, lui = leftover_seeds.unique_consecutive(return_inverse=True) leftover_inv = lui[lz] - current_seeds = torch.concat([current_seeds, leftover_seeds]) - current_inv = torch.concat([current_inv, leftover_inv]) - current_batches = torch.concat( - [ - current_batches, - torch.full( - (leftover_seeds.numel(),), - (current_batches[-1] + 1) if current_batches.numel() > 0 else 0, - device="cuda", - dtype=torch.int32, - ), - ] - ) + if leftover_seeds.numel() > 0: + current_seeds = torch.concat([current_seeds, leftover_seeds]) + current_inv = torch.concat([current_inv, leftover_inv]) + current_batch_offsets = torch.concat( + [ + current_batch_offsets, + torch.tensor( + [leftover_seeds.numel()], device="cuda", dtype=torch.int64 + ), + ] + ) del leftover_seeds del lz del lui + if current_batch_offsets.numel() > 0: + current_batch_offsets = torch.concat( + [ + torch.tensor([0], device="cuda", dtype=torch.int64), + current_batch_offsets, + ] + ).cumsum(-1) + minibatch_dict = self.sample_batches( seeds=current_seeds, - batch_ids=current_batches, + batch_id_offsets=current_batch_offsets, random_state=random_state, - assume_equal_input_size=assume_equal_input_size, ) minibatch_dict["input_index"] = current_ix.cuda() minibatch_dict["input_offsets"] = input_offsets @@ -558,10 +483,19 @@ def __sample_from_edges_func( if v is not None } - return iter([(minibatch_dict, current_batches[0], 
current_batches[-1])]) + return iter( + [ + ( + minibatch_dict, + batch_id_start, + batch_id_start + current_batch_offsets.numel() - 2, + ) + ] + ) else: + minibatch_dict["batch_start"] = batch_id_start self.__writer.write_minibatches(minibatch_dict) - return None + return batch_id_start + current_batch_offsets.numel() - 1 def sample_from_edges( self, @@ -618,13 +552,13 @@ def sample_from_edges( assume_equal_input_size=input_size_is_equal, ) - sample_args = ( + sample_args = [ batch_id_start, batch_size, batches_per_call, random_state, input_size_is_equal, - ) + ] if self.__writer is None: # Buffered sampling @@ -638,7 +572,7 @@ def sample_from_edges( for i, current_seeds_and_ix in enumerate( zip(edges_call_groups, index_call_groups) ): - self.__sample_from_edges_func( + sample_args[0] = self.__sample_from_edges_func( i, current_seeds_and_ix, *sample_args, @@ -699,13 +633,21 @@ def __init__( compress_per_hop: bool = False, with_replacement: bool = False, biased: bool = False, + heterogeneous: bool = False, + vertex_type_offsets: Optional[TensorType] = None, + num_edge_types: int = 1, ): + self.__fanout = fanout - self.__prior_sources_behavior = prior_sources_behavior - self.__deduplicate_sources = deduplicate_sources - self.__compress_per_hop = compress_per_hop - self.__compression = compression - self.__with_replacement = with_replacement + self.__func_kwargs = { + "h_fan_out": np.asarray(fanout, dtype="int32"), + "prior_sources_behavior": prior_sources_behavior, + "retain_seeds": retain_original_seeds, + "deduplicate_sources": deduplicate_sources, + "compress_per_hop": compress_per_hop, + "compression": compression, + "with_replacement": with_replacement, + } # It is currently required that graphs are weighted for biased # sampling. So setting the function here is safe. In the future, @@ -713,28 +655,66 @@ def __init__( # change. # TODO allow func to be a call to a future remote sampling API # if the provided graph is in another process (rapidsai/cugraph#4623). 
- self.__func = ( - pylibcugraph.biased_neighbor_sample - if biased - else pylibcugraph.uniform_neighbor_sample - ) + if heterogeneous: + if vertex_type_offsets is None: + raise ValueError("Heterogeneous sampling requires vertex type offsets.") + self.__func = ( + pylibcugraph.heterogeneous_biased_neighbor_sample + if biased + else pylibcugraph.heterogeneous_uniform_neighbor_sample + ) + self.__func_kwargs["num_edge_types"] = num_edge_types + self.__func_kwargs["vertex_type_offsets"] = cupy.asarray( + vertex_type_offsets + ) + else: + self.__func = ( + pylibcugraph.homogeneous_biased_neighbor_sample + if biased + else pylibcugraph.homogeneous_uniform_neighbor_sample + ) + + if num_edge_types > 1 and not heterogeneous: + raise ValueError( + "Heterogeneous sampling must be selected if there is > 1 edge type." + ) super().__init__( graph, writer, - local_seeds_per_call=self.__calc_local_seeds_per_call(local_seeds_per_call), + local_seeds_per_call=self.__calc_local_seeds_per_call( + local_seeds_per_call, + heterogeneous=heterogeneous, + num_edge_types=num_edge_types, + ), retain_original_seeds=retain_original_seeds, ) - def __calc_local_seeds_per_call(self, local_seeds_per_call: Optional[int] = None): + def __calc_local_seeds_per_call( + self, + local_seeds_per_call: Optional[int] = None, + heterogeneous: bool = False, + num_edge_types: int = 1, + ): torch = import_optional("torch") + fanout = self.__fanout + if local_seeds_per_call is None: - if len([x for x in self.__fanout if x <= 0]) > 0: + if len([x for x in fanout if x <= 0]) > 0: return NeighborSampler.UNKNOWN_VERTICES_DEFAULT + if heterogeneous: + if len(fanout) % num_edge_types != 0: + raise ValueError(f"Illegal fanout for {num_edge_types} edge types.") + num_hops = len(fanout) // num_edge_types + fanout = [ + sum([fanout[t * num_hops + h] for t in range(num_edge_types)]) + for h in range(num_hops) + ] + total_memory = torch.cuda.get_device_properties(0).total_memory - fanout_prod = reduce(lambda x, y: x * y, 
self.__fanout) + fanout_prod = reduce(lambda x, y: x * y, fanout) return int( NeighborSampler.BASE_VERTICES_PER_BYTE * total_memory / fanout_prod ) @@ -744,94 +724,25 @@ def __calc_local_seeds_per_call(self, local_seeds_per_call: Optional[int] = None def sample_batches( self, seeds: TensorType, - batch_ids: TensorType, + batch_id_offsets: TensorType, random_state: int = 0, - assume_equal_input_size: bool = False, ) -> Dict[str, TensorType]: torch = import_optional("torch") - if self.is_multi_gpu: - rank = torch.distributed.get_rank() - - batch_ids = batch_ids.to(device="cuda", dtype=torch.int32) - local_label_list = torch.unique(batch_ids) - - label_list, label_to_output_comm_rank = self.get_label_list_and_output_rank( - local_label_list, assume_equal_input_size=assume_equal_input_size - ) - - if self._retain_original_seeds: - label_offsets = torch.concat( - [ - torch.searchsorted(batch_ids, local_label_list), - torch.tensor( - [batch_ids.shape[0]], device="cuda", dtype=torch.int64 - ), - ] - ) - else: - label_offsets = None - - sampling_results_dict = self.__func( - self._resource_handle, - self._graph, - start_list=cupy.asarray(seeds), - batch_id_list=cupy.asarray(batch_ids), - label_list=cupy.asarray(label_list), - label_to_output_comm_rank=cupy.asarray(label_to_output_comm_rank), - h_fan_out=np.array(self.__fanout, dtype="int32"), - with_replacement=self.__with_replacement, - do_expensive_check=False, - with_edge_properties=True, - random_state=random_state + rank, - prior_sources_behavior=self.__prior_sources_behavior, - deduplicate_sources=self.__deduplicate_sources, - return_hops=True, - renumber=True, - compression=self.__compression, - compress_per_hop=self.__compress_per_hop, - retain_seeds=self._retain_original_seeds, - label_offsets=None - if label_offsets is None - else cupy.asarray(label_offsets), - return_dict=True, - ) - sampling_results_dict["rank"] = rank - else: - if self._retain_original_seeds: - batch_ids = batch_ids.to(device="cuda", 
dtype=torch.int32) - local_label_list = torch.unique(batch_ids) - label_offsets = torch.concat( - [ - torch.searchsorted(batch_ids, local_label_list), - torch.tensor( - [batch_ids.shape[0]], device="cuda", dtype=torch.int64 - ), - ] - ) - else: - label_offsets = None - - sampling_results_dict = self.__func( - self._resource_handle, - self._graph, - start_list=cupy.asarray(seeds), - batch_id_list=cupy.asarray(batch_ids), - h_fan_out=np.array(self.__fanout, dtype="int32"), - with_replacement=self.__with_replacement, - do_expensive_check=False, - with_edge_properties=True, - random_state=random_state, - prior_sources_behavior=self.__prior_sources_behavior, - deduplicate_sources=self.__deduplicate_sources, - return_hops=True, - renumber=True, - compression=self.__compression, - compress_per_hop=self.__compress_per_hop, - retain_seeds=self._retain_original_seeds, - label_offsets=None - if label_offsets is None - else cupy.asarray(label_offsets), - return_dict=True, - ) - + rank = torch.distributed.get_rank() if self.is_multi_gpu else 0 + + kwargs = { + "resource_handle": self._resource_handle, + "input_graph": self._graph, + "start_vertex_list": cupy.asarray(seeds), + "starting_vertex_label_offsets": cupy.asarray(batch_id_offsets), + "renumber": True, + "return_hops": True, + "do_expensive_check": False, + "random_state": random_state + rank, + } + kwargs.update(self.__func_kwargs) + sampling_results_dict = self.__func(**kwargs) + + sampling_results_dict["fanout"] = cupy.array(self.__fanout, dtype="int32") + sampling_results_dict["rank"] = rank return sampling_results_dict diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py index 3c5d6428001..cffa399c6ca 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. 
+# Copyright (c) 2023-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -278,6 +278,10 @@ def test_bulk_sampler_empty_batches(scratch_dir): assert len(os.listdir(samples_path)) == 1 + # There are 3 batches [0, 1, 2] where batch 1 has no results. In fact, seeds + # [7, 8, 9] have no outgoing edges. The previous implementation returned an + # offsets array omitting seeds with no outgoing edges from the + # edge_label_offsets which is no longer the case df = cudf.read_parquet(os.path.join(samples_path, "batch=0-1.parquet")) assert df[ diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py index 3fddb8f405b..fa73ce95792 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# Copyright (c) 2023-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -228,6 +228,10 @@ def test_bulk_sampler_empty_batches(dask_client, scratch_dir): assert len(os.listdir(samples_path)) == 1 + # There are 3 batches [0, 1, 2] where batch 1 has no results. In fact, seeds + # [7, 8, 9] have no outgoing edges. 
The previous implementation returned an + # offsets array omitting seeds with no outgoing edges from the + # edge_label_offsets which is no longer the case df = cudf.read_parquet(os.path.join(samples_path, "batch=0-1.parquet")) assert df[ diff --git a/python/cugraph/cugraph/tests/sampling/test_dist_sampler.py b/python/cugraph/cugraph/tests/sampling/test_dist_sampler.py index 64db0232fb1..a30a8f90b80 100644 --- a/python/cugraph/cugraph/tests/sampling/test_dist_sampler.py +++ b/python/cugraph/cugraph/tests/sampling/test_dist_sampler.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -79,13 +79,12 @@ def test_dist_sampler_simple( ) recovered_samples = cudf.read_parquet(samples_path) - print(recovered_samples) original_el = karate.get_edgelist() for b in range(len(seeds) // batch_size): el_start = recovered_samples.label_hop_offsets.iloc[b * len(fanout)] el_end = recovered_samples.label_hop_offsets.iloc[(b + 1) * len(fanout)] - print(el_start, el_end) + src = recovered_samples.majors.iloc[el_start:el_end] dst = recovered_samples.minors.iloc[el_start:el_end] edge_id = recovered_samples.edge_id.iloc[el_start:el_end] @@ -107,7 +106,7 @@ def test_dist_sampler_simple( @pytest.mark.sg @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") @pytest.mark.parametrize("seeds_per_call", [4, 5, 10]) -@pytest.mark.parametrize("compression", ["COO", "CSR"]) +@pytest.mark.parametrize("compression", ["CSR", "COO"]) def test_dist_sampler_buffered_in_memory( scratch_dir: str, karate_graph: SGGraph, seeds_per_call: int, compression: str ): @@ -152,10 +151,142 @@ def test_dist_sampler_buffered_in_memory( br, bs, be = buffered_results[k] ur, us, ue = unbuffered_results[k] - assert bs == us - assert be == ue + assert (be - bs) == 
(ue - us) for col in ur.columns: assert (br[col].dropna() == ur[col].dropna()).all() shutil.rmtree(samples_path) + + +@pytest.mark.sg +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +def test_dist_sampler_hetero_from_nodes(): + props = GraphProperties( + is_symmetric=False, + is_multigraph=True, + ) + + handle = ResourceHandle() + + srcs = cupy.array([4, 5, 6, 7, 8, 9, 8, 9, 8, 7, 6, 5, 4, 5]) + dsts = cupy.array([0, 1, 2, 3, 3, 0, 4, 5, 6, 8, 7, 8, 9, 9]) + eids = cupy.array([0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 6, 7]) + etps = cupy.array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1], dtype="int32") + + graph = SGGraph( + handle, + props, + srcs, + dsts, + vertices_array=cupy.arange(10), + edge_id_array=eids, + edge_type_array=etps, + weight_array=cupy.ones((14,), dtype="float32"), + ) + + sampler = UniformNeighborSampler( + graph, + fanout=[-1, -1, -1, -1], + writer=None, + compression="COO", + heterogeneous=True, + vertex_type_offsets=cupy.array([0, 4, 10]), + num_edge_types=2, + deduplicate_sources=True, + ) + + out = sampler.sample_from_nodes( + nodes=cupy.array([4, 5]), + input_id=cupy.array([5, 10]), + ) + + out = [z for z in out] + assert len(out) == 1 + out, _, _ = out[0] + + lho = out["label_type_hop_offsets"] + + # Edge type 0 + emap = out["edge_renumber_map"][ + out["edge_renumber_map_offsets"][0] : out["edge_renumber_map_offsets"][1] + ] + + smap = out["map"][out["renumber_map_offsets"][1] : out["renumber_map_offsets"][2]] + + dmap = out["map"][out["renumber_map_offsets"][0] : out["renumber_map_offsets"][1]] + + # Edge type 0, hop 0 + hop_start = lho[0] + hop_end = lho[1] + + assert hop_end - hop_start == 2 + + e = out["edge_id"][hop_start:hop_end] + e = emap[e] + assert sorted(e.tolist()) == [0, 1] + + s = cupy.asarray(smap[out["majors"][hop_start:hop_end]]) + d = cupy.asarray(dmap[out["minors"][hop_start:hop_end]]) + + assert sorted(s.tolist()) == [4, 5] + assert sorted(d.tolist()) == [0, 1] + + # Edge type 0, hop 1 + 
hop_start = int(lho[1]) + hop_end = int(lho[2]) + + assert hop_end - hop_start == 2 + + e = out["edge_id"][hop_start:hop_end] + e = emap[e] + assert sorted(e.tolist()) == [4, 5] + + s = cupy.asarray(smap[out["majors"][hop_start:hop_end]]) + d = cupy.asarray(dmap[out["minors"][hop_start:hop_end]]) + + assert sorted(s.tolist()) == [8, 9] + assert sorted(d.tolist()) == [0, 3] + + ############################# + + # Edge type 1 + emap = out["edge_renumber_map"][ + out["edge_renumber_map_offsets"][1] : out["edge_renumber_map_offsets"][2] + ] + + smap = out["map"][out["renumber_map_offsets"][1] : out["renumber_map_offsets"][2]] + + dmap = smap + + # Edge type 1, hop 0 + hop_start = lho[2] + hop_end = lho[3] + + assert hop_end - hop_start == 3 + + e = out["edge_id"][hop_start:hop_end] + e = emap[e] + assert sorted(e.tolist()) == [5, 6, 7] + + s = cupy.asarray(smap[out["majors"][hop_start:hop_end]]) + d = cupy.asarray(dmap[out["minors"][hop_start:hop_end]]) + + assert sorted(s.tolist()) == [4, 5, 5] + assert sorted(d.tolist()) == [8, 9, 9] + + # Edge type 1, hop 1 + hop_start = lho[3] + hop_end = lho[4] + + assert hop_end - hop_start == 3 + + e = out["edge_id"][hop_start:hop_end] + e = emap[e] + assert sorted(e.tolist()) == [0, 1, 2] + + s = cupy.asarray(smap[out["majors"][hop_start:hop_end]]) + d = cupy.asarray(dmap[out["minors"][hop_start:hop_end]]) + + assert sorted(s.tolist()) == [8, 8, 9] + assert sorted(d.tolist()) == [4, 5, 6] diff --git a/python/cugraph/cugraph/tests/sampling/test_dist_sampler_mg.py b/python/cugraph/cugraph/tests/sampling/test_dist_sampler_mg.py index 5bb541d6cf3..3cc7859b5d9 100644 --- a/python/cugraph/cugraph/tests/sampling/test_dist_sampler_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_dist_sampler_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. 
# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -59,6 +59,9 @@ def karate_mg_graph(rank, world_size): [el.src.astype("int64")], [el.dst.astype("int64")], edge_id_array=[el.eid], + vertices_array=[ + cupy.array_split(cupy.arange(34, dtype="int64"), world_size)[rank] + ], ) return G @@ -290,8 +293,7 @@ def run_test_dist_sampler_buffered_in_memory( br, bs, be = buffered_results[k] ur, us, ue = unbuffered_results[k] - assert bs == us - assert be == ue + assert be - bs == ue - us for col in ur.columns: assert (br[col].dropna() == ur[col].dropna()).all() diff --git a/python/pylibcugraph/pylibcugraph/heterogeneous_biased_neighbor_sample.pyx b/python/pylibcugraph/pylibcugraph/heterogeneous_biased_neighbor_sample.pyx index ee0e85fa3bd..44fe0644a64 100644 --- a/python/pylibcugraph/pylibcugraph/heterogeneous_biased_neighbor_sample.pyx +++ b/python/pylibcugraph/pylibcugraph/heterogeneous_biased_neighbor_sample.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. +# Copyright (c) 2022-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -89,6 +89,7 @@ def heterogeneous_biased_neighbor_sample(ResourceHandle resource_handle, starting_vertex_label_offsets, vertex_type_offsets, h_fan_out, + *, num_edge_types, bool_t with_replacement, bool_t do_expensive_check, diff --git a/python/pylibcugraph/pylibcugraph/heterogeneous_uniform_neighbor_sample.pyx b/python/pylibcugraph/pylibcugraph/heterogeneous_uniform_neighbor_sample.pyx index dbee65323d7..4488815a6d9 100644 --- a/python/pylibcugraph/pylibcugraph/heterogeneous_uniform_neighbor_sample.pyx +++ b/python/pylibcugraph/pylibcugraph/heterogeneous_uniform_neighbor_sample.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION.
+# Copyright (c) 2022-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -86,6 +86,7 @@ def heterogeneous_uniform_neighbor_sample(ResourceHandle resource_handle, starting_vertex_label_offsets, vertex_type_offsets, h_fan_out, + *, num_edge_types, bool_t with_replacement, bool_t do_expensive_check, diff --git a/python/pylibcugraph/pylibcugraph/homogeneous_biased_neighbor_sample.pyx b/python/pylibcugraph/pylibcugraph/homogeneous_biased_neighbor_sample.pyx index cbd7a5dcffb..8fd88253028 100644 --- a/python/pylibcugraph/pylibcugraph/homogeneous_biased_neighbor_sample.pyx +++ b/python/pylibcugraph/pylibcugraph/homogeneous_biased_neighbor_sample.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. +# Copyright (c) 2022-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -88,6 +88,7 @@ def homogeneous_biased_neighbor_sample(ResourceHandle resource_handle, start_vertex_list, starting_vertex_label_offsets, h_fan_out, + *, bool_t with_replacement, bool_t do_expensive_check, prior_sources_behavior=None, diff --git a/python/pylibcugraph/pylibcugraph/homogeneous_uniform_neighbor_sample.pyx b/python/pylibcugraph/pylibcugraph/homogeneous_uniform_neighbor_sample.pyx index bb88ffcf6af..14f474c5bd3 100644 --- a/python/pylibcugraph/pylibcugraph/homogeneous_uniform_neighbor_sample.pyx +++ b/python/pylibcugraph/pylibcugraph/homogeneous_uniform_neighbor_sample.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. +# Copyright (c) 2022-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -85,6 +85,7 @@ def homogeneous_uniform_neighbor_sample(ResourceHandle resource_handle, start_vertex_list, starting_vertex_label_offsets, h_fan_out, + *, bool_t with_replacement, bool_t do_expensive_check, prior_sources_behavior=None,