(WIP) [core][compiled graphs] Unify code paths for NCCL P2P and collectives scheduling #48649

Open. Wants to merge 33 commits into base: master. Changes shown from 30 of 33 commits.
Commits (33):
- d16ba2c  refactor: Add compute aio (dengwxn, Oct 27, 2024)
- 5b226f3  refactor: Separate nccl_read, nccl_write, and compute (dengwxn, Oct 30, 2024)
- c04ea86  chore: Apply CL (dengwxn, Oct 30, 2024)
- 1f43ce9  test: Fix messages (dengwxn, Nov 4, 2024)
- d4d8764  merge: Upstream master (AndyUB, Nov 5, 2024)
- 252789a  fix: Test error message (AndyUB, Nov 5, 2024)
- d88b19c  merge: Upstream master (AndyUB, Nov 7, 2024)
- 767eaf4  fix: Merge errors (AndyUB, Nov 7, 2024)
- d355e94  fix: Execution schedule GPU tests (AndyUB, Nov 8, 2024)
- bf28397  chore: Format code (AndyUB, Nov 8, 2024)
- 941cb73  refactor: Separate NCCL_COLLECTIVE and COMPUTE (AndyUB, Nov 9, 2024)
- 2867556  Revert "refactor: Separate NCCL_COLLECTIVE and COMPUTE" (AndyUB, Nov 9, 2024)
- 9259b2e  (WIP) refactor: Add NCCL send/recv nodes (AndyUB, Nov 10, 2024)
- a7507cb  (WIP) refactor: Refactor DAGOperationGraphNode (AndyUB, Nov 11, 2024)
- f980026  (WIP) refactor: Tests (AndyUB, Nov 11, 2024)
- 2dc06ec  (WIP) chore: Cleanup (AndyUB, Nov 11, 2024)
- 4045f34  (WIP) chore: Add comments for test_execution_schedule_gpu (AndyUB, Nov 16, 2024)
- f3cf414  merge: Upstream master (AndyUB, Nov 17, 2024)
- bd7d556  (WIP) refactor: Remove DAG node operation type; add synchronous group (AndyUB, Nov 17, 2024)
- fb2f20d  (WIP) experimental: Add P2P nodes in _add_node (failed) (AndyUB, Nov 17, 2024)
- 8d7f01c  (WIP) revert: Reuse _add_nccl_p2p_nodes; chore: Add comments (AndyUB, Nov 17, 2024)
- 4d19546  (WIP) refactor: Keep relative bind index order; fix: test_execution_s… (AndyUB, Nov 17, 2024)
- 3c3a697  (WIP) experimental: Mock test execution schedule gpu (AndyUB, Nov 17, 2024)
- 0693df3  revert: Mock test execution schedule gpu; chore: Comments (AndyUB, Nov 17, 2024)
- a4e6287  chore: Deduplicate error checking; remove dead code (AndyUB, Nov 17, 2024)
- 3b6b968  chore: Reorg imports (AndyUB, Nov 18, 2024)
- 2a4f932  refactor: Decouple graph and sync group (AndyUB, Nov 24, 2024)
- fa07751  experimental: Pass futures across DAG nodes (AndyUB, Nov 25, 2024)
- d743107  (WIP) fix: Pass future (AndyUB, Nov 26, 2024)
- 971cfe3  fix: Skip compute for NCCL read (AndyUB, Nov 26, 2024)
- 6ce0f00  fix: Resolve future outside of process_return_vals; refactor: Combine… (AndyUB, Nov 27, 2024)
- 9ae354e  chore: Rename vars, mark is_ready as property (AndyUB, Nov 27, 2024)
- f60555a  polish: Add comments (AndyUB, Nov 28, 2024)
4 changes: 2 additions & 2 deletions python/ray/dag/__init__.py
@@ -17,7 +17,7 @@
     PREV_CLASS_METHOD_CALL_KEY,
     BIND_INDEX_KEY,
     IS_CLASS_METHOD_OUTPUT_KEY,
-    COLLECTIVE_OPERATION_KEY,
+    COLLECTIVE_GROUP_KEY,
     DAGNODE_TYPE_KEY,
 )
 from ray.dag.vis_utils import plot
@@ -38,7 +38,7 @@
     "PREV_CLASS_METHOD_CALL_KEY",
     "BIND_INDEX_KEY",
     "IS_CLASS_METHOD_OUTPUT_KEY",
-    "COLLECTIVE_OPERATION_KEY",
+    "COLLECTIVE_GROUP_KEY",
     "DAGNODE_TYPE_KEY",
     "plot",
     "MultiOutputNode",
4 changes: 2 additions & 2 deletions python/ray/dag/class_node.py
@@ -180,7 +180,7 @@ def __init__(
             ClassMethodNode
         ] = other_args_to_resolve.get(PREV_CLASS_METHOD_CALL_KEY, None)
         # The index/order when bind() is called on this class method
-        self._bind_index: Optional[int] = other_args_to_resolve.get(
+        self._bind_index: Optional[Union[int, float]] = other_args_to_resolve.get(
             BIND_INDEX_KEY, None
         )
         # Represent if the ClassMethodNode is a class method output. If True,
@@ -252,7 +252,7 @@ def __repr__(self) -> str:
     def get_method_name(self) -> str:
         return self._method_name
 
-    def _get_bind_index(self) -> int:
+    def _get_bind_index(self) -> Union[int, float]:
         return self._bind_index
 
     def _get_remote_method(self, method_name):
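A note on the Optional[Union[int, float]] widening above: together with the commit "Keep relative bind index order", it suggests that fractional bind indices let a generated node be ordered between two existing integer indices without renumbering its neighbors. The helper below is a minimal, hypothetical sketch of that idea; its name and usage are assumptions, not code from this PR.

from typing import Union

def between_bind_indices(prev_index: Union[int, float],
                         next_index: Union[int, float]) -> float:
    """Hypothetical helper: choose a bind index strictly between two
    existing ones, so an inserted node (e.g. a generated NCCL send/recv
    node) keeps the relative execution order of its neighbors."""
    assert prev_index < next_index
    return (prev_index + next_index) / 2

# A node slotted between bind indices 1 and 2 gets 1.5 and sorts between them.
assert 1 < between_bind_indices(1, 2) < 2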
42 changes: 25 additions & 17 deletions python/ray/dag/collective_node.py
@@ -8,17 +8,18 @@
     DAGNode,
     ClassMethodNode,
 )
-from ray.dag.constants import COLLECTIVE_OPERATION_KEY
+from ray.dag.constants import COLLECTIVE_GROUP_KEY
+from ray.dag.sync_group import _SynchronousGroup
 from ray.experimental.channel import ChannelContext
 from ray.experimental.channel.torch_tensor_nccl_channel import _init_nccl_group
 from ray.experimental.channel.torch_tensor_type import GPUCommunicator, TorchTensorType
 from ray.experimental.util.types import _CollectiveOp, ReduceOp
 from ray.util.annotations import DeveloperAPI
 
 
-class _CollectiveOperation:
+class _CollectiveGroup(_SynchronousGroup):
     """
-    Represent metadata for a NCCL collective operation.
+    Represent metadata for a group of actors in a NCCL collective operation.
 
     Args:
         input_nodes: A list of input nodes to the collective operation.
@@ -37,6 +38,8 @@ def __init__(
         op: _CollectiveOp,
         transport: Optional[Union[str, GPUCommunicator]] = None,
     ):
+        super().__init__()
+
         if len(input_nodes) == 0:
             raise ValueError("Expected input nodes for a collective operation")
         if len(set(input_nodes)) != len(input_nodes):
@@ -142,6 +145,14 @@
         method_options: Dict[str, Any],
         other_args_to_resolve: Dict[str, Any],
     ):
+        super().__init__(
+            method_name,
+            method_args,
+            method_kwargs,
+            method_options,
+            other_args_to_resolve,
+        )
+
         # Parse the input node.
         if not (
             isinstance(method_args, tuple)
@@ -151,19 +162,12 @@
             raise ValueError("Expected a single input node")
         self._input_node = method_args[0]
         # Parse the collective operation.
-        self._collective_op: _CollectiveOperation = other_args_to_resolve.get(
-            COLLECTIVE_OPERATION_KEY, None
-        )
-        if self._collective_op is None:
-            raise ValueError("Expected a collective operation")
-
-        super().__init__(
-            method_name,
-            method_args,
-            method_kwargs,
-            method_options,
-            other_args_to_resolve,
+        self._collective_group: _CollectiveGroup = other_args_to_resolve.get(
+            COLLECTIVE_GROUP_KEY, None
         )
+        if self._collective_group is None:
+            raise ValueError("Expected a collective group")
+        self.set_requires_nccl_collective(True)
 
     def _copy_impl(
         self,
@@ -186,5 +190,9 @@ def _execute_impl(self, *args, **kwargs):
         )
 
     @property
-    def collective_op(self) -> _CollectiveOperation:
-        return self._collective_op
+    def collective_group(self) -> _CollectiveGroup:
+        return self._collective_group
+
+    @property
+    def sync_group(self) -> _CollectiveGroup:
+        return self._collective_group
Review thread on the new sync_group property:

Contributor: Is this duplicated as above?

Contributor (Author): The semantics are a bit different. sync_group is used in scheduling. collective_group is used to init_nccl_group.
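To illustrate the distinction made in the reply above: scheduling only needs to know that a node belongs to some synchronous group, while NCCL initialization needs the concrete collective group. The helpers below are an illustrative sketch, not APIs from this PR; only the sync_group and collective_group properties come from the diff above, and the init_nccl_group call is assumed.

# Illustrative sketch only: these helpers are not part of the PR. They show
# why both properties can coexist even though they return the same object.

def group_nodes_for_scheduling(collective_nodes):
    # The scheduler only needs to know which nodes must execute together,
    # so it keys off the generic sync_group (a _SynchronousGroup).
    groups = {}
    for node in collective_nodes:
        groups.setdefault(node.sync_group, []).append(node)
    return groups

def init_communicators(collective_nodes):
    # NCCL setup needs collective-specific metadata (participating actors,
    # reduce op, transport), so it goes through the concrete collective_group.
    for group in {node.collective_group for node in collective_nodes}:
        group.init_nccl_group()  # assumed method on the collective group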