(WIP) [core][compiled graphs] Unify code paths for NCCL P2P and collectives scheduling #48649

Status: Open. Wants to merge 33 commits into base: master.
Diff view: changes from 16 commits.
Commits (33):
d16ba2c
refactor: Add compute aio
dengwxn Oct 27, 2024
5b226f3
refactor: Separate nccl_read, nccl_write, and compute
dengwxn Oct 30, 2024
c04ea86
chore: Apply CL
dengwxn Oct 30, 2024
1f43ce9
test: Fix messages
dengwxn Nov 4, 2024
d4d8764
merge: Upstream master
AndyUB Nov 5, 2024
252789a
fix: Test error message
AndyUB Nov 5, 2024
d88b19c
merge: Upstream master
AndyUB Nov 7, 2024
767eaf4
fix: Merge errors
AndyUB Nov 7, 2024
d355e94
fix: Execution schedule GPU tests
AndyUB Nov 8, 2024
bf28397
chore: Format code
AndyUB Nov 8, 2024
941cb73
refactor: Separate NCCL_COLLECTIVE and COMPUTE
AndyUB Nov 9, 2024
2867556
Revert "refactor: Separate NCCL_COLLECTIVE and COMPUTE"
AndyUB Nov 9, 2024
9259b2e
(WIP) refactor: Add NCCL send/recv nodes
AndyUB Nov 10, 2024
a7507cb
(WIP) refactor: Refactor DAGOperationGraphNode
AndyUB Nov 11, 2024
f980026
(WIP) refactor: Tests
AndyUB Nov 11, 2024
2dc06ec
(WIP) chore: Cleanup
AndyUB Nov 11, 2024
4045f34
(WIP) chore: Add comments for test_execution_schedule_gpu
AndyUB Nov 16, 2024
f3cf414
merge: Upstream master
AndyUB Nov 17, 2024
bd7d556
(WIP) refactor: Remove DAG node operation type; add synchronous group
AndyUB Nov 17, 2024
fb2f20d
(WIP) experimental: Add P2P nodes in _add_node (failed)
AndyUB Nov 17, 2024
8d7f01c
(WIP) revert: Reuse _add_nccl_p2p_nodes; chore: Add comments
AndyUB Nov 17, 2024
4d19546
(WIP) refactor: Keep relative bind index order; fix: test_execution_s…
AndyUB Nov 17, 2024
3c3a697
(WIP) experimental: Mock test execution schedule gpu
AndyUB Nov 17, 2024
0693df3
revert: Mock test execution schedule gpu; chore: Comments
AndyUB Nov 17, 2024
a4e6287
chore: Deduplicate error checking; remove dead code
AndyUB Nov 17, 2024
3b6b968
chore: Reorg imports
AndyUB Nov 18, 2024
2a4f932
refactor: Decouple graph and sync group
AndyUB Nov 24, 2024
fa07751
experimental: Pass futures across DAG nodes
AndyUB Nov 25, 2024
d743107
(WIP) fix: Pass future
AndyUB Nov 26, 2024
971cfe3
fix: Skip compute for NCCL read
AndyUB Nov 26, 2024
6ce0f00
fix: Resolve future outside of process_return_vals; refactor: Combine…
AndyUB Nov 27, 2024
9ae354e
chore: Rename vars, mark is_ready as property
AndyUB Nov 27, 2024
f60555a
polish: Add comments
AndyUB Nov 28, 2024
30 changes: 22 additions & 8 deletions python/ray/dag/collective_node.py
@@ -71,6 +71,7 @@ def __init__(
                 raise ValueError(
                     "Expected actor handles to match the custom NCCL group"
                 )
+        self._output_nodes: List[DAGNode] = []

     def __str__(self) -> str:
         return (
@@ -88,6 +89,13 @@ def actor_handles(self) -> List["ray.actor.ActorHandle"]:
     def type_hint(self) -> TorchTensorType:
         return self._type_hint

+    def _add_output_node(self, output_node: "CollectiveOutputNode"):
+        self._output_nodes.append(output_node)
+
+    @property
+    def output_nodes(self) -> List["CollectiveOutputNode"]:
+        return self._output_nodes
+
     def init_nccl_group(self, nccl_group_id: Optional[str] = None) -> str:
         """
         Initialize the NCCL group if it has not been initialized yet. If `nccl_group_id`
@@ -142,6 +150,14 @@ def __init__(
         method_options: Dict[str, Any],
         other_args_to_resolve: Dict[str, Any],
     ):
+        super().__init__(
+            method_name,
+            method_args,
+            method_kwargs,
+            method_options,
+            other_args_to_resolve,
+        )
+
         # Parse the input node.
         if not (
             isinstance(method_args, tuple)
@@ -156,14 +172,8 @@ def __init__(
         )
         if self._collective_op is None:
             raise ValueError("Expected a collective operation")
-
-        super().__init__(
-            method_name,
-            method_args,
-            method_kwargs,
-            method_options,
-            other_args_to_resolve,
-        )
+        self.collective_op._add_output_node(self)
+        self.set_requires_nccl_collective(True)

     def _copy_impl(
         self,
@@ -188,3 +198,7 @@ def _execute_impl(self, *args, **kwargs):
     @property
     def collective_op(self) -> _CollectiveOperation:
         return self._collective_op
+
+    @property
+    def synchronous_peers(self) -> List["CollectiveOutputNode"]:
+        return self._collective_op.output_nodes
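
The registration pattern in this diff (each output node adds itself to its shared collective operation, and `synchronous_peers` reads that list back) can be sketched without Ray. This is a minimal illustration only: `CollectiveOp` and `OutputNode` are hypothetical stand-ins for `_CollectiveOperation` and `CollectiveOutputNode`, and the node names are invented.

```python
from typing import List


class CollectiveOp:
    """Hypothetical stand-in for _CollectiveOperation: tracks its output nodes."""

    def __init__(self) -> None:
        self._output_nodes: List["OutputNode"] = []

    def _add_output_node(self, node: "OutputNode") -> None:
        self._output_nodes.append(node)

    @property
    def output_nodes(self) -> List["OutputNode"]:
        return self._output_nodes


class OutputNode:
    """Hypothetical stand-in for CollectiveOutputNode."""

    def __init__(self, name: str, op: CollectiveOp) -> None:
        self.name = name
        self._op = op
        # Register with the shared operation, mirroring the call added to __init__.
        op._add_output_node(self)

    @property
    def synchronous_peers(self) -> List["OutputNode"]:
        # All output nodes of the same operation, including this node itself.
        return self._op.output_nodes


op = CollectiveOp()
nodes = [OutputNode(f"allreduce_out_{i}", op) for i in range(3)]
peer_names = [peer.name for peer in nodes[0].synchronous_peers]
```

In this sketch every node sees the same peer list, in registration order, and the list includes the node itself. The property hands back the underlying list rather than a copy, so a caller that mutates it would change the operation's state. Whether the PR relies on that sharing is not stated in the diff.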