Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft : Safe handling of control plane promises w/ clang16 #388

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1fad5eb
Updating control plane client to maintain promise lifetimes
mdemoret-nv Sep 7, 2023
44da13f
Removing Service constructor with description
mdemoret-nv Sep 7, 2023
5a642aa
use the size_t specialization of atomic to match other usage of size_t
dagardner-nv Sep 7, 2023
8524d89
Allocate promises on the heap
dagardner-nv Sep 7, 2023
2c72555
Add missing flags to docker launch to mount the working dir and set -…
dagardner-nv Sep 8, 2023
201f861
Merge branch 'branch-23.11' of github.com:nv-morpheus/MRC into mdd_co…
dagardner-nv Sep 11, 2023
6671019
Revert "Revert boost upgrade, and update clang to v16 (#382)"
dagardner-nv Sep 11, 2023
29d9f00
Re-work PromiseHandler as a component
dagardner-nv Sep 11, 2023
5cf17e5
Revert "Re-work PromiseHandler as a component"
dagardner-nv Sep 12, 2023
c7f6939
Revert "Revert "Re-work PromiseHandler as a component""
dagardner-nv Sep 12, 2023
0e29298
Revert "Allocate promises on the heap"
dagardner-nv Sep 12, 2023
89449b4
Add test variant locked to single core
dagardner-nv Sep 12, 2023
7732039
IWYU fixes
dagardner-nv Sep 12, 2023
1b20886
Make do_writes_done a public method
dagardner-nv Sep 13, 2023
c226ec1
Add do_writes_done method
dagardner-nv Sep 13, 2023
bd740aa
wip
dagardner-nv Sep 13, 2023
a37366a
Revert "wip"
dagardner-nv Sep 18, 2023
76c3ef2
Revert "Add do_writes_done method"
dagardner-nv Sep 18, 2023
7a277cb
Revert "Make do_writes_done a public method"
dagardner-nv Sep 18, 2023
632235f
Adopt clang-16
dagardner-nv Sep 18, 2023
7b363f0
Only run iwyu on compilation units actually changed in PR
dagardner-nv Aug 2, 2023
c10f1ba
IWYU fix
dagardner-nv Sep 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/conda/environments/clang_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ dependencies:
- libclang=16
- libclang-cpp=16
- llvmdev=16
- include-what-you-use
- include-what-you-use=0.20
4 changes: 2 additions & 2 deletions ci/conda/environments/dev_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies:
- autoconf>=2.69
- bash-completion
- benchmark=1.6.0
- boost-cpp=1.74
- boost-cpp=1.82
- ccache
- cmake=3.24
- cuda-toolkit # Version comes from the channel above
Expand All @@ -46,7 +46,7 @@ dependencies:
- isort
- jinja2=3.0
- lcov=1.15
- libhwloc=2.5
- libhwloc=2.9.2
- libprotobuf=3.21
- librmm=23.06
- libtool
Expand Down
17 changes: 15 additions & 2 deletions ci/scripts/cpp_checks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,22 @@ if [[ -n "${MRC_MODIFIED_FILES}" ]]; then

# Include What You Use
if [[ "${SKIP_IWYU}" == "" ]]; then
IWYU_DIRS="cpp python"
# Remove .h, .hpp, and .cu files from the modified list
shopt -s extglob
IWYU_MODIFIED_FILES=( "${MRC_MODIFIED_FILES[@]/*.@(h|hpp|cu)/}" )

# Get the list of compiled files relative to this directory
WORKING_PREFIX="${PWD}/"
COMPILED_FILES=( $(jq -r .[].file ${BUILD_DIR}/compile_commands.json | sort -u ) )
COMPILED_FILES=( "${COMPILED_FILES[@]/#$WORKING_PREFIX/}" )
COMBINED_FILES=("${COMPILED_FILES[@]}")
COMBINED_FILES+=("${IWYU_MODIFIED_FILES[@]}")

# Find the intersection between compiled files and modified files
IWYU_MODIFIED_FILES=( $(printf '%s\0' "${COMBINED_FILES[@]}" | sort -z | uniq -d -z | xargs -0n1) )

NUM_PROC=$(get_num_proc)
IWYU_OUTPUT=`${IWYU_TOOL} -p ${BUILD_DIR} -j ${NUM_PROC} ${IWYU_DIRS} 2>&1`
IWYU_OUTPUT=`${IWYU_TOOL} -p ${BUILD_DIR} -j ${NUM_PROC} ${IWYU_MODIFIED_FILES[@]} 2>&1`
IWYU_RETVAL=$?
fi
else
Expand Down
74 changes: 61 additions & 13 deletions cpp/mrc/src/internal/control_plane/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

#include "internal/control_plane/client/connections_manager.hpp"
#include "internal/grpc/progress_engine.hpp"
#include "internal/grpc/promise_handler.hpp"
#include "internal/grpc/promise_handler.hpp" // for PromiseHandler
#include "internal/grpc/stream_writer.hpp" // for StreamWriter
#include "internal/runnable/runnable_resources.hpp"
#include "internal/service.hpp"
#include "internal/system/system.hpp"

#include "mrc/channel/status.hpp"
Expand All @@ -33,15 +35,20 @@
#include "mrc/runnable/launch_control.hpp"
#include "mrc/runnable/launcher.hpp"
#include "mrc/runnable/runner.hpp"
#include "mrc/types.hpp"

#include <boost/fiber/future/promise.hpp> // for promise
#include <google/protobuf/any.pb.h>
#include <grpcpp/grpcpp.h>
#include <rxcpp/rx.hpp>

#include <mutex>
#include <ostream>

namespace mrc::control_plane {

std::atomic_uint64_t AsyncEventStatus::s_request_id_counter;

Client::Client(resources::PartitionResourceBase& base, std::shared_ptr<grpc::CompletionQueue> cq) :
resources::PartitionResourceBase(base),
m_cq(std::move(cq)),
Expand Down Expand Up @@ -73,13 +80,11 @@
if (m_owns_progress_engine)
{
CHECK(m_cq);
auto progress_engine = std::make_unique<rpc::ProgressEngine>(m_cq);
auto progress_handler = std::make_unique<rpc::PromiseHandler>();
auto progress_engine = std::make_unique<rpc::ProgressEngine>(m_cq);
m_progress_handler = std::make_unique<rpc::PromiseHandler>();

mrc::make_edge(*progress_engine, *progress_handler);
mrc::make_edge(*progress_engine, *m_progress_handler);

m_progress_handler =
runnable().launch_control().prepare_launcher(launch_options(), std::move(progress_handler))->ignition();
m_progress_engine =
runnable().launch_control().prepare_launcher(launch_options(), std::move(progress_engine))->ignition();
}
Expand Down Expand Up @@ -135,7 +140,6 @@
if (m_owns_progress_engine)
{
m_progress_engine->await_live();
m_progress_handler->await_live();
}
m_event_handler->await_live();
}
Expand All @@ -150,7 +154,6 @@
{
m_cq->Shutdown();
m_progress_engine->await_join();
m_progress_handler->await_join();
}
}

Expand All @@ -161,10 +164,21 @@
// handle a subset of events directly on the event handler

case protos::EventType::Response: {
auto* promise = reinterpret_cast<Promise<protos::Event>*>(event.msg.tag());
if (promise != nullptr)
auto event_tag = event.msg.tag();

if (event_tag != 0)
{
promise->set_value(std::move(event.msg));
// Lock to prevent multiple threads
std::unique_lock<decltype(m_mutex)> lock(m_mutex);

// Find the promise associated with the event tag
auto promise = m_pending_events.extract(event_tag);

// Unlock to allow other threads to continue as soon as possible
lock.unlock();

// Finally, set the value
promise.mapped().set_value(std::move(event.msg));
}
}
break;
Expand Down Expand Up @@ -242,11 +256,12 @@
return m_launch_options;
}

void Client::issue_event(const protos::EventType& event_type)
AsyncEventStatus Client::issue_event(const protos::EventType& event_type)
{
protos::Event event;
event.set_event(event_type);
m_writer->await_write(std::move(event));
// m_writer->await_write(std::move(event));
return this->write_event(std::move(event), false);
}

void Client::request_update()
Expand All @@ -260,4 +275,37 @@
// }
}

// Writes `event` through m_writer and returns an AsyncEventStatus describing the request.
//
// Parameters:
//   event          - the event to send; taken by value and moved into the writer.
//   await_response - when true, a promise is registered in m_pending_events under the
//                    status' request ID and its future is attached to the returned status.
//                    When false, this function attaches no future to the status.
//
// Returns: the AsyncEventStatus created for this call.
//
// The event's `tag` field is reserved for this function: any non-zero tag supplied by the
// caller is cleared (with a warning) and, when awaiting a response, replaced by the request ID.
// NOTE(review): request IDs are presumably unique per AsyncEventStatus (see
// AsyncEventStatus::s_request_id_counter) -- confirm against the AsyncEventStatus definition.
AsyncEventStatus Client::write_event(protos::Event event, bool await_response)
{
// A caller-supplied tag would collide with the request-ID bookkeeping below, so drop it
if (event.tag() != 0)
{
LOG(WARNING) << "event tag is set but this field should exclusively be used by the control plane client. "
"Clearing to avoid confusion";
event.clear_tag();

Check warning on line 284 in cpp/mrc/src/internal/control_plane/client.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/internal/control_plane/client.cpp#L282-L284

Added lines #L282 - L284 were not covered by tests
}

// Default-constructed status; request_id() is read from it below
AsyncEventStatus status;

if (await_response)
{
// If we are supporting awaiting, create the promise now
Promise<protos::Event> promise;

// Set the future to the status
status.set_future(promise.get_future());

// Set the tag to the request ID to allow looking up the promise later
event.set_tag(status.request_id());

// Save the promise to the pending promises to be retrieved later
// The lock lives only until the end of this `if` scope, so it is released before the write below
std::unique_lock<decltype(m_mutex)> lock(m_mutex);

m_pending_events[status.request_id()] = std::move(promise);
}

// Finally, write the event
m_writer->await_write(std::move(event));

return status;
}
} // namespace mrc::control_plane
Loading
Loading