diff --git a/docs/source/install/index.rst b/docs/source/install/index.rst index 25249e8e36b9..23391ebce1cd 100644 --- a/docs/source/install/index.rst +++ b/docs/source/install/index.rst @@ -5,11 +5,13 @@ System requirements ------------------- DGL works with the following operating systems: -* Ubuntu 16.04 +* Ubuntu 20.04+ +* CentOS 8+ +* RHEL 8+ * macOS X * Windows 10 -DGL requires Python version 3.6, 3.7, 3.8 or 3.9. +DGL requires Python version 3.7, 3.8, 3.9, 3.10, 3.11. DGL supports multiple tensor libraries as backends, e.g., PyTorch, MXNet. For requirements on backends and how to select one, see :ref:`backends`. diff --git a/graphbolt/src/cuda/index_select_csc_impl.cu b/graphbolt/src/cuda/index_select_csc_impl.cu index d1a6a89af18f..ce8af7a9f615 100644 --- a/graphbolt/src/cuda/index_select_csc_impl.cu +++ b/graphbolt/src/cuda/index_select_csc_impl.cu @@ -14,12 +14,13 @@ #include #include "./common.h" +#include "./max_uva_threads.h" #include "./utils.h" namespace graphbolt { namespace ops { -constexpr int BLOCK_SIZE = 128; +constexpr int BLOCK_SIZE = CUDA_MAX_NUM_THREADS; // Given the in_degree array and a permutation, returns in_degree of the output // and the permuted and modified in_degree of the input. The modified in_degree @@ -130,7 +131,10 @@ std::tuple UVAIndexSelectCSCCopyIndices( torch::Tensor output_indices = torch::empty(output_size.value(), options.dtype(indices.scalar_type())); const dim3 block(BLOCK_SIZE); - const dim3 grid((edge_count_aligned + BLOCK_SIZE - 1) / BLOCK_SIZE); + const dim3 grid( + (std::min(edge_count_aligned, cuda::max_uva_threads.value_or(1 << 20)) + + BLOCK_SIZE - 1) / + BLOCK_SIZE); // Find the smallest integer type to store the coo_aligned_rows tensor. const int num_bits = cuda::NumberOfBits(num_nodes); diff --git a/graphbolt/src/cuda/index_select_impl.cu b/graphbolt/src/cuda/index_select_impl.cu index 389d2430f227..43fd144848b0 100644 --- a/graphbolt/src/cuda/index_select_impl.cu +++ b/graphbolt/src/cuda/index_select_impl.cu @@ -131,7 +131,7 @@ torch::Tensor UVAIndexSelectImpl_(torch::Tensor input, torch::Tensor index) { IndexSelectSingleKernel, num_blocks, num_threads, 0, input_ptr, input_len, index_sorted_ptr, return_len, ret_ptr, permutation_ptr); } else { - constexpr int BLOCK_SIZE = 512; + constexpr int BLOCK_SIZE = CUDA_MAX_NUM_THREADS; dim3 block(BLOCK_SIZE, 1); while (static_cast(block.x) >= 2 * aligned_feature_size) { block.x >>= 1; diff --git a/python/dgl/distributed/dist_graph.py b/python/dgl/distributed/dist_graph.py index 4defa3223937..5bf76498ec97 100644 --- a/python/dgl/distributed/dist_graph.py +++ b/python/dgl/distributed/dist_graph.py @@ -60,18 +60,21 @@ class InitGraphRequest(rpc.Request): with shared memory. """ - def __init__(self, graph_name): + def __init__(self, graph_name, use_graphbolt): self._graph_name = graph_name + self._use_graphbolt = use_graphbolt def __getstate__(self): - return self._graph_name + return self._graph_name, self._use_graphbolt def __setstate__(self, state): - self._graph_name = state + self._graph_name, self._use_graphbolt = state def process_request(self, server_state): if server_state.graph is None: - server_state.graph = _get_graph_from_shared_mem(self._graph_name) + server_state.graph = _get_graph_from_shared_mem( + self._graph_name, self._use_graphbolt + ) return InitGraphResponse(self._graph_name) @@ -153,13 +156,15 @@ def _exist_shared_mem_array(graph_name, name): return exist_shared_mem_array(_get_edata_path(graph_name, name)) -def _get_graph_from_shared_mem(graph_name): +def _get_graph_from_shared_mem(graph_name, use_graphbolt): """Get the graph from the DistGraph server. The DistGraph server puts the graph structure of the local partition in the shared memory. The client can access the graph structure and some metadata on nodes and edges directly through shared memory to reduce the overhead of data access. """ + if use_graphbolt: + return gb.load_from_shared_memory(graph_name) g, ntypes, etypes = heterograph_index.create_heterograph_from_shared_memory( graph_name ) @@ -524,6 +529,8 @@ class DistGraph: part_config : str, optional The path of partition configuration file generated by :py:meth:`dgl.distributed.partition.partition_graph`. It's used in the standalone mode. + use_graphbolt : bool, optional + Whether to load GraphBolt partition. Default: False. Examples -------- @@ -557,9 +564,15 @@ class DistGraph: manually setting up servers and trainers. The setup is not fully tested yet. """ - def __init__(self, graph_name, gpb=None, part_config=None): + def __init__( + self, graph_name, gpb=None, part_config=None, use_graphbolt=False + ): self.graph_name = graph_name + self._use_graphbolt = use_graphbolt if os.environ.get("DGL_DIST_MODE", "standalone") == "standalone": + assert ( + use_graphbolt is False + ), "GraphBolt is not supported in standalone mode." assert ( part_config is not None ), "When running in the standalone model, the partition config file is required" @@ -600,7 +613,9 @@ def __init__(self, graph_name, gpb=None, part_config=None): self._init(gpb) # Tell the backup servers to load the graph structure from shared memory. for server_id in range(self._client.num_servers): - rpc.send_request(server_id, InitGraphRequest(graph_name)) + rpc.send_request( + server_id, InitGraphRequest(graph_name, use_graphbolt) + ) for server_id in range(self._client.num_servers): rpc.recv_response() self._client.barrier() @@ -625,7 +640,9 @@ def _init(self, gpb): assert ( self._client is not None ), "Distributed module is not initialized. Please call dgl.distributed.initialize." - self._g = _get_graph_from_shared_mem(self.graph_name) + self._g = _get_graph_from_shared_mem( + self.graph_name, self._use_graphbolt + ) self._gpb = get_shared_mem_partition_book(self.graph_name) if self._gpb is None: self._gpb = gpb @@ -682,10 +699,10 @@ def _init_edata_store(self): self._edata_store[etype] = data def __getstate__(self): - return self.graph_name, self._gpb + return self.graph_name, self._gpb, self._use_graphbolt def __setstate__(self, state): - self.graph_name, gpb = state + self.graph_name, gpb, self._use_graphbolt = state self._init(gpb) self._init_ndata_store() @@ -1230,6 +1247,9 @@ def find_edges(self, edges, etype=None): tensor The destination node ID array. """ + assert ( + self._use_graphbolt is False + ), "find_edges is not supported in GraphBolt." if etype is None: assert ( len(self.etypes) == 1 diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py index 028935086d48..6928d24da534 100644 --- a/python/dgl/distributed/partition.py +++ b/python/dgl/distributed/partition.py @@ -638,6 +638,8 @@ def partition_graph( num_trainers_per_machine=1, objtype="cut", graph_formats=None, + use_graphbolt=False, + **kwargs, ): """Partition a graph for distributed training and store the partitions on files. @@ -811,6 +813,10 @@ def partition_graph( ``csc`` and ``csr``. If not specified, save one format only according to what format is available. If multiple formats are available, selection priority from high to low is ``coo``, ``csc``, ``csr``. + use_graphbolt : bool, optional + Whether to save partitions in GraphBolt format. Default: False. + kwargs : dict + Other keyword arguments for converting DGL partitions to GraphBolt. Returns ------- @@ -1298,7 +1304,8 @@ def get_homogeneous(g, balance_ntypes): ) ) - _dump_part_config(f"{out_path}/{graph_name}.json", part_metadata) + part_config = os.path.join(out_path, graph_name + ".json") + _dump_part_config(part_config, part_metadata) num_cuts = sim_g.num_edges() - tot_num_inner_edges if num_parts == 1: @@ -1309,6 +1316,12 @@ def get_homogeneous(g, balance_ntypes): ) ) + if use_graphbolt: + dgl_partition_to_graphbolt( + part_config, + **kwargs, + ) + if return_mapping: return orig_nids, orig_eids diff --git a/python/dgl/graphbolt/dataloader.py b/python/dgl/graphbolt/dataloader.py index f89e43e0c8d9..b0dd9daccfaf 100644 --- a/python/dgl/graphbolt/dataloader.py +++ b/python/dgl/graphbolt/dataloader.py @@ -1,6 +1,6 @@ """Graph Bolt DataLoaders""" -from queue import Queue +from collections import deque import torch import torch.utils.data @@ -69,18 +69,18 @@ def __init__(self, datapipe, buffer_size=1): raise ValueError( "'buffer_size' is required to be a positive integer." ) - self.buffer = Queue(buffer_size) + self.buffer = deque(maxlen=buffer_size) def __iter__(self): for data in self.datapipe: - if not self.buffer.full(): - self.buffer.put(data) + if len(self.buffer) < self.buffer.maxlen: + self.buffer.append(data) else: - return_data = self.buffer.get() - self.buffer.put(data) + return_data = self.buffer.popleft() + self.buffer.append(data) yield return_data - while not self.buffer.empty(): - yield self.buffer.get() + while len(self.buffer) > 0: + yield self.buffer.popleft() class Awaiter(dp.iter.IterDataPipe): diff --git a/python/dgl/graphbolt/impl/neighbor_sampler.py b/python/dgl/graphbolt/impl/neighbor_sampler.py index ef10d49d7584..605da8ff5ce3 100644 --- a/python/dgl/graphbolt/impl/neighbor_sampler.py +++ b/python/dgl/graphbolt/impl/neighbor_sampler.py @@ -1,9 +1,12 @@ """Neighbor subgraph samplers for GraphBolt.""" +from functools import partial + import torch from torch.utils.data import functional_datapipe from ..internal import compact_csc_format, unique_and_compact_csc_formats +from ..minibatch_transformer import MiniBatchTransformer from ..subgraph_sampler import SubgraphSampler from .sampled_subgraph_impl import SampledSubgraphImpl @@ -12,8 +15,66 @@ __all__ = ["NeighborSampler", "LayerNeighborSampler"] +@functional_datapipe("sample_per_layer") +class SamplePerLayer(MiniBatchTransformer): + """Sample neighbor edges from a graph for a single layer.""" + + def __init__(self, datapipe, sampler, fanout, replace, prob_name): + super().__init__(datapipe, self._sample_per_layer) + self.sampler = sampler + self.fanout = fanout + self.replace = replace + self.prob_name = prob_name + + def _sample_per_layer(self, minibatch): + subgraph = self.sampler( + minibatch._seed_nodes, self.fanout, self.replace, self.prob_name + ) + minibatch.sampled_subgraphs.insert(0, subgraph) + return minibatch + + +@functional_datapipe("compact_per_layer") +class CompactPerLayer(MiniBatchTransformer): + """Compact the sampled edges for a single layer.""" + + def __init__(self, datapipe, deduplicate): + super().__init__(datapipe, self._compact_per_layer) + self.deduplicate = deduplicate + + def _compact_per_layer(self, minibatch): + subgraph = minibatch.sampled_subgraphs[0] + seeds = minibatch._seed_nodes + if self.deduplicate: + ( + original_row_node_ids, + compacted_csc_format, + ) = unique_and_compact_csc_formats(subgraph.sampled_csc, seeds) + subgraph = SampledSubgraphImpl( + sampled_csc=compacted_csc_format, + original_column_node_ids=seeds, + original_row_node_ids=original_row_node_ids, + original_edge_ids=subgraph.original_edge_ids, + ) + else: + ( + original_row_node_ids, + compacted_csc_format, + ) = compact_csc_format(subgraph.sampled_csc, seeds) + subgraph = SampledSubgraphImpl( + sampled_csc=compacted_csc_format, + original_column_node_ids=seeds, + original_row_node_ids=original_row_node_ids, + original_edge_ids=subgraph.original_edge_ids, + ) + minibatch._seed_nodes = original_row_node_ids + minibatch.sampled_subgraphs[0] = subgraph + return minibatch + + @functional_datapipe("sample_neighbor") class NeighborSampler(SubgraphSampler): + # pylint: disable=abstract-method """Sample neighbor edges from a graph and return a subgraph. Functional name: :obj:`sample_neighbor`. @@ -95,6 +156,7 @@ class NeighborSampler(SubgraphSampler): )] """ + # pylint: disable=useless-super-delegation def __init__( self, datapipe, @@ -103,26 +165,19 @@ def __init__( replace=False, prob_name=None, deduplicate=True, + sampler=None, ): - super().__init__(datapipe) - self.graph = graph - # Convert fanouts to a list of tensors. - self.fanouts = [] - for fanout in fanouts: - if not isinstance(fanout, torch.Tensor): - fanout = torch.LongTensor([int(fanout)]) - self.fanouts.insert(0, fanout) - self.replace = replace - self.prob_name = prob_name - self.deduplicate = deduplicate - self.sampler = graph.sample_neighbors + if sampler is None: + sampler = graph.sample_neighbors + super().__init__( + datapipe, graph, fanouts, replace, prob_name, deduplicate, sampler + ) - def sample_subgraphs(self, seeds, seeds_timestamp): - subgraphs = [] - num_layers = len(self.fanouts) + def _prepare(self, node_type_to_id, minibatch): + seeds = minibatch._seed_nodes # Enrich seeds with all node types. if isinstance(seeds, dict): - ntypes = list(self.graph.node_type_to_id.keys()) + ntypes = list(node_type_to_id.keys()) # Loop over different seeds to extract the device they are on. device = None dtype = None @@ -134,42 +189,37 @@ def sample_subgraphs(self, seeds, seeds_timestamp): seeds = { ntype: seeds.get(ntype, default_tensor) for ntype in ntypes } - for hop in range(num_layers): - subgraph = self.sampler( - seeds, - self.fanouts[hop], - self.replace, - self.prob_name, + minibatch._seed_nodes = seeds + minibatch.sampled_subgraphs = [] + return minibatch + + @staticmethod + def _set_input_nodes(minibatch): + minibatch.input_nodes = minibatch._seed_nodes + return minibatch + + # pylint: disable=arguments-differ + def sampling_stages( + self, datapipe, graph, fanouts, replace, prob_name, deduplicate, sampler + ): + datapipe = datapipe.transform( + partial(self._prepare, graph.node_type_to_id) + ) + for fanout in reversed(fanouts): + # Convert fanout to tensor. + if not isinstance(fanout, torch.Tensor): + fanout = torch.LongTensor([int(fanout)]) + datapipe = datapipe.sample_per_layer( + sampler, fanout, replace, prob_name ) - if self.deduplicate: - ( - original_row_node_ids, - compacted_csc_format, - ) = unique_and_compact_csc_formats(subgraph.sampled_csc, seeds) - subgraph = SampledSubgraphImpl( - sampled_csc=compacted_csc_format, - original_column_node_ids=seeds, - original_row_node_ids=original_row_node_ids, - original_edge_ids=subgraph.original_edge_ids, - ) - else: - ( - original_row_node_ids, - compacted_csc_format, - ) = compact_csc_format(subgraph.sampled_csc, seeds) - subgraph = SampledSubgraphImpl( - sampled_csc=compacted_csc_format, - original_column_node_ids=seeds, - original_row_node_ids=original_row_node_ids, - original_edge_ids=subgraph.original_edge_ids, - ) - subgraphs.insert(0, subgraph) - seeds = original_row_node_ids - return seeds, subgraphs + datapipe = datapipe.compact_per_layer(deduplicate) + + return datapipe.transform(self._set_input_nodes) @functional_datapipe("sample_layer_neighbor") class LayerNeighborSampler(NeighborSampler): + # pylint: disable=abstract-method """Sample layer neighbor edges from a graph and return a subgraph. Functional name: :obj:`sample_layer_neighbor`. @@ -280,5 +330,5 @@ def __init__( replace, prob_name, deduplicate, + graph.sample_layer_neighbors, ) - self.sampler = graph.sample_layer_neighbors diff --git a/python/dgl/graphbolt/subgraph_sampler.py b/python/dgl/graphbolt/subgraph_sampler.py index 3e3c3d9b507c..b05b8ca30619 100644 --- a/python/dgl/graphbolt/subgraph_sampler.py +++ b/python/dgl/graphbolt/subgraph_sampler.py @@ -22,21 +22,44 @@ class SubgraphSampler(MiniBatchTransformer): Functional name: :obj:`sample_subgraph`. This class is the base class of all subgraph samplers. Any subclass of - SubgraphSampler should implement the :meth:`sample_subgraphs` method. + SubgraphSampler should implement either the :meth:`sample_subgraphs` method + or the :meth:`sampling_stages` method to define the fine-grained sampling + stages to take advantage of optimizations provided by the GraphBolt + DataLoader. Parameters ---------- datapipe : DataPipe The datapipe. + args : Non-Keyword Arguments + Arguments to be passed into sampling_stages. + kwargs : Keyword Arguments + Arguments to be passed into sampling_stages. """ def __init__( self, datapipe, + *args, + **kwargs, ): - super().__init__(datapipe, self._sample) + datapipe = datapipe.transform(self._preprocess) + datapipe = self.sampling_stages(datapipe, *args, **kwargs) + datapipe = datapipe.transform(self._postprocess) + super().__init__(datapipe, self._identity) - def _sample(self, minibatch): + @staticmethod + def _identity(minibatch): + return minibatch + + @staticmethod + def _postprocess(minibatch): + delattr(minibatch, "_seed_nodes") + delattr(minibatch, "_seeds_timestamp") + return minibatch + + @staticmethod + def _preprocess(minibatch): if minibatch.node_pairs is not None: ( seeds, @@ -44,7 +67,7 @@ def _sample(self, minibatch): minibatch.compacted_node_pairs, minibatch.compacted_negative_srcs, minibatch.compacted_negative_dsts, - ) = self._node_pairs_preprocess(minibatch) + ) = SubgraphSampler._node_pairs_preprocess(minibatch) elif minibatch.seed_nodes is not None: seeds = minibatch.seed_nodes seeds_timestamp = ( @@ -55,13 +78,12 @@ def _sample(self, minibatch): f"Invalid minibatch {minibatch}: Either `node_pairs` or " "`seed_nodes` should have a value." ) - ( - minibatch.input_nodes, - minibatch.sampled_subgraphs, - ) = self.sample_subgraphs(seeds, seeds_timestamp) + minibatch._seed_nodes = seeds + minibatch._seeds_timestamp = seeds_timestamp return minibatch - def _node_pairs_preprocess(self, minibatch): + @staticmethod + def _node_pairs_preprocess(minibatch): use_timestamp = hasattr(minibatch, "timestamp") node_pairs = minibatch.node_pairs neg_src, neg_dst = minibatch.negative_srcs, minibatch.negative_dsts @@ -191,6 +213,23 @@ def _node_pairs_preprocess(self, minibatch): compacted_negative_dsts if has_neg_dst else None, ) + def _sample(self, minibatch): + ( + minibatch.input_nodes, + minibatch.sampled_subgraphs, + ) = self.sample_subgraphs( + minibatch._seed_nodes, minibatch._seeds_timestamp + ) + return minibatch + + def sampling_stages(self, datapipe): + """The sampling stages are defined here by chaining to the datapipe. The + default implementation expects :meth:`sample_subgraphs` to be + implemented. To define fine-grained stages, this method should be + overridden. + """ + return datapipe.transform(self._sample) + def sample_subgraphs(self, seeds, seeds_timestamp): """Sample subgraphs from the given seeds, possibly with temporal constraints. diff --git a/tests/distributed/test_dist_graph_store.py b/tests/distributed/test_dist_graph_store.py index b473ef163215..8ba98ef4f47c 100644 --- a/tests/distributed/test_dist_graph_store.py +++ b/tests/distributed/test_dist_graph_store.py @@ -13,6 +13,7 @@ import backend as F import dgl +import dgl.graphbolt as gb import numpy as np import pytest import torch as th @@ -38,12 +39,33 @@ import struct +def _verify_dist_graph_server_dgl(g): + # verify dtype of underlying graph + cg = g.client_g + for k, dtype in dgl.distributed.dist_graph.RESERVED_FIELD_DTYPE.items(): + if k in cg.ndata: + assert ( + F.dtype(cg.ndata[k]) == dtype + ), "Data type of {} in ndata should be {}.".format(k, dtype) + if k in cg.edata: + assert ( + F.dtype(cg.edata[k]) == dtype + ), "Data type of {} in edata should be {}.".format(k, dtype) + + +def _verify_dist_graph_server_graphbolt(g): + graph = g.client_g + assert isinstance(graph, gb.FusedCSCSamplingGraph) + # [Rui][TODO] verify dtype of underlying graph. + + def run_server( graph_name, server_id, server_count, num_clients, shared_mem, + use_graphbolt=False, ): g = DistGraphServer( server_id, @@ -53,19 +75,15 @@ def run_server( "/tmp/dist_graph/{}.json".format(graph_name), disable_shared_mem=not shared_mem, graph_format=["csc", "coo"], + use_graphbolt=use_graphbolt, ) - print("start server", server_id) - # verify dtype of underlying graph - cg = g.client_g - for k, dtype in dgl.distributed.dist_graph.RESERVED_FIELD_DTYPE.items(): - if k in cg.ndata: - assert ( - F.dtype(cg.ndata[k]) == dtype - ), "Data type of {} in ndata should be {}.".format(k, dtype) - if k in cg.edata: - assert ( - F.dtype(cg.edata[k]) == dtype - ), "Data type of {} in edata should be {}.".format(k, dtype) + print(f"Starting server[{server_id}] with use_graphbolt={use_graphbolt}") + _verify = ( + _verify_dist_graph_server_graphbolt + if use_graphbolt + else _verify_dist_graph_server_dgl + ) + _verify(g) g.start() @@ -110,25 +128,35 @@ def check_dist_graph_empty(g, num_clients, num_nodes, num_edges): def run_client_empty( - graph_name, part_id, server_count, num_clients, num_nodes, num_edges + graph_name, + part_id, + server_count, + num_clients, + num_nodes, + num_edges, + use_graphbolt=False, ): os.environ["DGL_NUM_SERVER"] = str(server_count) dgl.distributed.initialize("kv_ip_config.txt") gpb, graph_name, _, _ = load_partition_book( "/tmp/dist_graph/{}.json".format(graph_name), part_id ) - g = DistGraph(graph_name, gpb=gpb) + g = DistGraph(graph_name, gpb=gpb, use_graphbolt=use_graphbolt) check_dist_graph_empty(g, num_clients, num_nodes, num_edges) -def check_server_client_empty(shared_mem, num_servers, num_clients): +def check_server_client_empty( + shared_mem, num_servers, num_clients, use_graphbolt=False +): prepare_dist(num_servers) g = create_random_graph(10000) # Partition the graph num_parts = 1 graph_name = "dist_graph_test_1" - partition_graph(g, graph_name, num_parts, "/tmp/dist_graph") + partition_graph( + g, graph_name, num_parts, "/tmp/dist_graph", use_graphbolt=use_graphbolt + ) # let's just test on one partition for now. # We cannot run multiple servers and clients on the same machine. @@ -137,7 +165,14 @@ def check_server_client_empty(shared_mem, num_servers, num_clients): for serv_id in range(num_servers): p = ctx.Process( target=run_server, - args=(graph_name, serv_id, num_servers, num_clients, shared_mem), + args=( + graph_name, + serv_id, + num_servers, + num_clients, + shared_mem, + use_graphbolt, + ), ) serv_ps.append(p) p.start() @@ -154,6 +189,7 @@ def check_server_client_empty(shared_mem, num_servers, num_clients): num_clients, g.num_nodes(), g.num_edges(), + use_graphbolt, ), ) p.start() @@ -178,6 +214,7 @@ def run_client( num_nodes, num_edges, group_id, + use_graphbolt=False, ): os.environ["DGL_NUM_SERVER"] = str(server_count) os.environ["DGL_GROUP_ID"] = str(group_id) @@ -185,8 +222,10 @@ def run_client( gpb, graph_name, _, _ = load_partition_book( "/tmp/dist_graph/{}.json".format(graph_name), part_id ) - g = DistGraph(graph_name, gpb=gpb) - check_dist_graph(g, num_clients, num_nodes, num_edges) + g = DistGraph(graph_name, gpb=gpb, use_graphbolt=use_graphbolt) + check_dist_graph( + g, num_clients, num_nodes, num_edges, use_graphbolt=use_graphbolt + ) def run_emb_client( @@ -270,14 +309,20 @@ def check_dist_optim_store(rank, num_nodes, optimizer_states, save): def run_client_hierarchy( - graph_name, part_id, server_count, node_mask, edge_mask, return_dict + graph_name, + part_id, + server_count, + node_mask, + edge_mask, + return_dict, + use_graphbolt=False, ): os.environ["DGL_NUM_SERVER"] = str(server_count) dgl.distributed.initialize("kv_ip_config.txt") gpb, graph_name, _, _ = load_partition_book( "/tmp/dist_graph/{}.json".format(graph_name), part_id ) - g = DistGraph(graph_name, gpb=gpb) + g = DistGraph(graph_name, gpb=gpb, use_graphbolt=use_graphbolt) node_mask = F.tensor(node_mask) edge_mask = F.tensor(edge_mask) nodes = node_split( @@ -355,7 +400,7 @@ def check_dist_emb(g, num_clients, num_nodes, num_edges): sys.exit(-1) -def check_dist_graph(g, num_clients, num_nodes, num_edges): +def check_dist_graph(g, num_clients, num_nodes, num_edges, use_graphbolt=False): # Test API assert g.num_nodes() == num_nodes assert g.num_edges() == num_edges @@ -373,9 +418,15 @@ def check_dist_graph(g, num_clients, num_nodes, num_edges): assert np.all(F.asnumpy(feats == eids)) # Test edge_subgraph - sg = g.edge_subgraph(eids) - assert sg.num_edges() == len(eids) - assert F.array_equal(sg.edata[dgl.EID], eids) + if use_graphbolt: + with pytest.raises( + AssertionError, match="find_edges is not supported in GraphBolt." + ): + g.edge_subgraph(eids) + else: + sg = g.edge_subgraph(eids) + assert sg.num_edges() == len(eids) + assert F.array_equal(sg.edata[dgl.EID], eids) # Test init node data new_shape = (g.num_nodes(), 2) @@ -522,7 +573,9 @@ def check_dist_emb_server_client( print("clients have terminated") -def check_server_client(shared_mem, num_servers, num_clients, num_groups=1): +def check_server_client( + shared_mem, num_servers, num_clients, num_groups=1, use_graphbolt=False +): prepare_dist(num_servers) g = create_random_graph(10000) @@ -531,7 +584,9 @@ def check_server_client(shared_mem, num_servers, num_clients, num_groups=1): graph_name = f"check_server_client_{shared_mem}_{num_servers}_{num_clients}_{num_groups}" g.ndata["features"] = F.unsqueeze(F.arange(0, g.num_nodes()), 1) g.edata["features"] = F.unsqueeze(F.arange(0, g.num_edges()), 1) - partition_graph(g, graph_name, num_parts, "/tmp/dist_graph") + partition_graph( + g, graph_name, num_parts, "/tmp/dist_graph", use_graphbolt=use_graphbolt + ) # let's just test on one partition for now. # We cannot run multiple servers and clients on the same machine. @@ -546,6 +601,7 @@ def check_server_client(shared_mem, num_servers, num_clients, num_groups=1): num_servers, num_clients, shared_mem, + use_graphbolt, ), ) serv_ps.append(p) @@ -566,6 +622,7 @@ def check_server_client(shared_mem, num_servers, num_clients, num_groups=1): g.num_nodes(), g.num_edges(), group_id, + use_graphbolt, ), ) p.start() @@ -582,7 +639,12 @@ def check_server_client(shared_mem, num_servers, num_clients, num_groups=1): print("clients have terminated") -def check_server_client_hierarchy(shared_mem, num_servers, num_clients): +def check_server_client_hierarchy( + shared_mem, num_servers, num_clients, use_graphbolt=False +): + if num_clients == 1: + # skip this test if there is only one client. + return prepare_dist(num_servers) g = create_random_graph(10000) @@ -597,6 +659,7 @@ def check_server_client_hierarchy(shared_mem, num_servers, num_clients): num_parts, "/tmp/dist_graph", num_trainers_per_machine=num_clients, + use_graphbolt=use_graphbolt, ) # let's just test on one partition for now. @@ -606,7 +669,14 @@ def check_server_client_hierarchy(shared_mem, num_servers, num_clients): for serv_id in range(num_servers): p = ctx.Process( target=run_server, - args=(graph_name, serv_id, num_servers, num_clients, shared_mem), + args=( + graph_name, + serv_id, + num_servers, + num_clients, + shared_mem, + use_graphbolt, + ), ) serv_ps.append(p) p.start() @@ -633,6 +703,7 @@ def check_server_client_hierarchy(shared_mem, num_servers, num_clients): node_mask, edge_mask, return_dict, + use_graphbolt, ), ) p.start() @@ -658,15 +729,23 @@ def check_server_client_hierarchy(shared_mem, num_servers, num_clients): def run_client_hetero( - graph_name, part_id, server_count, num_clients, num_nodes, num_edges + graph_name, + part_id, + server_count, + num_clients, + num_nodes, + num_edges, + use_graphbolt=False, ): os.environ["DGL_NUM_SERVER"] = str(server_count) dgl.distributed.initialize("kv_ip_config.txt") gpb, graph_name, _, _ = load_partition_book( "/tmp/dist_graph/{}.json".format(graph_name), part_id ) - g = DistGraph(graph_name, gpb=gpb) - check_dist_graph_hetero(g, num_clients, num_nodes, num_edges) + g = DistGraph(graph_name, gpb=gpb, use_graphbolt=use_graphbolt) + check_dist_graph_hetero( + g, num_clients, num_nodes, num_edges, use_graphbolt=use_graphbolt + ) def create_random_hetero(): @@ -701,7 +780,9 @@ def create_random_hetero(): return g -def check_dist_graph_hetero(g, num_clients, num_nodes, num_edges): +def check_dist_graph_hetero( + g, num_clients, num_nodes, num_edges, use_graphbolt=False +): # Test API for ntype in num_nodes: assert ntype in g.ntypes @@ -754,12 +835,18 @@ def check_dist_graph_hetero(g, num_clients, num_nodes, num_edges): assert expect_except # Test edge_subgraph - sg = g.edge_subgraph({"r1": eids}) - assert sg.num_edges() == len(eids) - assert F.array_equal(sg.edata[dgl.EID], eids) - sg = g.edge_subgraph({("n1", "r1", "n2"): eids}) - assert sg.num_edges() == len(eids) - assert F.array_equal(sg.edata[dgl.EID], eids) + if use_graphbolt: + with pytest.raises( + AssertionError, match="find_edges is not supported in GraphBolt." + ): + g.edge_subgraph({"r1": eids}) + else: + sg = g.edge_subgraph({"r1": eids}) + assert sg.num_edges() == len(eids) + assert F.array_equal(sg.edata[dgl.EID], eids) + sg = g.edge_subgraph({("n1", "r1", "n2"): eids}) + assert sg.num_edges() == len(eids) + assert F.array_equal(sg.edata[dgl.EID], eids) # Test init node data new_shape = (g.num_nodes("n1"), 2) @@ -827,14 +914,18 @@ def check_dist_graph_hetero(g, num_clients, num_nodes, num_edges): print("end") -def check_server_client_hetero(shared_mem, num_servers, num_clients): +def check_server_client_hetero( + shared_mem, num_servers, num_clients, use_graphbolt=False +): prepare_dist(num_servers) g = create_random_hetero() # Partition the graph num_parts = 1 graph_name = "dist_graph_test_3" - partition_graph(g, graph_name, num_parts, "/tmp/dist_graph") + partition_graph( + g, graph_name, num_parts, "/tmp/dist_graph", use_graphbolt=use_graphbolt + ) # let's just test on one partition for now. # We cannot run multiple servers and clients on the same machine. @@ -843,7 +934,14 @@ def check_server_client_hetero(shared_mem, num_servers, num_clients): for serv_id in range(num_servers): p = ctx.Process( target=run_server, - args=(graph_name, serv_id, num_servers, num_clients, shared_mem), + args=( + graph_name, + serv_id, + num_servers, + num_clients, + shared_mem, + use_graphbolt, + ), ) serv_ps.append(p) p.start() @@ -862,6 +960,7 @@ def check_server_client_hetero(shared_mem, num_servers, num_clients): num_clients, num_nodes, num_edges, + use_graphbolt, ), ) p.start() @@ -886,21 +985,23 @@ def check_server_client_hetero(shared_mem, num_servers, num_clients): @unittest.skipIf( dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support" ) -def test_server_client(): +@pytest.mark.parametrize("shared_mem", [True]) +@pytest.mark.parametrize("num_servers", [1]) +@pytest.mark.parametrize("num_clients", [1, 4]) +@pytest.mark.parametrize("use_graphbolt", [True, False]) +def test_server_client(shared_mem, num_servers, num_clients, use_graphbolt): reset_envs() os.environ["DGL_DIST_MODE"] = "distributed" - check_server_client_hierarchy(False, 1, 4) - check_server_client_empty(True, 1, 1) - check_server_client_hetero(True, 1, 1) - check_server_client_hetero(False, 1, 1) - check_server_client(True, 1, 1) - check_server_client(False, 1, 1) - # [TODO][Rhett] Tests for multiple groups may fail sometimes and - # root cause is unknown. Let's disable them for now. - # check_server_client(True, 2, 2) - # check_server_client(True, 1, 1, 2) - # check_server_client(False, 1, 1, 2) - # check_server_client(True, 2, 2, 2) + # [Rui] + # 1. `disable_shared_mem=False` is not supported yet. Skip it. + # 2. `num_servers` > 1 does not work on single machine. Skip it. + for func in [ + check_server_client, + check_server_client_hetero, + check_server_client_empty, + check_server_client_hierarchy, + ]: + func(shared_mem, num_servers, num_clients, use_graphbolt=use_graphbolt) @unittest.skip(reason="Skip due to glitch in CI") diff --git a/tests/distributed/test_partition.py b/tests/distributed/test_partition.py index 6b2df3fdc038..30a85f7df025 100644 --- a/tests/distributed/test_partition.py +++ b/tests/distributed/test_partition.py @@ -944,3 +944,165 @@ def test_not_sorted_node_edge_map(): gpb, _, _, _ = load_partition_book(part_config, 1) assert gpb.local_ntype_offset == [0, 300, 700] assert gpb.local_etype_offset == [0, 500, 1100, 1800, 2600] + + +@pytest.mark.parametrize("part_method", ["metis", "random"]) +@pytest.mark.parametrize("num_parts", [1, 4]) +@pytest.mark.parametrize("store_eids", [True, False]) +@pytest.mark.parametrize("store_inner_node", [True, False]) +@pytest.mark.parametrize("store_inner_edge", [True, False]) +@pytest.mark.parametrize("debug_mode", [True, False]) +def test_partition_graph_graphbolt_homo( + part_method, + num_parts, + store_eids, + store_inner_node, + store_inner_edge, + debug_mode, +): + reset_envs() + if debug_mode: + os.environ["DGL_DIST_DEBUG"] = "1" + with tempfile.TemporaryDirectory() as test_dir: + g = create_random_graph(1000) + graph_name = "test" + partition_graph( + g, + graph_name, + num_parts, + test_dir, + part_method=part_method, + use_graphbolt=True, + store_eids=store_eids, + store_inner_node=store_inner_node, + store_inner_edge=store_inner_edge, + ) + part_config = os.path.join(test_dir, f"{graph_name}.json") + for part_id in range(num_parts): + orig_g = dgl.load_graphs( + os.path.join(test_dir, f"part{part_id}/graph.dgl") + )[0][0] + new_g = load_partition( + part_config, part_id, load_feats=False, use_graphbolt=True + )[0] + orig_indptr, orig_indices, orig_eids = orig_g.adj().csc() + assert th.equal(orig_indptr, new_g.csc_indptr) + assert th.equal(orig_indices, new_g.indices) + assert new_g.node_type_offset is None + assert th.equal( + orig_g.ndata[dgl.NID], new_g.node_attributes[dgl.NID] + ) + if store_inner_node or debug_mode: + assert th.equal( + orig_g.ndata["inner_node"], + new_g.node_attributes["inner_node"], + ) + else: + assert "inner_node" not in new_g.node_attributes + if store_eids or debug_mode: + assert th.equal( + orig_g.edata[dgl.EID][orig_eids], + new_g.edge_attributes[dgl.EID], + ) + else: + assert dgl.EID not in new_g.edge_attributes + if store_inner_edge or debug_mode: + assert th.equal( + orig_g.edata["inner_edge"][orig_eids], + new_g.edge_attributes["inner_edge"], + ) + else: + assert "inner_edge" not in new_g.edge_attributes + assert new_g.type_per_edge is None + assert new_g.node_type_to_id is None + assert new_g.edge_type_to_id is None + + +@pytest.mark.parametrize("part_method", ["metis", "random"]) +@pytest.mark.parametrize("num_parts", [1, 4]) +@pytest.mark.parametrize("store_eids", [True, False]) +@pytest.mark.parametrize("store_inner_node", [True, False]) +@pytest.mark.parametrize("store_inner_edge", [True, False]) +@pytest.mark.parametrize("debug_mode", [True, False]) +def test_partition_graph_graphbolt_hetero( + part_method, + num_parts, + store_eids, + store_inner_node, + store_inner_edge, + debug_mode, +): + reset_envs() + if debug_mode: + os.environ["DGL_DIST_DEBUG"] = "1" + with tempfile.TemporaryDirectory() as test_dir: + g = create_random_hetero() + graph_name = "test" + partition_graph( + g, + graph_name, + num_parts, + test_dir, + part_method=part_method, + use_graphbolt=True, + store_eids=store_eids, + store_inner_node=store_inner_node, + store_inner_edge=store_inner_edge, + ) + part_config = os.path.join(test_dir, f"{graph_name}.json") + for part_id in range(num_parts): + orig_g = dgl.load_graphs( + os.path.join(test_dir, f"part{part_id}/graph.dgl") + )[0][0] + new_g = load_partition( + part_config, part_id, load_feats=False, use_graphbolt=True + )[0] + orig_indptr, orig_indices, orig_eids = orig_g.adj().csc() + assert th.equal(orig_indptr, new_g.csc_indptr) + assert th.equal(orig_indices, new_g.indices) + assert th.equal( + orig_g.ndata[dgl.NID], new_g.node_attributes[dgl.NID] + ) + if store_inner_node or debug_mode: + assert th.equal( + orig_g.ndata["inner_node"], + new_g.node_attributes["inner_node"], + ) + else: + assert "inner_node" not in new_g.node_attributes + if debug_mode: + assert th.equal( + orig_g.ndata[dgl.NTYPE], new_g.node_attributes[dgl.NTYPE] + ) + else: + assert dgl.NTYPE not in new_g.node_attributes + if store_eids or debug_mode: + assert th.equal( + orig_g.edata[dgl.EID][orig_eids], + new_g.edge_attributes[dgl.EID], + ) + else: + assert dgl.EID not in new_g.edge_attributes + if store_inner_edge or debug_mode: + assert th.equal( + orig_g.edata["inner_edge"], + new_g.edge_attributes["inner_edge"], + ) + else: + assert "inner_edge" not in new_g.edge_attributes + if debug_mode: + assert th.equal( + orig_g.edata[dgl.ETYPE][orig_eids], + new_g.edge_attributes[dgl.ETYPE], + ) + else: + assert dgl.ETYPE not in new_g.edge_attributes + assert th.equal( + orig_g.edata[dgl.ETYPE][orig_eids], new_g.type_per_edge + ) + + for node_type, type_id in new_g.node_type_to_id.items(): + assert g.get_ntype_id(node_type) == type_id + for edge_type, type_id in new_g.edge_type_to_id.items(): + assert g.get_etype_id(_etype_str_to_tuple(edge_type)) == type_id + assert new_g.node_type_offset is None diff --git a/tests/python/pytorch/graphbolt/test_base.py b/tests/python/pytorch/graphbolt/test_base.py index 34c57c469ef6..b25b28166294 100644 --- a/tests/python/pytorch/graphbolt/test_base.py +++ b/tests/python/pytorch/graphbolt/test_base.py @@ -13,17 +13,19 @@ @unittest.skipIf(F._default_context_str == "cpu", "CopyTo needs GPU to test") def test_CopyTo(): - item_sampler = gb.ItemSampler(gb.ItemSet(torch.randn(20)), 4) + item_sampler = gb.ItemSampler( + gb.ItemSet(torch.arange(20), names="seed_nodes"), 4 + ) # Invoke CopyTo via class constructor. dp = gb.CopyTo(item_sampler, "cuda") for data in dp: - assert data.device.type == "cuda" + assert data.seed_nodes.device.type == "cuda" # Invoke CopyTo via functional form. dp = item_sampler.copy_to("cuda") for data in dp: - assert data.device.type == "cuda" + assert data.seed_nodes.device.type == "cuda" @pytest.mark.parametrize( diff --git a/tests/python/pytorch/graphbolt/test_feature_fetcher.py b/tests/python/pytorch/graphbolt/test_feature_fetcher.py index bd14716bb188..63d990dc5eaa 100644 --- a/tests/python/pytorch/graphbolt/test_feature_fetcher.py +++ b/tests/python/pytorch/graphbolt/test_feature_fetcher.py @@ -77,7 +77,8 @@ def test_FeatureFetcher_with_edges_homo(): [[random.randint(0, 10)] for _ in range(graph.total_num_edges)] ) - def add_node_and_edge_ids(seeds): + def add_node_and_edge_ids(minibatch): + seeds = minibatch.seed_nodes subgraphs = [] for _ in range(3): sampled_csc = gb.CSCFormatBase( @@ -103,7 +104,7 @@ def add_node_and_edge_ids(seeds): features[keys[1]] = gb.TorchBasedFeature(b) feature_store = gb.BasicFeatureStore(features) - itemset = gb.ItemSet(torch.arange(10)) + itemset = gb.ItemSet(torch.arange(10), names="seed_nodes") item_sampler_dp = gb.ItemSampler(itemset, batch_size=2) converter_dp = Mapper(item_sampler_dp, add_node_and_edge_ids) fetcher_dp = gb.FeatureFetcher(converter_dp, feature_store, ["a"], ["b"]) @@ -170,7 +171,8 @@ def test_FeatureFetcher_with_edges_hetero(): a = torch.tensor([[random.randint(0, 10)] for _ in range(20)]) b = torch.tensor([[random.randint(0, 10)] for _ in range(50)]) - def add_node_and_edge_ids(seeds): + def add_node_and_edge_ids(minibatch): + seeds = minibatch.seed_nodes subgraphs = [] original_edge_ids = { "n1:e1:n2": torch.randint(0, 50, (10,)), @@ -213,7 +215,7 @@ def add_node_and_edge_ids(seeds): itemset = gb.ItemSetDict( { - "n1": gb.ItemSet(torch.randint(0, 20, (10,))), + "n1": gb.ItemSet(torch.randint(0, 20, (10,)), names="seed_nodes"), } ) item_sampler_dp = gb.ItemSampler(itemset, batch_size=2) diff --git a/tests/python/pytorch/graphbolt/test_item_sampler.py b/tests/python/pytorch/graphbolt/test_item_sampler.py index fc4764df026f..264e489726cf 100644 --- a/tests/python/pytorch/graphbolt/test_item_sampler.py +++ b/tests/python/pytorch/graphbolt/test_item_sampler.py @@ -204,9 +204,16 @@ def test_ItemSet_graphs(batch_size, shuffle, drop_last): dgl.rand_graph(num_nodes * (i + 1), num_edges * (i + 1)) for i in range(num_graphs) ] - item_set = gb.ItemSet(graphs) + item_set = gb.ItemSet(graphs, names="graphs") + # DGLGraph is not supported in gb.MiniBatch yet. Let's use a customized + # minibatcher to return the original graphs. + customized_minibatcher = lambda batch, names: batch item_sampler = gb.ItemSampler( - item_set, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last + item_set, + batch_size=batch_size, + shuffle=shuffle, + drop_last=drop_last, + minibatcher=customized_minibatcher, ) minibatch_num_nodes = [] minibatch_num_edges = [] @@ -459,13 +466,13 @@ def test_ItemSet_seeds_labels(batch_size, shuffle, drop_last): def test_append_with_other_datapipes(): num_ids = 100 batch_size = 4 - item_set = gb.ItemSet(torch.arange(0, num_ids)) + item_set = gb.ItemSet(torch.arange(0, num_ids), names="seed_nodes") data_pipe = gb.ItemSampler(item_set, batch_size) # torchdata.datapipes.iter.Enumerator data_pipe = data_pipe.enumerate() for i, (idx, data) in enumerate(data_pipe): assert i == idx - assert len(data) == batch_size + assert len(data.seed_nodes) == batch_size @pytest.mark.parametrize("batch_size", [1, 4]) diff --git a/tests/python/pytorch/graphbolt/impl/test_minibatch.py b/tests/python/pytorch/graphbolt/test_minibatch.py similarity index 100% rename from tests/python/pytorch/graphbolt/impl/test_minibatch.py rename to tests/python/pytorch/graphbolt/test_minibatch.py