Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert partially constructed segments on-error in segment init function #407

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
ef06972
Don't skip this test as its perfectly valid and passes, the reference…
dagardner-nv Aug 2, 2023
79d144b
Test for issue #360
dagardner-nv Aug 2, 2023
52968bf
Remove unneeded copy/paste code
dagardner-nv Aug 2, 2023
1178c02
Revert added objects and ingress/egress ports on error in segment ini…
dagardner-nv Aug 2, 2023
ac2853c
Better test name
dagardner-nv Aug 2, 2023
6a26e4f
wip
dagardner-nv Aug 2, 2023
cccef95
Only run iwyu on compilation units actually changed in PR
dagardner-nv Aug 2, 2023
6914da7
Checks require clang
dagardner-nv Aug 2, 2023
dbeeedc
Revert "Checks require clang"
dagardner-nv Aug 2, 2023
faf233c
test [no ci]
dagardner-nv Aug 2, 2023
9fa907c
Don't fetch base branch if it's already set
dagardner-nv Aug 2, 2023
81d094f
Add comment explaining version
dagardner-nv Aug 2, 2023
9d55259
Avoid 15.0.7
dagardner-nv Aug 2, 2023
1eb89f7
revert clang version changes [no ci]
dagardner-nv Aug 3, 2023
c0f8a35
Adopt updated versions of boost and libhwlock this <hand wavy> choose…
dagardner-nv Aug 3, 2023
46ac7b5
Apply fix from mdemoret-nv:mdd_force-stubgen-dev
dagardner-nv Aug 3, 2023
a6244c9
Merge branch 'branch-23.11' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Aug 18, 2023
f285c41
Merge branch 'branch-23.11' into david-inconsistent-pipe [no ci]
dagardner-nv Sep 25, 2023
f6503db
Merge branch 'branch-23.11' into david-inconsistent-pipe
dagardner-nv Sep 25, 2023
0f1bf1c
Merge branch 'david-inconsistent-pipe' of github.com:dagardner-nv/MRC…
dagardner-nv Sep 25, 2023
1cd28f6
IWYU changes
dagardner-nv Sep 25, 2023
743d4d3
Merge branch 'branch-23.11' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Sep 25, 2023
dfa066e
Python repro for issue #360
dagardner-nv Sep 26, 2023
9e9a5f7
Place executor.join under exception handler [no ci]
dagardner-nv Sep 26, 2023
38c0e15
Fix repro test, works if segment2 raises, but fails os segment1 raise…
dagardner-nv Sep 26, 2023
22220a2
Add second test, with exception being thrown in both the first and se…
dagardner-nv Sep 26, 2023
39ba316
Test edgeholder connections
dagardner-nv Sep 28, 2023
790a50c
release_edge_connection should only reset the m_connected_edge, preve…
dagardner-nv Sep 28, 2023
50b50e7
IWYU fixes
dagardner-nv Sep 28, 2023
6d5ec3a
Merge branch 'test-edge-holder' into david-inconsistent-segments-and-…
dagardner-nv Sep 28, 2023
8e90892
Fix type-o
dagardner-nv Sep 28, 2023
a5e27a5
Merge branch 'test-edge-holder' into david-inconsistent-segments-and-…
dagardner-nv Sep 28, 2023
987d8f1
WIP: A bunch of bebug logging that needs to be removed, add a destroy…
dagardner-nv Sep 27, 2023
3c77dcc
pare back unneeded changes
dagardner-nv Sep 28, 2023
0542cb2
Stop all segments in the pipeline
dagardner-nv Sep 28, 2023
765e8ae
Remove unneeded hack
dagardner-nv Sep 29, 2023
502b8cc
Shutdown manifolds
dagardner-nv Sep 29, 2023
2df03d7
Better handling for manifolds
dagardner-nv Sep 29, 2023
3f12bac
Merge branch 'branch-23.11' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Sep 29, 2023
fa680bf
Prevent pipeline shutdown from occurring in multiple threads
dagardner-nv Sep 29, 2023
587ad7f
Use a fiber mutex
dagardner-nv Sep 29, 2023
b18dac6
More logs
dagardner-nv Sep 29, 2023
3c7c574
Shutdown ingress and egress ports
dagardner-nv Sep 29, 2023
efa9022
Revert mistaken change
dagardner-nv Sep 29, 2023
3e20b24
Cleanup logging
dagardner-nv Sep 29, 2023
d3f5a21
Revert accidental changes
dagardner-nv Sep 29, 2023
9565284
Cleanup logging
dagardner-nv Sep 29, 2023
b068686
Call do_service_kill from shutdown
dagardner-nv Sep 29, 2023
598b976
naming things
dagardner-nv Oct 4, 2023
faec661
Building, TODO: look into adding name to WritableProvider
dagardner-nv Oct 4, 2023
5d534a7
WIP
dagardner-nv Oct 4, 2023
ec6c66e
wip
dagardner-nv Oct 5, 2023
7a45e24
wip
dagardner-nv Oct 6, 2023
c79af3d
wip
dagardner-nv Oct 6, 2023
4eff767
Fix test_edges
dagardner-nv Oct 6, 2023
10b3877
WIP
dagardner-nv Oct 6, 2023
ca7ca50
Remove temporary skips
dagardner-nv Oct 9, 2023
c7edd3f
Merge branch 'david-named-nodes' into david-inconsistent-segments-and…
dagardner-nv Oct 9, 2023
00f1549
IWYU fixes
dagardner-nv Oct 9, 2023
308968e
Name held edges
dagardner-nv Oct 9, 2023
81a72fb
Merge branch 'david-named-nodes' into david-inconsistent-segments-and…
dagardner-nv Oct 9, 2023
9820993
Explicitly call constructor for SinkProperties since its a virtual an…
dagardner-nv Oct 9, 2023
9d7b9d3
Debug log
dagardner-nv Oct 9, 2023
e10afb3
Explicitly invoke SourceProperties ctor since its virtual
dagardner-nv Oct 10, 2023
e61179d
WIP: Fixed segment init error tests
dagardner-nv Oct 10, 2023
a433e7e
Named queue's & release edges
dagardner-nv Oct 10, 2023
a0b6bda
Only call shutdown on the builder when we are shutting down unexpecte…
dagardner-nv Oct 10, 2023
96dda60
Set name
dagardner-nv Oct 10, 2023
111d7f8
Check to see if the disconnector has outlived the ForwardingEgressPro…
dagardner-nv Oct 11, 2023
96e76d6
Revert "Check to see if the disconnector has outlived the ForwardingE…
dagardner-nv Oct 11, 2023
bcafc1c
Log all the things, TODO: remove
dagardner-nv Oct 11, 2023
d5150bb
Add names and logs
dagardner-nv Oct 11, 2023
3a4358c
WIP
dagardner-nv Oct 12, 2023
c0955b6
Add additional test for connect & disconnect
dagardner-nv Oct 12, 2023
b0721e5
WIP: temp printing
dagardner-nv Oct 12, 2023
eebbcf6
Avoid using the 'this' pointer in the disconnector method if the node…
dagardner-nv Oct 12, 2023
1ac10d2
Merge branch 'branch-23.11' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Oct 12, 2023
465c762
Use Mutext typedef
dagardner-nv Oct 12, 2023
897b33c
IWYU Fixes
dagardner-nv Oct 12, 2023
3108f2d
Remove debug logging
dagardner-nv Oct 12, 2023
b9ff97a
Remove debug logging
dagardner-nv Oct 12, 2023
2892329
Parametarize the engine type
dagardner-nv Oct 12, 2023
6783187
Remove unneeded using declatation. This caused errors with non-debug …
dagardner-nv Oct 13, 2023
4636694
formatting
dagardner-nv Oct 13, 2023
67840a2
Merge branch 'branch-24.03' into david-inconsistent-segments-and-edge…
dagardner-nv Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cpp/mrc/include/mrc/edge/edge_builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,11 @@ class DeferredWritableMultiEdge : public MultiEdgeHolder<std::size_t, T>,
public DeferredWritableMultiEdgeBase
{
public:
DeferredWritableMultiEdge(determine_indices_fn_t indices_fn = nullptr, bool deep_copy = false) :
m_indices_fn(std::move(indices_fn))
DeferredWritableMultiEdge(determine_indices_fn_t indices_fn = nullptr,
bool deep_copy = false,
std::string name = std::string()) :
m_indices_fn(std::move(indices_fn)),
MultiEdgeHolder<std::size_t, T>(std::move(name))
{
// // Generate warning if deep_copy = True but type does not support it
// if constexpr (!std::is_copy_constructible_v<T>)
Expand Down
64 changes: 59 additions & 5 deletions cpp/mrc/include/mrc/edge/edge_holder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <typeindex>
#include <utility>
Expand All @@ -50,20 +52,44 @@ template <typename T>
class EdgeHolder
{
public:
EdgeHolder() = default;
EdgeHolder(std::string name = std::string()) : m_name(std::move(name)){};

virtual ~EdgeHolder()
{
// Drop any edge connections before this object goes out of scope. This should execute any disconnectors
m_connected_edge.reset();

if (this->check_active_connection(false))
{
LOG(FATAL) << "A node was destructed which still had dependent connections. Nodes must be kept alive while "
"dependent connections are still active";
LOG(FATAL) << "EdgeHolder(" << this << ")[" << m_name << "] "
<< "A node was destructed which still had dependent connections. Nodes must be kept alive while "
"dependent connections are still active\n"
<< this->connection_info();
}
}

const std::string& name() const
{
return m_name;
};

protected:
std::string connection_info() const
{
std::stringstream ss;
ss << "m_owned_edge=" << m_owned_edge.lock() << "\tm_owned_edge_lifetime=" << m_owned_edge_lifetime
<< "\tm_connected_edge=" << m_connected_edge;

bool is_connected = false;
if (m_connected_edge)
{
is_connected = m_connected_edge->is_connected();
}

ss << "\tis_connected=" << is_connected << "\tcheck_active_connection=" << this->check_active_connection(false);
return ss.str();
}

bool check_active_connection(bool do_throw = true) const
{
// Alive connection exists when the lock is true, lifetime is false or a connction object has been set
Expand Down Expand Up @@ -155,6 +181,13 @@ class EdgeHolder
m_connected_edge.reset();
}

void disconnect()
{
m_connected_edge.reset();
m_owned_edge_lifetime.reset();
m_owned_edge.reset();
}

const std::shared_ptr<Edge<T>>& get_connected_edge() const
{
return m_connected_edge;
Expand Down Expand Up @@ -188,6 +221,8 @@ class EdgeHolder
// Holds a pointer to any set edge (different from init edge). Maintains lifetime
std::shared_ptr<Edge<T>> m_connected_edge;

std::string m_name;

// Allow edge builder to call set_edge
friend EdgeBuilder;

Expand All @@ -200,10 +235,25 @@ template <typename KeyT, typename T>
class MultiEdgeHolder
{
public:
MultiEdgeHolder() = default;
MultiEdgeHolder(std::string name = std::string()) : m_name(std::move(name)){};
virtual ~MultiEdgeHolder() = default;

const std::string& name() const
{
return m_name;
};

protected:
std::string connection_info() const
{
std::stringstream ss;
ss << "m_edges.size()=" << m_edges.size();
for (const auto& [key, edge_pair] : m_edges)
{
ss << "\n\tkey=" << key << "\t" << edge_pair.connection_info();
}
return ss.str();
}
void init_owned_edge(KeyT key, std::shared_ptr<Edge<T>> edge)
{
auto& edge_pair = this->get_edge_pair(key, true);
Expand Down Expand Up @@ -276,7 +326,9 @@ class MultiEdgeHolder
{
if (create_if_missing)
{
m_edges[key] = EdgeHolder<T>();
std::ostringstream edge_name;
edge_name << m_name << "_" << key;
m_edges[key] = EdgeHolder<T>(edge_name.str());
return m_edges[key];
}

Expand Down Expand Up @@ -321,6 +373,8 @@ class MultiEdgeHolder
// Keeps pairs of get_edge/set_edge for each key
std::map<KeyT, EdgeHolder<T>> m_edges;

std::string m_name;

// Allow edge builder to call set_edge
friend EdgeBuilder;
};
Expand Down
11 changes: 11 additions & 0 deletions cpp/mrc/include/mrc/manifold/composite_manifold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ class CompositeManifold : public Manifold
mrc::make_edge(*m_ingress, *m_egress);
}

~CompositeManifold() override
{
shutdown();
};

void shutdown() final
{
m_ingress->shutdown();
m_egress->shutdown();
}

protected:
IngressT& ingress()
{
Expand Down
8 changes: 8 additions & 0 deletions cpp/mrc/include/mrc/manifold/egress.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct EgressDelegate
{
virtual ~EgressDelegate() = default;
virtual void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) = 0;
virtual void shutdown(){};
};

template <typename T>
Expand All @@ -55,6 +56,13 @@ class TypedEgress : public EgressDelegate
template <typename T>
class RoundRobinEgress : public node::Router<SegmentAddress, T>, public TypedEgress<T>
{
public:
void shutdown() final
{
DVLOG(10) << "Releasing edges from manifold egress";
node::Router<SegmentAddress, T>::release_edge_connections();
}

protected:
SegmentAddress determine_key_for_value(const T& t) override
{
Expand Down
10 changes: 10 additions & 0 deletions cpp/mrc/include/mrc/manifold/ingress.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "mrc/node/sink_properties.hpp"
#include "mrc/node/source_properties.hpp"

#include <glog/logging.h>

#include <memory>

namespace mrc::manifold {
Expand All @@ -31,6 +33,7 @@ struct IngressDelegate
{
virtual ~IngressDelegate() = default;
virtual void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) = 0;
virtual void shutdown(){};
};

template <typename T>
Expand All @@ -51,6 +54,13 @@ class TypedIngress : public IngressDelegate
template <typename T>
class MuxedIngress : public node::Muxer<T>, public TypedIngress<T>
{
public:
void shutdown() final
{
DVLOG(10) << "Releasing edges from manifold ingress";
node::SourceProperties<T>::release_edge_connection();
}

protected:
void do_add_input(const SegmentAddress& address, edge::IWritableAcceptor<T>* source) final
{
Expand Down
6 changes: 4 additions & 2 deletions cpp/mrc/include/mrc/manifold/interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ struct Interface
virtual ~Interface() = default;

virtual const PortName& port_name() const = 0;
virtual const std::string& info() const = 0;

virtual void start() = 0;
virtual void join() = 0;
virtual void start() = 0;
virtual void join() = 0;
virtual void shutdown() = 0;

virtual void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) = 0;
virtual void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) = 0;
Expand Down
4 changes: 2 additions & 2 deletions cpp/mrc/include/mrc/manifold/manifold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ class Manifold : public Interface
~Manifold() override;

const PortName& port_name() const final;
const std::string& info() const final;
void shutdown() override;

protected:
runnable::IRunnableResources& resources();

const std::string& info() const;

private:
void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) final;
void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) final;
Expand Down
2 changes: 1 addition & 1 deletion cpp/mrc/include/mrc/node/generic_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ template <typename T>
class GenericSinkComponent : public RxSinkComponent<T>
{
public:
GenericSinkComponent()
GenericSinkComponent(std::string name = std::string()) : RxSinkComponent<T>(std::move(name))
{
RxSinkComponent<T>::set_observer(rxcpp::make_observer_dynamic<T>(
[this](T data) {
Expand Down
8 changes: 7 additions & 1 deletion cpp/mrc/include/mrc/node/generic_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ template <typename T>
class GenericSourceComponent : public ForwardingEgressProvider<T>
{
public:
GenericSourceComponent() = default;
GenericSourceComponent(std::string name = std::string()) : m_name(std::move(name)) {}
~GenericSourceComponent() override = default;

private:
Expand All @@ -81,6 +81,8 @@ class GenericSourceComponent : public ForwardingEgressProvider<T>
}

virtual mrc::channel::Status get_data(T& data) = 0;

std::string m_name;
};

template <typename T>
Expand All @@ -90,6 +92,10 @@ class LambdaSourceComponent : public GenericSourceComponent<T>
using get_data_fn_t = std::function<mrc::channel::Status(T&)>;

LambdaSourceComponent(get_data_fn_t get_data_fn) : m_get_data_fn(std::move(get_data_fn)) {}
LambdaSourceComponent(std::string name, get_data_fn_t get_data_fn) :
GenericSourceComponent<T>(std::move(name)),
m_get_data_fn(std::move(get_data_fn))
{}
~LambdaSourceComponent() override = default;

private:
Expand Down
28 changes: 21 additions & 7 deletions cpp/mrc/include/mrc/node/operators/broadcast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ namespace mrc::node {
class BroadcastTypeless : public edge::IWritableProviderBase, public edge::IWritableAcceptorBase
{
public:
BroadcastTypeless(std::string name = std::string()) : m_name(std::move(name)) {}

std::shared_ptr<edge::WritableEdgeHandle> get_writable_edge_handle() const override
{
auto* self = const_cast<BroadcastTypeless*>(this);
Expand Down Expand Up @@ -141,6 +143,7 @@ class BroadcastTypeless : public edge::IWritableProviderBase, public edge::IWrit
}

private:
std::string m_name;
std::mutex m_mutex;
std::vector<std::weak_ptr<edge::DeferredWritableMultiEdgeBase>> m_upstream_handles;
std::vector<std::shared_ptr<edge::WritableEdgeHandle>> m_downstream_handles;
Expand Down Expand Up @@ -199,18 +202,18 @@ class Broadcast : public WritableProvider<T>, public edge::IWritableAcceptor<T>

Broadcast(bool deep_copy = false)
{
auto edge = std::make_shared<BroadcastEdge>(*this, deep_copy);

// Save to avoid casting
m_edge = edge;
init_edge(deep_copy);
}

WritableProvider<T>::init_owned_edge(edge);
Broadcast(std::string name, bool deep_copy = false) : m_name(std::move(name))
{
init_edge(deep_copy);
}

~Broadcast()
{
// Debug print
VLOG(10) << "Destroying TestBroadcast";
VLOG(10) << "Destroying Broadcast " << m_name;
}

void set_writable_edge_handle(std::shared_ptr<edge::WritableEdgeHandle> ingress) override
Expand All @@ -227,11 +230,22 @@ class Broadcast : public WritableProvider<T>, public edge::IWritableAcceptor<T>

void on_complete()
{
VLOG(10) << "TestBroadcast completed";
VLOG(10) << "Broadcast completed " << m_name;
}

private:
void init_edge(bool deep_copy)
{
auto edge = std::make_shared<BroadcastEdge>(*this, deep_copy);

// Save to avoid casting
m_edge = edge;

WritableProvider<T>::init_owned_edge(edge);
}

std::weak_ptr<BroadcastEdge> m_edge;
std::string m_name;
};

} // namespace mrc::node
15 changes: 13 additions & 2 deletions cpp/mrc/include/mrc/node/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,28 @@
#include "mrc/node/sink_properties.hpp"
#include "mrc/node/source_properties.hpp"

#include <string>

namespace mrc::node {

using namespace std::string_literals;

template <typename T>
class Queue : public WritableProvider<T>, public ReadableProvider<T>
{
public:
Queue()
Queue(const std::string& name = std::string()) :
SinkProperties<T>(name + "-sink"s),
SourceProperties<T>(name + "-source"s)
{
this->set_channel(std::make_unique<mrc::channel::BufferedChannel<T>>());
}
~Queue() override = default;

~Queue() override
{
SinkProperties<T>::release_edge_connection();
SourceProperties<T>::release_edge_connection();
};

void set_channel(std::unique_ptr<mrc::channel::Channel<T>> channel)
{
Expand Down
Loading