Skip to content

Commit

Permalink
Make sure to only open a channel if the network subsystem is already …
Browse files Browse the repository at this point in the history
…active
  • Loading branch information
VeithMetro committed Oct 8, 2024
1 parent 7ff5b42 commit 52ec5d8
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 8 deletions.
2 changes: 1 addition & 1 deletion MessageControl/MessageControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ namespace Thunder {
Announce(new Publishers::FileOutput(abbreviate, _config.FileName.Value()));
}
if ((_config.Remote.Binding.Value().empty() == false) && (_config.Remote.Port.Value() != 0)) {
Announce(new Publishers::UDPOutput(abbreviate, Core::NodeId(_config.Remote.NodeId())));
Announce(new Publishers::UDPOutput(abbreviate, Core::NodeId(_config.Remote.NodeId()), _service));
}

_webSocketExporter.Initialize(service, _config.MaxExportConnections.Value());
Expand Down
46 changes: 41 additions & 5 deletions MessageControl/MessageOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ namespace Publishers {
{
::memset(_sendBuffer, 0, sizeof(_sendBuffer));
}
UDPOutput::Channel::~Channel() {
Close(Core::infinite);
UDPOutput::Channel::~Channel()
{
if (IsOpen() == true) {
Close(Core::infinite);
}
}

uint16_t UDPOutput::Channel::SendData(uint8_t* dataFrame, const uint16_t maxSendSize)
Expand Down Expand Up @@ -160,16 +163,49 @@ namespace Publishers {
Trigger();
}

UDPOutput::UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId)
UDPOutput::UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId, PluginHost::IShell* service)
: _convertor(abbreviate)
, _output(nodeId)
, _notification(*this)
, _subSystem(service->SubSystems())
{
ASSERT(_subSystem != nullptr);

if (_subSystem != nullptr) {
_subSystem->AddRef();
_subSystem->Register(&_notification);

if (_subSystem->IsActive(PluginHost::ISubSystem::NETWORK)) {
OpenUDPOutputChannel();
}
}
}

PluginHost::ISubSystem* UDPOutput::SubSystem() const
{
_output.Open(0);
return (_subSystem);
}

void UDPOutput::OpenUDPOutputChannel()
{
if (_output.IsOpen() == false) {
_output.Open(0);
}
ASSERT(_output.IsOpen() == true);
}

void UDPOutput::CloseUDPOutputChannel()
{
if (_output.IsOpen() == true) {
_output.Close(Core::infinite);
}
}

void UDPOutput::Message(const Core::Messaging::MessageInfo& metadata, const string& text) /* override */
{
_output.Output(_convertor.Convert(metadata, text));
if (_output.IsOpen() == true) {
_output.Output(_convertor.Convert(metadata, text));
}
}

} // namespace Publishers
Expand Down
55 changes: 53 additions & 2 deletions MessageControl/MessageOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,19 +333,70 @@ namespace Publishers {
Core::CriticalSection _adminLock;
};

class Notification : public PluginHost::ISubSystem::INotification, public RPC::IRemoteConnection::INotification {
public:
Notification() = delete;
Notification(const Notification&) = delete;
Notification& operator=(const Notification&) = delete;

explicit Notification(UDPOutput& parent)
: _parent(parent) {
}
~Notification() = default;

public:
void Updated() override
{
if (_parent.SubSystem()->IsActive(PluginHost::ISubSystem::NETWORK)) {
_parent.OpenUDPOutputChannel();
}
else {
_parent.CloseUDPOutputChannel();
}
}
void Activated(RPC::IRemoteConnection* /* connection */) override {
}
void Deactivated(RPC::IRemoteConnection* /* connection */) override {
}
void Terminated(RPC::IRemoteConnection* /* connection */) override {
}

BEGIN_INTERFACE_MAP(Notification)
INTERFACE_ENTRY(PluginHost::ISubSystem::INotification)
INTERFACE_ENTRY(RPC::IRemoteConnection::INotification)
END_INTERFACE_MAP

private:
UDPOutput& _parent;
};

public:
UDPOutput() = delete;
UDPOutput(const UDPOutput&) = delete;
UDPOutput& operator=(const UDPOutput&) = delete;

explicit UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId);
~UDPOutput() = default;
explicit UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId, PluginHost::IShell* service);

~UDPOutput() override
{
if (_subSystem != nullptr) {
_subSystem->Unregister(&_notification);
_subSystem->Release();
_subSystem = nullptr;
}
}

PluginHost::ISubSystem* SubSystem() const;
void OpenUDPOutputChannel();
void CloseUDPOutputChannel();

void Message(const Core::Messaging::MessageInfo& metadata, const string& text);

private:
Text _convertor;
Channel _output;
Core::SinkType<Notification> _notification;
PluginHost::ISubSystem* _subSystem;
};

class WebSocketOutput : public IPublish {
Expand Down

0 comments on commit 52ec5d8

Please sign in to comment.