
(WIP) [core][compiled graphs] Unify code paths for NCCL P2P and collectives scheduling #48649

Open
Wants to merge 33 commits into base: master
Conversation

@AndyUB (Contributor) commented Nov 8, 2024

Why are these changes needed?

This PR unifies the code paths for NCCL P2P and collectives. Previously, scheduling for NCCL operations was done by splitting each node into three operations: READ, COMPUTE, and WRITE. This PR simplifies the logic by keeping only the compute node. To ensure scheduling still works, NCCL operations are converted into special types of system-created compute nodes, namely NCCL_WRITE and NCCL_READ for P2P send/recv, and NCCL_COLLECTIVE for collectives (WIP).
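For orientation, here is a minimal sketch of the unified operation kinds described above. The enum name and values are illustrative only, not the PR's actual definitions:

```python
from enum import Enum


class _UnifiedOpType(Enum):
    """Hypothetical sketch: every scheduled operation is a compute-style node;
    NCCL P2P and collectives become special, system-created compute types."""

    COMPUTE = "COMPUTE"                   # ordinary actor task
    NCCL_READ = "NCCL_READ"               # system-created P2P recv
    NCCL_WRITE = "NCCL_WRITE"             # system-created P2P send
    NCCL_COLLECTIVE = "NCCL_COLLECTIVE"   # system-created collective (WIP)
```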

This PR is a first step toward supporting a dict of tensors in all-reduce, and eventually the DDP/FSDP project.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

dengwxn and others added 10 commits October 27, 2024 10:36
Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
@dengwxn (Contributor) commented Nov 8, 2024

Looks great. Some more TODOs before an initial review as we discussed offline:

  1. Refactor all the [CL] and [TODO] markers in the code. They are mainly missing comments, unused code blocks, branches to be merged, variables and functions to be renamed, etc.
  2. Introduce a special op node for NCCL_Collective similar to the current NCCL_READ and NCCL_WRITE, such that the COMPUTE node does not require NCCL.

cc @dengwxn

@dengwxn (Contributor) commented Nov 8, 2024

@anyscalesam Could you help add a go badge to run more CI tests? Thanks!

@AndyUB marked this pull request as ready for review November 8, 2024 18:49
@dengwxn (Contributor) commented Nov 9, 2024

Introduce a special op node for NCCL_Collective similar to the current NCCL_READ and NCCL_WRITE, such that the COMPUTE node does not require NCCL.

After your attempt and on second thought, I think introducing another NCCL_Collective op might not be the best way to separate NCCL and non-NCCL ops. We can skip this and see what others think.

@dengwxn (Contributor) commented Nov 9, 2024

As we discussed offline, we should remove all the NCCL_* op nodes and instead create system-level DAG nodes that do NCCL read/write. We will refactor based on this.

@dengwxn (Contributor) left a comment

First pass. Structure seems right. Will look into details later.

@stephanie-wang (Contributor) left a comment

I think this can be made simpler. Try to think about how you can achieve the following:

  • _NCCLSendNode/_NCCLRecvNode should have the same interface as _CollectiveOperation.
  • If the above is done properly, I believe we can get rid of most of the parts that need to differentiate between send/recv/collective. I.e., there should be only one requires_nccl flag instead of three, and there should be only one kind of DAG op node, a COMPUTE node. (A rough sketch of such a shared interface follows this list.)
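As a rough sketch of what such a shared interface could look like (class and method names here are illustrative, not the compiled-graph internals):

```python
from typing import Any, List


class _NcclOp:
    """Hypothetical common interface for P2P send/recv and collectives."""

    def execute(self, *args: Any) -> Any:
        raise NotImplementedError


class _NcclSendOp(_NcclOp):
    def __init__(self, peer_rank: int):
        self.peer_rank = peer_rank

    def execute(self, tensor: Any) -> Any:
        # Placeholder: a real implementation would call the NCCL group's send.
        return tensor


class _NcclAllReduceOp(_NcclOp):
    def __init__(self, ranks: List[int]):
        self.ranks = ranks

    def execute(self, tensor: Any) -> Any:
        # Placeholder: a real implementation would call the NCCL group's allreduce.
        return tensor
```

With one interface, the scheduler could check a single requires_nccl flag and keep a single COMPUTE op kind.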

Comment on lines 592 to 593
elif self.requires_nccl_read or self.requires_nccl_write:
    method = lambda arg: arg
Contributor:
Shouldn't the method here be something like self.collective_op.execute, not the identity function?

@AndyUB (Author) replied Nov 17, 2024:

Ack.

It is the identity function because:

  • If the operation is an NCCL write (i.e., it corresponds to a _NcclSendNode), the actual work is done in _write. The NCCL write operation first _reads the output from the actual actor task via an IntraProcessChannel, then it _computes, which in this case just returns the data read so that it is ready to be sent via NCCL.
  • If the operation is an NCCL read, the actual work is done in _read. After reading the data via NCCL, it _computes (which again just returns the read data) and _writes the data to the actual downstream actor task via an IntraProcessChannel.

I introduced a new interface, _SynchronousGroup, which represents a group of tasks participating in synchronous operations such as NCCL, and wrapped both the identity function here and the original collective_op.execute in sync_group.execute.
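A minimal sketch of that wrapper idea, with hypothetical names (the real _SynchronousGroup likely tracks the participating tasks as well):

```python
from typing import Any, Callable


class _SynchronousGroupSketch:
    """Hypothetical: a group of tasks that execute a synchronizing operation
    (an NCCL P2P transfer or a collective) behind one execute entry point."""

    def __init__(self, execute_fn: Callable[..., Any]):
        self._execute_fn = execute_fn

    def execute(self, *args: Any) -> Any:
        return self._execute_fn(*args)


# P2P send/recv: the compute step is the identity, since the actual transfer
# happens in the surrounding _read/_write.
p2p_group = _SynchronousGroupSketch(lambda arg: arg)

# Collectives: wrap the collective op's execute method, e.g.
# collective_group = _SynchronousGroupSketch(collective_op.execute)
```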

Contributor:

Yes, but what I am suggesting is that we move the code currently in _write and _read into an execute method. That way we get rid of the concept of _write and _read entirely and just have a generic execute method to call.
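For illustration only (names hypothetical, not the PR's classes), the recv logic could live entirely in one execute method instead of separate _read/_compute/_write steps:

```python
from typing import Any


class _NcclRecvOperation:
    """Hypothetical sketch: recv logic folded into a single execute method."""

    def __init__(self, nccl_group: Any, src_rank: int):
        # `nccl_group` stands in for whatever communicator object is injected.
        self._nccl_group = nccl_group
        self._src_rank = src_rank

    def execute(self, *_args: Any) -> Any:
        # Receive the tensor from the peer and hand it to the downstream task;
        # no separate _read/_compute/_write concepts are needed.
        return self._nccl_group.recv(self._src_rank)
```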

@@ -895,6 +912,119 @@ def _add_node(self, node: "ray.dag.DAGNode") -> None:
    self.dag_node_to_idx[node] = idx
    self.counter += 1

def _add_nccl_p2p_nodes(self) -> None:
Contributor:

I think adding the p2p nodes can be pretty simple if you just modify the code in _add_node. The algorithm should be something like:

  • if node's output requires NCCL, add a send node
  • for each input of the node, if the input requires NCCL, replace the input with a recv node and link it to the input's corresponding send node

_add_node should get called in topological order from the InputNode of the DAG, so this algorithm should work out fine (there should always be a corresponding send node by the time you need to add the recv node).

You can skip all of the checks because these should already be covered by other parts of the code, no?
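A rough sketch of that algorithm, assuming a simplified graph API (create_recv_node, create_send_node, requires_nccl, and the bookkeeping dict are all illustrative names):

```python
def add_node_with_nccl_p2p(dag, node, send_node_for):
    """Hypothetical: insert NCCL send/recv nodes while adding `node`.

    Nodes are added in topological order from the InputNode, so the send node
    for any NCCL-typed input already exists when the recv node is created.
    """
    # Replace NCCL-typed inputs with recv nodes linked to the upstream send.
    resolved_inputs = []
    for inp in node.inputs:
        if inp.requires_nccl:
            recv = dag.create_recv_node(linked_send=send_node_for[inp])
            resolved_inputs.append(recv)
        else:
            resolved_inputs.append(inp)
    node.inputs = resolved_inputs
    dag.add(node)

    # If this node's output goes over NCCL, add the corresponding send node.
    if node.requires_nccl:
        send_node_for[node] = dag.create_send_node(upstream=node)
```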

@AndyUB (Author) replied:

I have tried modifying _add_node in your suggested way, but it did not work because the current implementation calls _add_node in BFS order, not in topological order.

@AndyUB (Author) replied:

Deduplicated the checks for errors.

Contributor:

Ah I see. How about modifying that code to use topological order instead? My concern is that currently this function looks a bit hard to understand.

Comment on lines +1826 to +1828
requires_nccl_read = dag_node.requires_nccl_read
requires_nccl_write = dag_node.requires_nccl_write
requires_nccl_collective = dag_node.requires_nccl_collective
Contributor:

Why do we need three different flags instead of just requires_nccl?

@AndyUB (Author) replied:

The current implementation of overlapping GPU communication supports overlapping NCCL reads with computation. NCCL collectives (only allreduce so far) are currently synchronous.

To overlap an NCCL read with computation, during scheduling an NCCL read node is swapped with its preceding compute node when possible. That is a case where we need requires_nccl_read or some similar bookkeeping.

We can probably combine these three flags into one requires_nccl if we also support overlapping NCCL collectives with computation. Should we implement that in this same PR?

Contributor:

Yes, let's do that in this PR. So the new rule should be that requires_nccl nodes cannot be swapped with each other but can be swapped with other nodes. Does that sound right?
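A sketch of that rule as a predicate (illustrative attribute and function names, not the PR's scheduler code):

```python
def can_swap(op_a, op_b) -> bool:
    """Hypothetical overlap rule: an NCCL operation may be reordered past
    ordinary compute work, but two NCCL operations keep their relative order
    (and swapping two non-NCCL operations brings no overlap benefit)."""
    return op_a.requires_nccl != op_b.requires_nccl
```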

@rkooo567 self-assigned this Nov 12, 2024
@stephanie-wang self-assigned this Nov 12, 2024
@jcotant1 added the core (Issues that should be addressed in Ray Core) label Nov 15, 2024
@dengwxn (Contributor) left a comment

The overall structure looks good! We can polish some details soon.

@@ -4,14 +4,15 @@
PARENT_CLASS_NODE_KEY = "parent_class_node"
PREV_CLASS_METHOD_CALL_KEY = "prev_class_method_call"
BIND_INDEX_KEY = "bind_index"
NO_CONTROL_EDGE_BIND_INDEX_VALUE = -1
WRITE_BIND_INDEX_INCREMENT = 0.3
Contributor:

This is hacky. Can you assign consecutive bind indices to NCCL read/write and the corresponding compute nodes?

@AndyUB (Author) replied:

Ideally, we do want consecutive bind indices:

  • If the task represented by a ClassMethodNode requires NCCL read from one or many upstream tasks, for each of its corresponding _NcclRecvNode(s), _NcclRecvNodes[i].bind_index == ClassMethodNode.bind_index - i - 1.
  • If the task represented by a ClassMethodNode requires NCCL write, its corresponding _NcclSendNode.bind_index == ClassMethodNode.bind_index + 1.

Under the current implementation, it does not seem possible to assign consecutive bind indices at bind time, because the user can use with_type_hint to add or remove type hints.

@AndyUB (Author) replied:

Conceptually, the NCCL read/write tasks are still subtasks of the original task, so it makes sense to subtract a little for NCCL read and add a little for NCCL write 🤣. The problem is that the value added or subtracted can be arbitrary.

Another idea would be to go through all actor tasks in the DAG and update all of their bind indices. I feel this requires a lot of code change and might be confusing.

Another idea I have tried is to set the bind indices of all NCCL reads and writes to an invalid value, -1. This messes up the exec_task_idxs and generates correct but unreadable schedules (because of the exec_task_idxs).

Another idea is to set the bind index of all NCCL reads and writes to be the same as their corresponding ClassMethodNode's bind index. This can still result in arbitrary exec_task_idxs because of different tie-breaking.

@AndyUB (Author) replied:

Use the same bind index. Change sort lambda.
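For illustration, a sketch of the tie-breaking idea under the "same bind index" scheme (attribute and dict names are hypothetical): NCCL read/write subtasks share their ClassMethodNode's bind index, and the sort key orders recv before the compute and send after it when indices tie.

```python
# Hypothetical tie-break order when bind indices are equal: recv < compute < send.
_OP_PRIORITY = {"NCCL_READ": 0, "COMPUTE": 1, "NCCL_WRITE": 2}


def exec_order_key(task):
    # `bind_index` and `op_type` are illustrative attribute names.
    return (task.bind_index, _OP_PRIORITY[task.op_type])


# tasks.sort(key=exec_order_key)
```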

return True
with self._send_stream:
    if self._write():
        return True
@AndyUB (Author) commented Nov 19, 2024:

TODO: Fix this. This does not allow overlapping.
Idea: Return GPUFuture across nodes.
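A minimal sketch of the "return a GPU future across nodes" idea, assuming a CUDA-event-based future built with PyTorch (an assumption for illustration; the PR has its own DAGOperationFuture/GPU future machinery):

```python
import torch


class _SketchGpuFuture:
    """Hypothetical GPU future: record a CUDA event on the communication stream
    and let the consumer's stream wait on it instead of blocking the CPU."""

    def __init__(self, value, comm_stream: torch.cuda.Stream):
        self._value = value
        self._event = torch.cuda.Event()
        self._event.record(comm_stream)

    def wait(self):
        # Make the caller's current stream wait for the recorded event,
        # then hand back the value (e.g. the received tensor).
        torch.cuda.current_stream().wait_event(self._event)
        return self._value
```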

@AndyUB (Author) replied:

done

@AndyUB (Author) replied:

Test overlapping benefits.

@AndyUB (Author) left a comment

Synced with Weixin offline.

    return self._collective_group

@property
def sync_group(self) -> _CollectiveGroup:
Contributor:

Is this duplicated as above?

@AndyUB (Author) replied:

The semantics are a bit different: sync_group is used in scheduling, while collective_group is used to init_nccl_group.
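For clarity, a tiny sketch of how the two properties can coexist on the node (illustrative only; the real class carries more state):

```python
class _SketchCollectiveNode:
    def __init__(self, collective_group, sync_group):
        self._collective_group = collective_group
        self._sync_group = sync_group

    @property
    def collective_group(self):
        # Used when initializing the NCCL group (init_nccl_group) for the ranks.
        return self._collective_group

    @property
    def sync_group(self):
        # Used by the scheduler to keep synchronous operations grouped together.
        return self._sync_group
```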

other_args_to_resolve={
    PARENT_CLASS_NODE_KEY: send_actor_handle,
    P2P_GROUP_KEY: _P2PGroup(),
    # [TODO:andyub] What should the bind index be here?
Contributor:

We should add a few comments here discussing the options for the bind index.

@AndyUB (Author) replied Nov 28, 2024:

I feel this needs some discussion. Will update the comment once we agree on a good way.

Signed-off-by: Yuhan Ruan <[email protected]>
collective_node.ready_collective_idxs.add(
    (node.task_idx, node.operation.type)
)
if node.in_degree == 0 and node.sync_group is not None:
Contributor:

Add comments.

# Channel closed. Exit the loop.
return True

# To overlap GPU communication for NCCL recv, launch the NCCL recv operation,
Contributor:

Explain the if not ... or ... condition in more detail.

elif op_type == _DAGNodeOperationType.WRITE:
    with self._send_stream:
        return self._write()
with self._recv_stream:
Contributor:

Add comments in all the places where a future is generated or passed downstream. Cover why it is generated and where it is passed.

with self._send_stream:
    # To overlap GPU communication for NCCL recv, write the future as output to
    # the downstream task, which waits on the future in its compute operation.
    if self.requires_nccl_read and overlap_gpu_communication:
Contributor:

Comment: Write of NcclRecvNode does not need to wait on the read future.

# To overlap GPU communication for NCCL recv, launch the NCCL recv operation,
# skip the normal compute operation, and return the future without waiting.
if not self.requires_nccl_read or not overlap_gpu_communication:
    input_data = self.reset_and_wait_intermediate_future()
Contributor:

Comment: Compute of NcclRecvNode is not needed.

Contributor:

Comment: Compute of base ClassMethodNode and NcclSendNode needs to wait on the read future.

    output_val = self._intermediate_future
    self._intermediate_future = None
else:
    output_val = self.reset_and_wait_intermediate_future()
Contributor:

Comment: Write of base ClassMethodNode and NcclSendNode needs to wait on the compute future.

input_data = self.reset_and_wait_intermediate_future()
try:
    _process_return_vals(input_data, return_single_output=False)
# Wait for any future in the input data.
Contributor:

Comment: Wait on the read future.

val = input_data[i]
if isinstance(val, DAGOperationFuture):
    resolved_future = val.wait()
    # The only source of future is NCCL recv.
Contributor:

Nit: NcclRecvNode.

@stephanie-wang added the go (add ONLY when ready to merge, run all tests) label Nov 28, 2024
Labels: core (Issues that should be addressed in Ray Core), go (add ONLY when ready to merge, run all tests)
5 participants