diff --git a/examples/sampling/graphbolt/quickstart/link_prediction.py b/examples/sampling/graphbolt/quickstart/link_prediction.py index db674040b19a..387b5fe284e5 100644 --- a/examples/sampling/graphbolt/quickstart/link_prediction.py +++ b/examples/sampling/graphbolt/quickstart/link_prediction.py @@ -98,11 +98,8 @@ def evaluate(model, dataset, device): logits = [] labels = [] for step, data in enumerate(dataloader): - # Convert data to DGL format for computing. - data = data.to_dgl() - - # Unpack MiniBatch. - compacted_pairs, label = to_binary_link_dgl_computing_pack(data) + # Get node pairs with labels for loss calculation. + compacted_pairs, label = data.node_pairs_with_labels # The features of sampled nodes. x = data.node_features["feat"] @@ -140,11 +137,8 @@ def train(model, dataset, device): # mini-batches. ######################################################################## for step, data in enumerate(dataloader): - # Convert data to DGL format for computing. - data = data.to_dgl() - - # Unpack MiniBatch. - compacted_pairs, labels = to_binary_link_dgl_computing_pack(data) + # Get node pairs with labels for loss calculation. + compacted_pairs, labels = data.node_pairs_with_labels # The features of sampled nodes. x = data.node_features["feat"] diff --git a/examples/sampling/graphbolt/quickstart/node_classification.py b/examples/sampling/graphbolt/quickstart/node_classification.py index 43a885a4f8fc..823d08d5b447 100644 --- a/examples/sampling/graphbolt/quickstart/node_classification.py +++ b/examples/sampling/graphbolt/quickstart/node_classification.py @@ -57,7 +57,6 @@ def evaluate(model, dataset, itemset, device): dataloader = create_dataloader(dataset, itemset, device) for step, data in enumerate(dataloader): - data = data.to_dgl() x = data.node_features["feat"] y.append(data.labels) y_hats.append(model(data.blocks, x)) @@ -84,9 +83,6 @@ def train(model, dataset, device): # mini-batches. ######################################################################## for step, data in enumerate(dataloader): - # Convert data to DGL format for computing. - data = data.to_dgl() - # The features of sampled nodes. x = data.node_features["feat"] diff --git a/python/dgl/graphbolt/minibatch.py b/python/dgl/graphbolt/minibatch.py index 49e36659a7d3..df292abf0a2e 100644 --- a/python/dgl/graphbolt/minibatch.py +++ b/python/dgl/graphbolt/minibatch.py @@ -363,9 +363,10 @@ def set_edge_features( """Set edge features.""" self.edge_features = edge_features - def _to_dgl_blocks(self): - """Transforming a `MiniBatch` into DGL blocks necessitates constructing - a graphical structure and ID mappings. + @property + def blocks(self): + """Extracts DGL blocks from `MiniBatch` to construct a graphical + structure and ID mappings. """ if not self.sampled_subgraphs: return None @@ -459,98 +460,135 @@ def _to_dgl_blocks(self): block.edata[dgl.EID] = subgraph.original_edge_ids return blocks - def to_dgl(self): - """Converting a `MiniBatch` into a DGL MiniBatch that contains - everything necessary for computation." + @property + def positive_node_pairs(self): + """`positive_node_pairs` is a representation of positive graphs used for + evaluating or computing loss in link prediction tasks. + - If `positive_node_pairs` is a tuple: It indicates a homogeneous graph + containing two tensors representing source-destination node pairs. + - If `positive_node_pairs` is a dictionary: The keys should be edge type, + and the value should be a tuple of tensors representing node pairs of the + given type. """ - minibatch = DGLMiniBatch( - blocks=self._to_dgl_blocks(), - node_features=self.node_features, - edge_features=self.edge_features, - labels=self.labels, - ) - # Need input nodes to fetch feature. - if self.node_features is None: - minibatch.input_nodes = self.input_nodes - # Need output nodes to fetch label. - if self.labels is None: - minibatch.output_nodes = self.seed_nodes - assert ( - minibatch.blocks is not None - ), "Sampled subgraphs for computation are missing." - - # For link prediction tasks. - if self.compacted_node_pairs is not None: - minibatch.positive_node_pairs = self.compacted_node_pairs - # Build negative graph. - if ( - self.compacted_negative_srcs is not None - and self.compacted_negative_dsts is not None - ): - # For homogeneous graph. - if isinstance(self.compacted_negative_srcs, torch.Tensor): - minibatch.negative_node_pairs = ( - self.compacted_negative_srcs.view(-1), - self.compacted_negative_dsts.view(-1), + return self.compacted_node_pairs + + @property + def negative_node_pairs(self): + """`negative_node_pairs` is a representation of negative graphs used for + evaluating or computing loss in link prediction tasks. + - If `negative_node_pairs` is a tuple: It indicates a homogeneous graph + containing two tensors representing source-destination node pairs. + - If `negative_node_pairs` is a dictionary: The keys should be edge type, + and the value should be a tuple of tensors representing node pairs of the + given type. + """ + # Build negative graph. + if ( + self.compacted_negative_srcs is not None + and self.compacted_negative_dsts is not None + ): + # For homogeneous graph. + if isinstance(self.compacted_negative_srcs, torch.Tensor): + negative_node_pairs = ( + self.compacted_negative_srcs.view(-1), + self.compacted_negative_dsts.view(-1), + ) + # For heterogeneous graph. + else: + negative_node_pairs = { + etype: ( + neg_src.view(-1), + self.compacted_negative_dsts[etype].view(-1), ) - # For heterogeneous graph. - else: - minibatch.negative_node_pairs = { - etype: ( - neg_src.view(-1), - self.compacted_negative_dsts[etype].view(-1), - ) - for etype, neg_src in self.compacted_negative_srcs.items() - } - elif self.compacted_negative_srcs is not None: - # For homogeneous graph. - if isinstance(self.compacted_negative_srcs, torch.Tensor): - negative_ratio = self.compacted_negative_srcs.size(1) - minibatch.negative_node_pairs = ( - self.compacted_negative_srcs.view(-1), - self.compacted_node_pairs[1].repeat_interleave( + for etype, neg_src in self.compacted_negative_srcs.items() + } + elif ( + self.compacted_negative_srcs is not None + and self.compacted_node_pairs is not None + ): + # For homogeneous graph. + if isinstance(self.compacted_negative_srcs, torch.Tensor): + negative_ratio = self.compacted_negative_srcs.size(1) + negative_node_pairs = ( + self.compacted_negative_srcs.view(-1), + self.compacted_node_pairs[1].repeat_interleave( + negative_ratio + ), + ) + # For heterogeneous graph. + else: + negative_ratio = list(self.compacted_negative_srcs.values())[ + 0 + ].size(1) + negative_node_pairs = { + etype: ( + neg_src.view(-1), + self.compacted_node_pairs[etype][1].repeat_interleave( negative_ratio ), ) - # For heterogeneous graph. - else: - negative_ratio = list( - self.compacted_negative_srcs.values() - )[0].size(1) - minibatch.negative_node_pairs = { - etype: ( - neg_src.view(-1), - self.compacted_node_pairs[etype][ - 1 - ].repeat_interleave(negative_ratio), - ) - for etype, neg_src in self.compacted_negative_srcs.items() - } - elif self.compacted_negative_dsts is not None: - # For homogeneous graph. - if isinstance(self.compacted_negative_dsts, torch.Tensor): - negative_ratio = self.compacted_negative_dsts.size(1) - minibatch.negative_node_pairs = ( - self.compacted_node_pairs[0].repeat_interleave( + for etype, neg_src in self.compacted_negative_srcs.items() + } + elif ( + self.compacted_negative_dsts is not None + and self.compacted_node_pairs is not None + ): + # For homogeneous graph. + if isinstance(self.compacted_negative_dsts, torch.Tensor): + negative_ratio = self.compacted_negative_dsts.size(1) + negative_node_pairs = ( + self.compacted_node_pairs[0].repeat_interleave( + negative_ratio + ), + self.compacted_negative_dsts.view(-1), + ) + # For heterogeneous graph. + else: + negative_ratio = list(self.compacted_negative_dsts.values())[ + 0 + ].size(1) + negative_node_pairs = { + etype: ( + self.compacted_node_pairs[etype][0].repeat_interleave( negative_ratio ), - self.compacted_negative_dsts.view(-1), + neg_dst.view(-1), ) - # For heterogeneous graph. - else: - negative_ratio = list( - self.compacted_negative_dsts.values() - )[0].size(1) - minibatch.negative_node_pairs = { - etype: ( - self.compacted_node_pairs[etype][ - 0 - ].repeat_interleave(negative_ratio), - neg_dst.view(-1), - ) - for etype, neg_dst in self.compacted_negative_dsts.items() - } - return minibatch + for etype, neg_dst in self.compacted_negative_dsts.items() + } + else: + negative_node_pairs = None + return negative_node_pairs + + @property + def node_pairs_with_labels(self): + """Get a node pair tensor and a label tensor from MiniBatch. They are + used for evaluating or computing loss. It will return + `(node_pairs, labels)` as result. + - If it's a link prediction task, `node_pairs` will contain both + negative and positive node pairs and `labels` will consist of 0 and 1, + indicating whether the corresponding node pair is negative or positive. + - If it's an edge classification task, this function will directly + return `compacted_node_pairs` and corresponding `labels`. + - Otherwise it will return None. + """ + if self.labels is None: + positive_node_pairs = self.positive_node_pairs + negative_node_pairs = self.negative_node_pairs + if positive_node_pairs is None or negative_node_pairs is None: + return None + pos_src, pos_dst = positive_node_pairs + neg_src, neg_dst = negative_node_pairs + node_pairs = ( + torch.cat((pos_src, neg_src), dim=0), + torch.cat((pos_dst, neg_dst), dim=0), + ) + pos_label = torch.ones_like(pos_src) + neg_label = torch.zeros_like(neg_src) + labels = torch.cat([pos_label, neg_label], dim=0) + return (node_pairs, labels.float()) + else: + return (self.compacted_node_pairs, self.labels) def to(self, device: torch.device) -> None: # pylint: disable=invalid-name """Copy `MiniBatch` to the specified device using reflection.""" @@ -561,13 +599,16 @@ def _to(x, device): for attr in dir(self): # Only copy member variables. if not callable(getattr(self, attr)) and not attr.startswith("__"): - setattr( - self, - attr, - recursive_apply( - getattr(self, attr), lambda x: _to(x, device) - ), - ) + try: + setattr( + self, + attr, + recursive_apply( + getattr(self, attr), lambda x: _to(x, device) + ), + ) + except AttributeError: + continue return self diff --git a/python/dgl/graphbolt/minibatch_transformer.py b/python/dgl/graphbolt/minibatch_transformer.py index d23907b2041e..32d03a5b442c 100644 --- a/python/dgl/graphbolt/minibatch_transformer.py +++ b/python/dgl/graphbolt/minibatch_transformer.py @@ -8,7 +8,6 @@ __all__ = [ "MiniBatchTransformer", - "DGLMiniBatchConverter", ] @@ -41,22 +40,3 @@ def _transformer(self, minibatch): minibatch, (MiniBatch, DGLMiniBatch) ), "The transformer output should be an instance of MiniBatch" return minibatch - - -@functional_datapipe("to_dgl") -class DGLMiniBatchConverter(Mapper): - """Convert a graphbolt mini-batch to a dgl mini-batch. - - Functional name: :obj:`to_dgl`. - - Parameters - ---------- - datapipe : DataPipe - The datapipe. - """ - - def __init__( - self, - datapipe, - ): - super().__init__(datapipe, MiniBatch.to_dgl) diff --git a/tests/python/pytorch/graphbolt/impl/test_minibatch.py b/tests/python/pytorch/graphbolt/impl/test_minibatch.py index de0e9fffd8d1..aa143434b4a9 100644 --- a/tests/python/pytorch/graphbolt/impl/test_minibatch.py +++ b/tests/python/pytorch/graphbolt/impl/test_minibatch.py @@ -163,9 +163,12 @@ def test_minibatch_representation_homo(): expect_result = str( """MiniBatch(seed_nodes=None, sampled_subgraphs=None, + positive_node_pairs=None, + node_pairs_with_labels=None, node_pairs=None, node_features=None, negative_srcs=None, + negative_node_pairs=None, negative_dsts=None, labels=None, input_nodes=None, @@ -173,6 +176,7 @@ def test_minibatch_representation_homo(): compacted_node_pairs=None, compacted_negative_srcs=None, compacted_negative_dsts=None, + blocks=None, )""" ) result = str(minibatch) @@ -207,6 +211,13 @@ def test_minibatch_representation_homo(): indices=tensor([1, 2, 0]), ), )], + positive_node_pairs=CSCFormatBase(indptr=tensor([0, 2, 3]), + indices=tensor([3, 4, 5]), + ), + node_pairs_with_labels=(CSCFormatBase(indptr=tensor([0, 2, 3]), + indices=tensor([3, 4, 5]), + ), + tensor([0., 1., 2.])), node_pairs=[CSCFormatBase(indptr=tensor([0, 1, 3, 5, 6]), indices=tensor([0, 1, 2, 2, 1, 2]), ), @@ -217,6 +228,8 @@ def test_minibatch_representation_homo(): negative_srcs=tensor([[8], [1], [6]]), + negative_node_pairs=(tensor([0, 1, 2]), + tensor([6, 0, 0])), negative_dsts=tensor([[2], [8], [8]]), @@ -233,6 +246,8 @@ def test_minibatch_representation_homo(): compacted_negative_dsts=tensor([[6], [0], [0]]), + blocks=[Block(num_src_nodes=4, num_dst_nodes=4, num_edges=6), + Block(num_src_nodes=3, num_dst_nodes=2, num_edges=3)], )""" ) result = str(minibatch) @@ -307,7 +322,7 @@ def test_minibatch_representation_hetero(): } compacted_negative_srcs = {relation: torch.tensor([[0], [1], [2]])} compacted_negative_dsts = {relation: torch.tensor([[6], [0], [0]])} - # Test dglminibatch with all attributes. + # Test minibatch with all attributes. minibatch = gb.MiniBatch( seed_nodes={"B": torch.tensor([10, 15])}, node_pairs=csc_formats, @@ -343,6 +358,17 @@ def test_minibatch_representation_hetero(): indices=tensor([1, 0]), )}, )], + positive_node_pairs={'A:r:B': CSCFormatBase(indptr=tensor([0, 1, 2, 3]), + indices=tensor([3, 4, 5]), + ), 'B:rr:A': CSCFormatBase(indptr=tensor([0, 0, 0, 1, 2]), + indices=tensor([0, 1]), + )}, + node_pairs_with_labels=({'A:r:B': CSCFormatBase(indptr=tensor([0, 1, 2, 3]), + indices=tensor([3, 4, 5]), + ), 'B:rr:A': CSCFormatBase(indptr=tensor([0, 0, 0, 1, 2]), + indices=tensor([0, 1]), + )}, + {'B': tensor([2, 5])}), node_pairs=[{'A:r:B': CSCFormatBase(indptr=tensor([0, 1, 2, 3]), indices=tensor([0, 1, 1]), ), 'B:rr:A': CSCFormatBase(indptr=tensor([0, 0, 0, 1, 2]), @@ -355,6 +381,7 @@ def test_minibatch_representation_hetero(): negative_srcs={'B': tensor([[8], [1], [6]])}, + negative_node_pairs={'A:r:B': (tensor([0, 1, 2]), tensor([6, 0, 0]))}, negative_dsts={'B': tensor([[2], [8], [8]])}, @@ -373,13 +400,21 @@ def test_minibatch_representation_hetero(): compacted_negative_dsts={'A:r:B': tensor([[6], [0], [0]])}, + blocks=[Block(num_src_nodes={'A': 4, 'B': 3}, + num_dst_nodes={'A': 4, 'B': 3}, + num_edges={('A', 'r', 'B'): 3, ('B', 'rr', 'A'): 2}, + metagraph=[('A', 'B', 'r'), ('B', 'A', 'rr')]), + Block(num_src_nodes={'A': 2, 'B': 2}, + num_dst_nodes={'B': 2}, + num_edges={('A', 'r', 'B'): 2}, + metagraph=[('A', 'B', 'r')])], )""" ) result = str(minibatch) assert result == expect_result, print(result) -def test_dgl_minibatch_representation_homo(): +def test_get_dgl_blocks_homo(): node_pairs = [ ( torch.tensor([0, 1, 2, 2, 2, 1]), @@ -424,7 +459,7 @@ def test_dgl_minibatch_representation_homo(): compacted_negative_srcs = torch.tensor([[0], [1], [2]]) compacted_negative_dsts = torch.tensor([[6], [0], [0]]) labels = torch.tensor([0.0, 1.0, 2.0]) - # Test dglminibatch with all attributes. + # Test minibatch with all attributes. minibatch = gb.MiniBatch( node_pairs=node_pairs, sampled_subgraphs=subgraphs, @@ -438,31 +473,15 @@ def test_dgl_minibatch_representation_homo(): compacted_negative_srcs=compacted_negative_srcs, compacted_negative_dsts=compacted_negative_dsts, ) - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks expect_result = str( - """DGLMiniBatch(positive_node_pairs=(tensor([0, 1, 2]), - tensor([3, 4, 5])), - output_nodes=None, - node_features={'x': tensor([7, 6, 2, 2])}, - negative_node_pairs=(tensor([0, 1, 2]), - tensor([6, 0, 0])), - labels=tensor([0., 1., 2.]), - input_nodes=None, - edge_features=[{'x': tensor([[8], - [1], - [6]])}, - {'x': tensor([[2], - [8], - [8]])}], - blocks=[Block(num_src_nodes=4, num_dst_nodes=4, num_edges=6), - Block(num_src_nodes=3, num_dst_nodes=2, num_edges=3)], - )""" + """[Block(num_src_nodes=4, num_dst_nodes=4, num_edges=6), Block(num_src_nodes=3, num_dst_nodes=2, num_edges=3)]""" ) - result = str(dgl_minibatch) + result = str(dgl_blocks) assert result == expect_result, print(result) -def test_dgl_minibatch_representation_hetero(): +def test_get_dgl_blocks_hetero(): node_pairs = [ { relation: (torch.tensor([0, 1, 1]), torch.tensor([0, 1, 2])), @@ -516,7 +535,7 @@ def test_dgl_minibatch_representation_hetero(): } compacted_negative_srcs = {relation: torch.tensor([[0], [1], [2]])} compacted_negative_dsts = {relation: torch.tensor([[6], [0], [0]])} - # Test dglminibatch with all attributes. + # Test minibatch with all attributes. minibatch = gb.MiniBatch( seed_nodes={"B": torch.tensor([10, 15])}, node_pairs=node_pairs, @@ -534,30 +553,63 @@ def test_dgl_minibatch_representation_hetero(): compacted_negative_srcs=compacted_negative_srcs, compacted_negative_dsts=compacted_negative_dsts, ) - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks expect_result = str( - """DGLMiniBatch(positive_node_pairs={'A:r:B': (tensor([0, 1, 2]), tensor([3, 4, 5])), 'B:rr:A': (tensor([0, 1, 2]), tensor([3, 4, 5]))}, - output_nodes=None, - node_features={('A', 'x'): tensor([6, 4, 0, 1])}, - negative_node_pairs={'A:r:B': (tensor([0, 1, 2]), tensor([6, 0, 0]))}, - labels={'B': tensor([2, 5])}, - input_nodes=None, - edge_features=[{('A:r:B', 'x'): tensor([4, 2, 4])}, - {('A:r:B', 'x'): tensor([0, 6])}], - blocks=[Block(num_src_nodes={'A': 4, 'B': 3}, - num_dst_nodes={'A': 4, 'B': 3}, - num_edges={('A', 'r', 'B'): 3, ('B', 'rr', 'A'): 2}, - metagraph=[('A', 'B', 'r'), ('B', 'A', 'rr')]), - Block(num_src_nodes={'A': 2, 'B': 2}, - num_dst_nodes={'B': 2}, - num_edges={('A', 'r', 'B'): 2}, - metagraph=[('A', 'B', 'r')])], - )""" + """[Block(num_src_nodes={'A': 4, 'B': 3}, + num_dst_nodes={'A': 4, 'B': 3}, + num_edges={('A', 'r', 'B'): 3, ('B', 'rr', 'A'): 2}, + metagraph=[('A', 'B', 'r'), ('B', 'A', 'rr')]), Block(num_src_nodes={'A': 2, 'B': 2}, + num_dst_nodes={'B': 2}, + num_edges={('A', 'r', 'B'): 2}, + metagraph=[('A', 'B', 'r')])]""" ) - result = str(dgl_minibatch) + result = str(dgl_blocks) assert result == expect_result, print(result) +@pytest.mark.parametrize( + "mode", ["neg_graph", "neg_src", "neg_dst", "edge_classification"] +) +def test_minibatch_node_pairs_with_labels(mode): + # Arrange + minibatch = create_homo_minibatch() + minibatch.compacted_node_pairs = ( + torch.tensor([0, 1]), + torch.tensor([1, 0]), + ) + if mode == "neg_graph" or mode == "neg_src": + minibatch.compacted_negative_srcs = torch.tensor([[0, 0], [1, 1]]) + if mode == "neg_graph" or mode == "neg_dst": + minibatch.compacted_negative_dsts = torch.tensor([[1, 0], [0, 1]]) + if mode == "edge_classification": + minibatch.labels = torch.tensor([0, 1]).long() + # Act + node_pairs, labels = minibatch.node_pairs_with_labels + + # Assert + if mode == "neg_src": + expect_node_pairs = ( + torch.tensor([0, 1, 0, 0, 1, 1]), + torch.tensor([1, 0, 1, 1, 0, 0]), + ) + expect_labels = torch.tensor([1, 1, 0, 0, 0, 0]).float() + elif mode != "edge_classification": + expect_node_pairs = ( + torch.tensor([0, 1, 0, 0, 1, 1]), + torch.tensor([1, 0, 1, 0, 0, 1]), + ) + expect_labels = torch.tensor([1, 1, 0, 0, 0, 0]).float() + else: + expect_node_pairs = ( + torch.tensor([0, 1]), + torch.tensor([1, 0]), + ) + expect_labels = torch.tensor([0, 1]).long() + assert torch.equal(node_pairs[0], expect_node_pairs[0]) + assert torch.equal(node_pairs[1], expect_node_pairs[1]) + assert torch.equal(labels, expect_labels) + + def check_dgl_blocks_hetero(minibatch, blocks): etype = gb.etype_str_to_tuple(relation) node_pairs = [ @@ -607,61 +659,48 @@ def check_dgl_blocks_homo(minibatch, blocks): assert torch.equal(blocks[0].srcdata[dgl.NID], original_row_node_ids[0]) -def test_to_dgl_node_classification_without_feature(): +def test_get_dgl_blocks_node_classification_without_feature(): # Arrange minibatch = create_homo_minibatch() minibatch.node_features = None minibatch.labels = None minibatch.seed_nodes = torch.tensor([10, 15]) # Act - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert dgl_minibatch.node_features is None - assert minibatch.edge_features is dgl_minibatch.edge_features - assert dgl_minibatch.labels is None - assert minibatch.input_nodes is dgl_minibatch.input_nodes - assert minibatch.seed_nodes is dgl_minibatch.output_nodes - check_dgl_blocks_homo(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + assert minibatch.node_features is None + assert minibatch.labels is None + check_dgl_blocks_homo(minibatch, dgl_blocks) -def test_to_dgl_node_classification_homo(): +def test_get_dgl_blocks_node_classification_homo(): # Arrange minibatch = create_homo_minibatch() minibatch.seed_nodes = torch.tensor([10, 15]) minibatch.labels = torch.tensor([2, 5]) # Act - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert minibatch.node_features is dgl_minibatch.node_features - assert minibatch.edge_features is dgl_minibatch.edge_features - assert minibatch.labels is dgl_minibatch.labels - assert dgl_minibatch.input_nodes is None - assert dgl_minibatch.output_nodes is None - check_dgl_blocks_homo(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + check_dgl_blocks_homo(minibatch, dgl_blocks) def test_to_dgl_node_classification_hetero(): minibatch = create_hetero_minibatch() minibatch.labels = {"B": torch.tensor([2, 5])} minibatch.seed_nodes = {"B": torch.tensor([10, 15])} - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert minibatch.node_features is dgl_minibatch.node_features - assert minibatch.edge_features is dgl_minibatch.edge_features - assert minibatch.labels is dgl_minibatch.labels - assert dgl_minibatch.input_nodes is None - assert dgl_minibatch.output_nodes is None - check_dgl_blocks_hetero(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + check_dgl_blocks_hetero(minibatch, dgl_blocks) @pytest.mark.parametrize("mode", ["neg_graph", "neg_src", "neg_dst"]) -def test_to_dgl_link_predication_homo(mode): +def test_dgl_link_predication_homo(mode): # Arrange minibatch = create_homo_minibatch() minibatch.compacted_node_pairs = ( @@ -673,28 +712,40 @@ def test_to_dgl_link_predication_homo(mode): if mode == "neg_graph" or mode == "neg_dst": minibatch.compacted_negative_dsts = torch.tensor([[1, 0], [0, 1]]) # Act - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert minibatch.node_features is dgl_minibatch.node_features - assert minibatch.edge_features is dgl_minibatch.edge_features - assert minibatch.compacted_node_pairs is dgl_minibatch.positive_node_pairs - check_dgl_blocks_homo(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + check_dgl_blocks_homo(minibatch, dgl_blocks) if mode == "neg_graph" or mode == "neg_src": assert torch.equal( - dgl_minibatch.negative_node_pairs[0], + minibatch.negative_node_pairs[0], minibatch.compacted_negative_srcs.view(-1), ) if mode == "neg_graph" or mode == "neg_dst": assert torch.equal( - dgl_minibatch.negative_node_pairs[1], + minibatch.negative_node_pairs[1], minibatch.compacted_negative_dsts.view(-1), ) + node_pairs, labels = minibatch.node_pairs_with_labels + if mode == "neg_src": + expect_node_pairs = ( + torch.tensor([0, 1, 0, 0, 1, 1]), + torch.tensor([1, 0, 1, 1, 0, 0]), + ) + else: + expect_node_pairs = ( + torch.tensor([0, 1, 0, 0, 1, 1]), + torch.tensor([1, 0, 1, 0, 0, 1]), + ) + expect_labels = torch.tensor([1, 1, 0, 0, 0, 0]).float() + assert torch.equal(node_pairs[0], expect_node_pairs[0]) + assert torch.equal(node_pairs[1], expect_node_pairs[1]) + assert torch.equal(labels, expect_labels) @pytest.mark.parametrize("mode", ["neg_graph", "neg_src", "neg_dst"]) -def test_to_dgl_link_predication_hetero(mode): +def test_dgl_link_predication_hetero(mode): # Arrange minibatch = create_hetero_minibatch() minibatch.compacted_node_pairs = { @@ -718,24 +769,21 @@ def test_to_dgl_link_predication_hetero(mode): reverse_relation: torch.tensor([[2, 1], [3, 1]]), } # Act - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert minibatch.node_features is dgl_minibatch.node_features - assert minibatch.edge_features is dgl_minibatch.edge_features - assert minibatch.compacted_node_pairs is dgl_minibatch.positive_node_pairs - check_dgl_blocks_hetero(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + check_dgl_blocks_hetero(minibatch, dgl_blocks) if mode == "neg_graph" or mode == "neg_src": for etype, src in minibatch.compacted_negative_srcs.items(): assert torch.equal( - dgl_minibatch.negative_node_pairs[etype][0], + minibatch.negative_node_pairs[etype][0], src.view(-1), ) if mode == "neg_graph" or mode == "neg_dst": for etype, dst in minibatch.compacted_negative_dsts.items(): assert torch.equal( - dgl_minibatch.negative_node_pairs[etype][1], + minibatch.negative_node_pairs[etype][1], minibatch.compacted_negative_dsts[etype].view(-1), ) @@ -925,61 +973,49 @@ def check_dgl_blocks_homo_csc_format(minibatch, blocks): ), print(blocks[0].srcdata[dgl.NID]) -def test_to_dgl_node_classification_without_feature_csc_format(): +def test_dgl_node_classification_without_feature_csc_format(): # Arrange minibatch = create_homo_minibatch_csc_format() minibatch.node_features = None minibatch.labels = None minibatch.seed_nodes = torch.tensor([10, 15]) # Act - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert dgl_minibatch.node_features is None - assert minibatch.edge_features is dgl_minibatch.edge_features - assert dgl_minibatch.labels is None - assert minibatch.input_nodes is dgl_minibatch.input_nodes - assert minibatch.seed_nodes is dgl_minibatch.output_nodes - check_dgl_blocks_homo_csc_format(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + assert minibatch.node_features is None + assert minibatch.labels is None + check_dgl_blocks_homo_csc_format(minibatch, dgl_blocks) -def test_to_dgl_node_classification_homo_csc_format(): +def test_dgl_node_classification_homo_csc_format(): # Arrange minibatch = create_homo_minibatch_csc_format() minibatch.seed_nodes = torch.tensor([10, 15]) minibatch.labels = torch.tensor([2, 5]) # Act - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert minibatch.node_features is dgl_minibatch.node_features - assert minibatch.edge_features is dgl_minibatch.edge_features - assert minibatch.labels is dgl_minibatch.labels - assert dgl_minibatch.input_nodes is None - assert dgl_minibatch.output_nodes is None - check_dgl_blocks_homo_csc_format(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + check_dgl_blocks_homo_csc_format(minibatch, dgl_blocks) -def test_to_dgl_node_classification_hetero_csc_format(): +def test_dgl_node_classification_hetero_csc_format(): minibatch = create_hetero_minibatch_csc_format() minibatch.labels = {"B": torch.tensor([2, 5])} minibatch.seed_nodes = {"B": torch.tensor([10, 15])} - dgl_minibatch = minibatch.to_dgl() + # Act + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert minibatch.node_features is dgl_minibatch.node_features - assert minibatch.edge_features is dgl_minibatch.edge_features - assert minibatch.labels is dgl_minibatch.labels - assert dgl_minibatch.input_nodes is None - assert dgl_minibatch.output_nodes is None - check_dgl_blocks_hetero_csc_format(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + check_dgl_blocks_hetero_csc_format(minibatch, dgl_blocks) @pytest.mark.parametrize("mode", ["neg_graph", "neg_src", "neg_dst"]) -def test_to_dgl_link_predication_homo_csc_format(mode): +def test_dgl_link_predication_homo_csc_format(mode): # Arrange minibatch = create_homo_minibatch_csc_format() minibatch.compacted_node_pairs = ( @@ -991,28 +1027,43 @@ def test_to_dgl_link_predication_homo_csc_format(mode): if mode == "neg_graph" or mode == "neg_dst": minibatch.compacted_negative_dsts = torch.tensor([[1, 0], [0, 1]]) # Act - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert minibatch.node_features is dgl_minibatch.node_features - assert minibatch.edge_features is dgl_minibatch.edge_features - assert minibatch.compacted_node_pairs is dgl_minibatch.positive_node_pairs - check_dgl_blocks_homo_csc_format(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + check_dgl_blocks_homo_csc_format(minibatch, dgl_blocks) if mode == "neg_graph" or mode == "neg_src": assert torch.equal( - dgl_minibatch.negative_node_pairs[0], + minibatch.negative_node_pairs[0], minibatch.compacted_negative_srcs.view(-1), ) if mode == "neg_graph" or mode == "neg_dst": assert torch.equal( - dgl_minibatch.negative_node_pairs[1], + minibatch.negative_node_pairs[1], minibatch.compacted_negative_dsts.view(-1), ) + ( + node_pairs, + labels, + ) = minibatch.node_pairs_with_labels + if mode == "neg_src": + expect_node_pairs = ( + torch.tensor([0, 1, 0, 0, 1, 1]), + torch.tensor([1, 0, 1, 1, 0, 0]), + ) + else: + expect_node_pairs = ( + torch.tensor([0, 1, 0, 0, 1, 1]), + torch.tensor([1, 0, 1, 0, 0, 1]), + ) + expect_labels = torch.tensor([1, 1, 0, 0, 0, 0]).float() + assert torch.equal(node_pairs[0], expect_node_pairs[0]) + assert torch.equal(node_pairs[1], expect_node_pairs[1]) + assert torch.equal(labels, expect_labels) @pytest.mark.parametrize("mode", ["neg_graph", "neg_src", "neg_dst"]) -def test_to_dgl_link_predication_hetero_csc_format(mode): +def test_dgl_link_predication_hetero_csc_format(mode): # Arrange minibatch = create_hetero_minibatch_csc_format() minibatch.compacted_node_pairs = { @@ -1036,23 +1087,20 @@ def test_to_dgl_link_predication_hetero_csc_format(mode): reverse_relation: torch.tensor([[2, 1], [3, 1]]), } # Act - dgl_minibatch = minibatch.to_dgl() + dgl_blocks = minibatch.blocks # Assert - assert len(dgl_minibatch.blocks) == 2 - assert minibatch.node_features is dgl_minibatch.node_features - assert minibatch.edge_features is dgl_minibatch.edge_features - assert minibatch.compacted_node_pairs is dgl_minibatch.positive_node_pairs - check_dgl_blocks_hetero_csc_format(minibatch, dgl_minibatch.blocks) + assert len(dgl_blocks) == 2 + check_dgl_blocks_hetero_csc_format(minibatch, dgl_blocks) if mode == "neg_graph" or mode == "neg_src": for etype, src in minibatch.compacted_negative_srcs.items(): assert torch.equal( - dgl_minibatch.negative_node_pairs[etype][0], + minibatch.negative_node_pairs[etype][0], src.view(-1), ) if mode == "neg_graph" or mode == "neg_dst": for etype, dst in minibatch.compacted_negative_dsts.items(): assert torch.equal( - dgl_minibatch.negative_node_pairs[etype][1], + minibatch.negative_node_pairs[etype][1], minibatch.compacted_negative_dsts[etype].view(-1), ) diff --git a/tests/python/pytorch/graphbolt/impl/test_ondisk_dataset.py b/tests/python/pytorch/graphbolt/impl/test_ondisk_dataset.py index dbf96c3e53fe..0764ac9eb266 100644 --- a/tests/python/pytorch/graphbolt/impl/test_ondisk_dataset.py +++ b/tests/python/pytorch/graphbolt/impl/test_ondisk_dataset.py @@ -2086,7 +2086,6 @@ def test_OnDiskDataset_homogeneous(include_original_edge_id): datapipe = datapipe.fetch_feature( dataset.feature, node_feature_keys=["feat"] ) - datapipe = datapipe.to_dgl() dataloader = gb.DataLoader(datapipe) for _ in dataloader: pass @@ -2158,7 +2157,6 @@ def test_OnDiskDataset_heterogeneous(include_original_edge_id): datapipe = datapipe.fetch_feature( dataset.feature, node_feature_keys={"user": ["feat"]} ) - datapipe = datapipe.to_dgl() dataloader = gb.DataLoader(datapipe) for _ in dataloader: pass diff --git a/tests/python/pytorch/graphbolt/test_base.py b/tests/python/pytorch/graphbolt/test_base.py index 51fd4ad2e01e..9500a1d32e55 100644 --- a/tests/python/pytorch/graphbolt/test_base.py +++ b/tests/python/pytorch/graphbolt/test_base.py @@ -67,9 +67,6 @@ def test_data_device(datapipe): # Invoke CopyTo via functional form. test_data_device(datapipe.copy_to("cuda")) - # Test for DGLMiniBatch. - datapipe = gb.DGLMiniBatchConverter(datapipe) - # Invoke CopyTo via class constructor. test_data_device(gb.CopyTo(datapipe, "cuda")) diff --git a/tests/python/pytorch/graphbolt/test_feature_fetcher.py b/tests/python/pytorch/graphbolt/test_feature_fetcher.py index 3cf8ab1ed5d1..bfbf5da646cf 100644 --- a/tests/python/pytorch/graphbolt/test_feature_fetcher.py +++ b/tests/python/pytorch/graphbolt/test_feature_fetcher.py @@ -14,10 +14,7 @@ class MiniBatchType(Enum): DGLMiniBatch = 2 -@pytest.mark.parametrize( - "minibatch_type", [MiniBatchType.MiniBatch, MiniBatchType.DGLMiniBatch] -) -def test_FeatureFetcher_invoke(minibatch_type): +def test_FeatureFetcher_invoke(): # Prepare graph and required datapipes. graph = gb_test_utils.rand_csc_graph(20, 0.15, bidirection_edge=True) a = torch.tensor( @@ -40,8 +37,6 @@ def test_FeatureFetcher_invoke(minibatch_type): # Invoke FeatureFetcher via class constructor. datapipe = gb.NeighborSampler(item_sampler, graph, fanouts) - if minibatch_type == MiniBatchType.DGLMiniBatch: - datapipe = datapipe.to_dgl() datapipe = gb.FeatureFetcher(datapipe, feature_store, ["a"], ["b"]) assert len(list(datapipe)) == 5 @@ -53,10 +48,7 @@ def test_FeatureFetcher_invoke(minibatch_type): assert len(list(datapipe)) == 5 -@pytest.mark.parametrize( - "minibatch_type", [MiniBatchType.MiniBatch, MiniBatchType.DGLMiniBatch] -) -def test_FeatureFetcher_homo(minibatch_type): +def test_FeatureFetcher_homo(): graph = gb_test_utils.rand_csc_graph(20, 0.15, bidirection_edge=True) a = torch.tensor( [[random.randint(0, 10)] for _ in range(graph.total_num_nodes)] @@ -76,17 +68,12 @@ def test_FeatureFetcher_homo(minibatch_type): num_layer = 2 fanouts = [torch.LongTensor([2]) for _ in range(num_layer)] sampler_dp = gb.NeighborSampler(item_sampler, graph, fanouts) - if minibatch_type == MiniBatchType.DGLMiniBatch: - sampler_dp = sampler_dp.to_dgl() fetcher_dp = gb.FeatureFetcher(sampler_dp, feature_store, ["a"], ["b"]) assert len(list(fetcher_dp)) == 5 -@pytest.mark.parametrize( - "minibatch_type", [MiniBatchType.MiniBatch, MiniBatchType.DGLMiniBatch] -) -def test_FeatureFetcher_with_edges_homo(minibatch_type): +def test_FeatureFetcher_with_edges_homo(): graph = gb_test_utils.rand_csc_graph(20, 0.15, bidirection_edge=True) a = torch.tensor( [[random.randint(0, 10)] for _ in range(graph.total_num_nodes)] @@ -121,8 +108,6 @@ def add_node_and_edge_ids(seeds): itemset = gb.ItemSet(torch.arange(10)) item_sampler_dp = gb.ItemSampler(itemset, batch_size=2) converter_dp = Mapper(item_sampler_dp, add_node_and_edge_ids) - if minibatch_type == MiniBatchType.DGLMiniBatch: - converter_dp = converter_dp.to_dgl() fetcher_dp = gb.FeatureFetcher(converter_dp, feature_store, ["a"], ["b"]) assert len(list(fetcher_dp)) == 5 @@ -155,10 +140,7 @@ def get_hetero_graph(): ) -@pytest.mark.parametrize( - "minibatch_type", [MiniBatchType.MiniBatch, MiniBatchType.DGLMiniBatch] -) -def test_FeatureFetcher_hetero(minibatch_type): +def test_FeatureFetcher_hetero(): graph = get_hetero_graph() a = torch.tensor([[random.randint(0, 10)] for _ in range(2)]) b = torch.tensor([[random.randint(0, 10)] for _ in range(3)]) @@ -179,8 +161,6 @@ def test_FeatureFetcher_hetero(minibatch_type): num_layer = 2 fanouts = [torch.LongTensor([2]) for _ in range(num_layer)] sampler_dp = gb.NeighborSampler(item_sampler, graph, fanouts) - if minibatch_type == MiniBatchType.DGLMiniBatch: - sampler_dp = sampler_dp.to_dgl() fetcher_dp = gb.FeatureFetcher( sampler_dp, feature_store, {"n1": ["a"], "n2": ["a"]} ) @@ -188,10 +168,7 @@ def test_FeatureFetcher_hetero(minibatch_type): assert len(list(fetcher_dp)) == 3 -@pytest.mark.parametrize( - "minibatch_type", [MiniBatchType.MiniBatch, MiniBatchType.DGLMiniBatch] -) -def test_FeatureFetcher_with_edges_hetero(minibatch_type): +def test_FeatureFetcher_with_edges_hetero(): a = torch.tensor([[random.randint(0, 10)] for _ in range(20)]) b = torch.tensor([[random.randint(0, 10)] for _ in range(50)]) @@ -243,8 +220,6 @@ def add_node_and_edge_ids(seeds): ) item_sampler_dp = gb.ItemSampler(itemset, batch_size=2) converter_dp = Mapper(item_sampler_dp, add_node_and_edge_ids) - if minibatch_type == MiniBatchType.DGLMiniBatch: - converter_dp = converter_dp.to_dgl() fetcher_dp = gb.FeatureFetcher( converter_dp, feature_store, {"n1": ["a"]}, {"n1:e1:n2": ["a"]} ) diff --git a/tests/python/pytorch/graphbolt/test_integration.py b/tests/python/pytorch/graphbolt/test_integration.py index 496f9d3b5bf5..79c557fc5877 100644 --- a/tests/python/pytorch/graphbolt/test_integration.py +++ b/tests/python/pytorch/graphbolt/test_integration.py @@ -55,65 +55,152 @@ def test_integration_link_prediction(): datapipe = datapipe.fetch_feature( feature_store, node_feature_keys=["feat"], edge_feature_keys=["feat"] ) - datapipe = datapipe.to_dgl() dataloader = gb.DataLoader( datapipe, ) expected = [ str( - """DGLMiniBatch(positive_node_pairs=(tensor([0, 1, 1, 1]), - tensor([2, 3, 3, 1])), - output_nodes=None, - node_features={'feat': tensor([[0.5160, 0.2486], - [0.8672, 0.2276], - [0.6172, 0.7865], - [0.2109, 0.1089], - [0.9634, 0.2294], - [0.5503, 0.8223]])}, - negative_node_pairs=(tensor([0, 1, 1, 1]), - tensor([4, 4, 1, 4])), - labels=None, - input_nodes=None, - edge_features=[{}, - {}], - blocks=[Block(num_src_nodes=6, num_dst_nodes=6, num_edges=2), - Block(num_src_nodes=6, num_dst_nodes=5, num_edges=1)], - )""" + """MiniBatch(seed_nodes=None, + sampled_subgraphs=[FusedSampledSubgraphImpl(original_row_node_ids=tensor([5, 3, 1, 2, 0, 4]), + original_edge_ids=None, + original_column_node_ids=tensor([5, 3, 1, 2, 0, 4]), + node_pairs=(tensor([5, 4]), tensor([0, 5])), + ), + FusedSampledSubgraphImpl(original_row_node_ids=tensor([5, 3, 1, 2, 0, 4]), + original_edge_ids=None, + original_column_node_ids=tensor([5, 3, 1, 2, 0]), + node_pairs=(tensor([5]), tensor([0])), + )], + positive_node_pairs=(tensor([0, 1, 1, 1]), + tensor([2, 3, 3, 1])), + node_pairs_with_labels=((tensor([0, 1, 1, 1, 0, 1, 1, 1]), tensor([2, 3, 3, 1, 4, 4, 1, 4])), + tensor([1., 1., 1., 1., 0., 0., 0., 0.])), + node_pairs=(tensor([5, 3, 3, 3]), + tensor([1, 2, 2, 3])), + node_features={'feat': tensor([[0.5160, 0.2486], + [0.8672, 0.2276], + [0.6172, 0.7865], + [0.2109, 0.1089], + [0.9634, 0.2294], + [0.5503, 0.8223]])}, + negative_srcs=tensor([[5], + [3], + [3], + [3]]), + negative_node_pairs=(tensor([0, 1, 1, 1]), + tensor([4, 4, 1, 4])), + negative_dsts=tensor([[0], + [0], + [3], + [0]]), + labels=None, + input_nodes=tensor([5, 3, 1, 2, 0, 4]), + edge_features=[{}, + {}], + compacted_node_pairs=(tensor([0, 1, 1, 1]), + tensor([2, 3, 3, 1])), + compacted_negative_srcs=tensor([[0], + [1], + [1], + [1]]), + compacted_negative_dsts=tensor([[4], + [4], + [1], + [4]]), + blocks=[Block(num_src_nodes=6, num_dst_nodes=6, num_edges=2), + Block(num_src_nodes=6, num_dst_nodes=5, num_edges=1)], + )""" ), str( - """DGLMiniBatch(positive_node_pairs=(tensor([0, 1, 1, 2]), - tensor([0, 0, 1, 1])), - output_nodes=None, - node_features={'feat': tensor([[0.8672, 0.2276], - [0.5503, 0.8223], - [0.9634, 0.2294], - [0.5160, 0.2486], - [0.6172, 0.7865]])}, - negative_node_pairs=(tensor([0, 1, 1, 2]), - tensor([1, 1, 3, 4])), - labels=None, - input_nodes=None, - edge_features=[{}, - {}], - blocks=[Block(num_src_nodes=5, num_dst_nodes=5, num_edges=2), - Block(num_src_nodes=5, num_dst_nodes=5, num_edges=2)], - )""" + """MiniBatch(seed_nodes=None, + sampled_subgraphs=[FusedSampledSubgraphImpl(original_row_node_ids=tensor([3, 4, 0, 5, 1]), + original_edge_ids=None, + original_column_node_ids=tensor([3, 4, 0, 5, 1]), + node_pairs=(tensor([1, 3]), tensor([3, 4])), + ), + FusedSampledSubgraphImpl(original_row_node_ids=tensor([3, 4, 0, 5, 1]), + original_edge_ids=None, + original_column_node_ids=tensor([3, 4, 0, 5, 1]), + node_pairs=(tensor([1, 3]), tensor([3, 4])), + )], + positive_node_pairs=(tensor([0, 1, 1, 2]), + tensor([0, 0, 1, 1])), + node_pairs_with_labels=((tensor([0, 1, 1, 2, 0, 1, 1, 2]), tensor([0, 0, 1, 1, 1, 1, 3, 4])), + tensor([1., 1., 1., 1., 0., 0., 0., 0.])), + node_pairs=(tensor([3, 4, 4, 0]), + tensor([3, 3, 4, 4])), + node_features={'feat': tensor([[0.8672, 0.2276], + [0.5503, 0.8223], + [0.9634, 0.2294], + [0.5160, 0.2486], + [0.6172, 0.7865]])}, + negative_srcs=tensor([[3], + [4], + [4], + [0]]), + negative_node_pairs=(tensor([0, 1, 1, 2]), + tensor([1, 1, 3, 4])), + negative_dsts=tensor([[4], + [4], + [5], + [1]]), + labels=None, + input_nodes=tensor([3, 4, 0, 5, 1]), + edge_features=[{}, + {}], + compacted_node_pairs=(tensor([0, 1, 1, 2]), + tensor([0, 0, 1, 1])), + compacted_negative_srcs=tensor([[0], + [1], + [1], + [2]]), + compacted_negative_dsts=tensor([[1], + [1], + [3], + [4]]), + blocks=[Block(num_src_nodes=5, num_dst_nodes=5, num_edges=2), + Block(num_src_nodes=5, num_dst_nodes=5, num_edges=2)], + )""" ), str( - """DGLMiniBatch(positive_node_pairs=(tensor([0, 1]), - tensor([0, 0])), - output_nodes=None, - node_features={'feat': tensor([[0.5160, 0.2486], - [0.5503, 0.8223]])}, - negative_node_pairs=(tensor([0, 1]), - tensor([0, 0])), - labels=None, - input_nodes=None, - edge_features=[{}, - {}], - blocks=[Block(num_src_nodes=2, num_dst_nodes=2, num_edges=1), - Block(num_src_nodes=2, num_dst_nodes=2, num_edges=1)], - )""" + """MiniBatch(seed_nodes=None, + sampled_subgraphs=[FusedSampledSubgraphImpl(original_row_node_ids=tensor([5, 4]), + original_edge_ids=None, + original_column_node_ids=tensor([5, 4]), + node_pairs=(tensor([1]), tensor([1])), + ), + FusedSampledSubgraphImpl(original_row_node_ids=tensor([5, 4]), + original_edge_ids=None, + original_column_node_ids=tensor([5, 4]), + node_pairs=(tensor([1]), tensor([1])), + )], + positive_node_pairs=(tensor([0, 1]), + tensor([0, 0])), + node_pairs_with_labels=((tensor([0, 1, 0, 1]), tensor([0, 0, 0, 0])), + tensor([1., 1., 0., 0.])), + node_pairs=(tensor([5, 4]), + tensor([5, 5])), + node_features={'feat': tensor([[0.5160, 0.2486], + [0.5503, 0.8223]])}, + negative_srcs=tensor([[5], + [4]]), + negative_node_pairs=(tensor([0, 1]), + tensor([0, 0])), + negative_dsts=tensor([[5], + [5]]), + labels=None, + input_nodes=tensor([5, 4]), + edge_features=[{}, + {}], + compacted_node_pairs=(tensor([0, 1]), + tensor([0, 0])), + compacted_negative_srcs=tensor([[0], + [1]]), + compacted_negative_dsts=tensor([[0], + [0]]), + blocks=[Block(num_src_nodes=2, num_dst_nodes=2, num_edges=1), + Block(num_src_nodes=2, num_dst_nodes=2, num_edges=1)], + )""" ), ] for step, data in enumerate(dataloader): @@ -169,60 +256,116 @@ def test_integration_node_classification(): datapipe = datapipe.fetch_feature( feature_store, node_feature_keys=["feat"], edge_feature_keys=["feat"] ) - datapipe = datapipe.to_dgl() dataloader = gb.DataLoader( datapipe, ) expected = [ str( - """DGLMiniBatch(positive_node_pairs=(tensor([0, 1, 1, 1]), - tensor([2, 3, 3, 1])), - output_nodes=None, - node_features={'feat': tensor([[0.5160, 0.2486], - [0.8672, 0.2276], - [0.6172, 0.7865], - [0.2109, 0.1089], - [0.5503, 0.8223]])}, - negative_node_pairs=None, - labels=None, - input_nodes=None, - edge_features=[{}, - {}], - blocks=[Block(num_src_nodes=5, num_dst_nodes=4, num_edges=4), - Block(num_src_nodes=4, num_dst_nodes=4, num_edges=4)], - )""" + """MiniBatch(seed_nodes=None, + sampled_subgraphs=[FusedSampledSubgraphImpl(original_row_node_ids=tensor([5, 3, 1, 2, 4]), + original_edge_ids=None, + original_column_node_ids=tensor([5, 3, 1, 2]), + node_pairs=(tensor([4, 1, 0, 1]), tensor([0, 1, 2, 3])), + ), + FusedSampledSubgraphImpl(original_row_node_ids=tensor([5, 3, 1, 2]), + original_edge_ids=None, + original_column_node_ids=tensor([5, 3, 1, 2]), + node_pairs=(tensor([0, 1, 0, 1]), tensor([0, 1, 2, 3])), + )], + positive_node_pairs=(tensor([0, 1, 1, 1]), + tensor([2, 3, 3, 1])), + node_pairs_with_labels=None, + node_pairs=(tensor([5, 3, 3, 3]), + tensor([1, 2, 2, 3])), + node_features={'feat': tensor([[0.5160, 0.2486], + [0.8672, 0.2276], + [0.6172, 0.7865], + [0.2109, 0.1089], + [0.5503, 0.8223]])}, + negative_srcs=None, + negative_node_pairs=None, + negative_dsts=None, + labels=None, + input_nodes=tensor([5, 3, 1, 2, 4]), + edge_features=[{}, + {}], + compacted_node_pairs=(tensor([0, 1, 1, 1]), + tensor([2, 3, 3, 1])), + compacted_negative_srcs=None, + compacted_negative_dsts=None, + blocks=[Block(num_src_nodes=5, num_dst_nodes=4, num_edges=4), + Block(num_src_nodes=4, num_dst_nodes=4, num_edges=4)], + )""" ), str( - """DGLMiniBatch(positive_node_pairs=(tensor([0, 1, 1, 2]), - tensor([0, 0, 1, 1])), - output_nodes=None, - node_features={'feat': tensor([[0.8672, 0.2276], - [0.5503, 0.8223], - [0.9634, 0.2294]])}, - negative_node_pairs=None, - labels=None, - input_nodes=None, - edge_features=[{}, - {}], - blocks=[Block(num_src_nodes=3, num_dst_nodes=3, num_edges=2), - Block(num_src_nodes=3, num_dst_nodes=3, num_edges=2)], - )""" + """MiniBatch(seed_nodes=None, + sampled_subgraphs=[FusedSampledSubgraphImpl(original_row_node_ids=tensor([3, 4, 0]), + original_edge_ids=None, + original_column_node_ids=tensor([3, 4, 0]), + node_pairs=(tensor([0, 2]), tensor([0, 1])), + ), + FusedSampledSubgraphImpl(original_row_node_ids=tensor([3, 4, 0]), + original_edge_ids=None, + original_column_node_ids=tensor([3, 4, 0]), + node_pairs=(tensor([0, 2]), tensor([0, 1])), + )], + positive_node_pairs=(tensor([0, 1, 1, 2]), + tensor([0, 0, 1, 1])), + node_pairs_with_labels=None, + node_pairs=(tensor([3, 4, 4, 0]), + tensor([3, 3, 4, 4])), + node_features={'feat': tensor([[0.8672, 0.2276], + [0.5503, 0.8223], + [0.9634, 0.2294]])}, + negative_srcs=None, + negative_node_pairs=None, + negative_dsts=None, + labels=None, + input_nodes=tensor([3, 4, 0]), + edge_features=[{}, + {}], + compacted_node_pairs=(tensor([0, 1, 1, 2]), + tensor([0, 0, 1, 1])), + compacted_negative_srcs=None, + compacted_negative_dsts=None, + blocks=[Block(num_src_nodes=3, num_dst_nodes=3, num_edges=2), + Block(num_src_nodes=3, num_dst_nodes=3, num_edges=2)], + )""" ), str( - """DGLMiniBatch(positive_node_pairs=(tensor([0, 1]), - tensor([0, 0])), - output_nodes=None, - node_features={'feat': tensor([[0.5160, 0.2486], - [0.5503, 0.8223], - [0.9634, 0.2294]])}, - negative_node_pairs=None, - labels=None, - input_nodes=None, - edge_features=[{}, - {}], - blocks=[Block(num_src_nodes=3, num_dst_nodes=2, num_edges=2), - Block(num_src_nodes=2, num_dst_nodes=2, num_edges=2)], - )""" + """MiniBatch(seed_nodes=None, + sampled_subgraphs=[FusedSampledSubgraphImpl(original_row_node_ids=tensor([5, 4, 0]), + original_edge_ids=None, + original_column_node_ids=tensor([5, 4]), + node_pairs=(tensor([0, 2]), tensor([0, 1])), + ), + FusedSampledSubgraphImpl(original_row_node_ids=tensor([5, 4]), + original_edge_ids=None, + original_column_node_ids=tensor([5, 4]), + node_pairs=(tensor([1, 1]), tensor([0, 1])), + )], + positive_node_pairs=(tensor([0, 1]), + tensor([0, 0])), + node_pairs_with_labels=None, + node_pairs=(tensor([5, 4]), + tensor([5, 5])), + node_features={'feat': tensor([[0.5160, 0.2486], + [0.5503, 0.8223], + [0.9634, 0.2294]])}, + negative_srcs=None, + negative_node_pairs=None, + negative_dsts=None, + labels=None, + input_nodes=tensor([5, 4, 0]), + edge_features=[{}, + {}], + compacted_node_pairs=(tensor([0, 1]), + tensor([0, 0])), + compacted_negative_srcs=None, + compacted_negative_dsts=None, + blocks=[Block(num_src_nodes=3, num_dst_nodes=2, num_edges=2), + Block(num_src_nodes=2, num_dst_nodes=2, num_edges=2)], + )""" ), ] for step, data in enumerate(dataloader): diff --git a/tests/python/pytorch/graphbolt/test_minibatch_transformer.py b/tests/python/pytorch/graphbolt/test_minibatch_transformer.py deleted file mode 100644 index b233fd1e7fe0..000000000000 --- a/tests/python/pytorch/graphbolt/test_minibatch_transformer.py +++ /dev/null @@ -1,34 +0,0 @@ -import dgl.graphbolt as gb -import torch - -from . import gb_test_utils - - -def test_dgl_minibatch_converter(): - N = 32 - B = 4 - itemset = gb.ItemSet(torch.arange(N), names="seed_nodes") - graph = gb_test_utils.rand_csc_graph(200, 0.15, bidirection_edge=True) - - features = {} - keys = [("node", None, "a"), ("node", None, "b")] - features[keys[0]] = gb.TorchBasedFeature(torch.randn(200, 4)) - features[keys[1]] = gb.TorchBasedFeature(torch.randn(200, 4)) - feature_store = gb.BasicFeatureStore(features) - - item_sampler = gb.ItemSampler(itemset, batch_size=B) - subgraph_sampler = gb.NeighborSampler( - item_sampler, - graph, - fanouts=[torch.LongTensor([2]) for _ in range(2)], - ) - feature_fetcher = gb.FeatureFetcher( - subgraph_sampler, - feature_store, - ["a"], - ) - dgl_converter = gb.DGLMiniBatchConverter(feature_fetcher) - dataloader = gb.DataLoader(dgl_converter) - assert len(list(dataloader)) == N // B - minibatch = next(iter(dataloader)) - assert isinstance(minibatch, gb.DGLMiniBatch)