Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IB gather WIP #172

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,8 @@ class Communicator {
/// @param ibMaxWrPerSend The maximum number of work requests per send for IB. Unused if transport is not IB.
/// @return std::shared_ptr<Connection> A shared pointer to the connection.
std::shared_ptr<Connection> connectOnSetup(int remoteRank, int tag, Transport transport, int ibMaxCqSize = 1024,
int ibMaxCqPollNum = 1, int ibMaxSendWr = 8192, int ibMaxWrPerSend = 64);
int ibMaxCqPollNum = 1, int ibMaxSendWr = 8192, int ibMaxWrPerSend = 64,
int ibMaxNumSgesPerWr = 1);

/// Add a custom Setuppable object to a list of objects to be setup later, when @ref setup() is called.
///
Expand Down
2 changes: 1 addition & 1 deletion python/mscclpp/core_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void register_core(nb::module_& m) {
.def("recv_memory_on_setup", &Communicator::recvMemoryOnSetup, nb::arg("remoteRank"), nb::arg("tag"))
.def("connect_on_setup", &Communicator::connectOnSetup, nb::arg("remoteRank"), nb::arg("tag"),
nb::arg("transport"), nb::arg("ibMaxCqSize") = 1024, nb::arg("ibMaxCqPollNum") = 1,
nb::arg("ibMaxSendWr") = 8192, nb::arg("ibMaxWrPerSend") = 64)
nb::arg("ibMaxSendWr") = 8192, nb::arg("ibMaxWrPerSend") = 64, nb::arg("ibMaxNumSgesPerWr") = 1)
.def("setup", &Communicator::setup);
}

Expand Down
10 changes: 4 additions & 6 deletions src/communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ MSCCLPP_API_CPP NonblockingFuture<RegisteredMemory> Communicator::recvMemoryOnSe
return NonblockingFuture<RegisteredMemory>(memoryReceiver->memoryPromise_.get_future());
}

MSCCLPP_API_CPP std::shared_ptr<Connection> Communicator::connectOnSetup(int remoteRank, int tag, Transport transport,
int ibMaxCqSize /*=1024*/,
int ibMaxCqPollNum /*=1*/,
int ibMaxSendWr /*=8192*/,
int ibMaxWrPerSend /*=64*/) {
MSCCLPP_API_CPP std::shared_ptr<Connection> Communicator::connectOnSetup(
int remoteRank, int tag, Transport transport, int ibMaxCqSize /*=1024*/, int ibMaxCqPollNum /*=1*/,
int ibMaxSendWr /*=8192*/, int ibMaxWrPerSend /*=64*/, int ibMaxNumSgesPerWr /*=1*/) {
std::shared_ptr<ConnectionBase> conn;
if (transport == Transport::CudaIpc) {
// sanity check: make sure the IPC connection is being made within a node
Expand All @@ -116,7 +114,7 @@ MSCCLPP_API_CPP std::shared_ptr<Connection> Communicator::connectOnSetup(int rem
pimpl->rankToHash_[remoteRank]);
} else if (AllIBTransports.has(transport)) {
auto ibConn = std::make_shared<IBConnection>(remoteRank, tag, transport, ibMaxCqSize, ibMaxCqPollNum, ibMaxSendWr,
ibMaxWrPerSend, *pimpl);
ibMaxWrPerSend, ibMaxNumSgesPerWr, *pimpl);
conn = ibConn;
INFO(MSCCLPP_NET, "IB connection between rank %d(%lx) via %s and remoteRank %d(%lx) created",
pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()],
Expand Down
4 changes: 2 additions & 2 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ void CudaIpcConnection::flush(int64_t timeoutUsec) {
// IBConnection

IBConnection::IBConnection(int remoteRank, int tag, Transport transport, int maxCqSize, int maxCqPollNum, int maxSendWr,
int maxWrPerSend, Communicator::Impl& commImpl)
int maxWrPerSend, int maxNumSgesPerWr, Communicator::Impl& commImpl)
: ConnectionBase(remoteRank, tag),
transport_(transport),
remoteTransport_(Transport::Unknown),
numSignaledSends(0),
dummyAtomicSource_(std::make_unique<uint64_t>(0)) {
qp = commImpl.getIbContext(transport)->createQp(maxCqSize, maxCqPollNum, maxSendWr, 0, maxWrPerSend);
qp = commImpl.getIbContext(transport)->createQp(maxCqSize, maxCqPollNum, maxSendWr, 0, maxWrPerSend, maxNumSgesPerWr);
dummyAtomicSourceMem_ = RegisteredMemory(std::make_shared<RegisteredMemory::Impl>(
dummyAtomicSource_.get(), sizeof(uint64_t), commImpl.bootstrap_->getRank(), transport, commImpl));
validateTransport(dummyAtomicSourceMem_, transport);
Expand Down
Loading