Skip to content

Commit

Permalink
[OUT-OF-PROCESS] Move the Plugin, as a whole out-of-process. (#1647)
Browse files Browse the repository at this point in the history
* [OUT-OF-PROCESS] Move the Plugin, as a whole out-of-process.

On request of Sky Labs Aalborg, we are looking at possibilities to run any plugin, also plugins not coded
for out of process, out-of-process to make sure that memory corruption created by such is plugin is only
in its own process space and nolonger in the Thunder (WPEFramework) process space.

Focus for this feature are mainly plugins created in the RDKService that are not yet prepared to run out of
process but are only exposing a JSONRPC to the outside world.

The idea is to pass a root config object on plugin level, not on configuration level. If such a root config
object is set on plugin level (keyword also "root"), the Framework will start/load the plugin out of process
in the ThunderPlugin application process space. The request made (through the regular code) is to return a
IPlugin (one of the perls of working interface based all along ;-) ) Now if we retrieve the IPlugin from the
module loaded in the ThunderPlugin process space, Thunder framework will continue with this IPlugin as if it
where created locally and thus the IDispatcher (For JSONRPC dispatching) are extracted from the IPlugin for
handling the JSONRPC over COMRPC (feature available as of R4.4).

The Initialize is now also being forwarded through the IPlugin to the instance that is running in the
ThunderPlugin process (out-of-process hosting). This means that Initialization of the plugin is only done in
the ThunderPlugin process and nolonger in the Thunder (WPEFramework) process so this might lead to unexpected
situations. To be found during testing :-)

Another point of attention/testing is the feedback of JSONRPC events. More commits might follow to get that
properly working.

* [OUT-OF-PROCESS] Tested it succefully, more testing needed..

Issues:
IDispatcher needs to be intialized and deinitialized, added the ILocalDispatcher methods to the
IDispatcher, so the JSONRPC class can also be intialized (Attach) and deinitialized (detach) in
the other process space.

Crash due to a full Deactivate in case an Activate fails and the new behaviour (Deinitialize called
after any faiing Initialize) was assuming JSONRPC stuff was set, which was not :-) Now aonly call
Deinitialize in stead of a full Deactivate.

Decoupled notification of the channels that are opened and closed. As the closing of the channel
was detected by the ResourceMonitor thread, this thread is used to also iterate over the subscribers
for notifiation of channel opening and closing. As JSONRPC classes are one of the interested parties
to this event, the event now had to travel across a COMRPC boundary, in case the JSONRPC is running
out-of-process. A COMRPC call is a no go on the ResourceMonitor thread since that will deadlock ;-)

The registration from the IConnectionServer::INotification for the out-of-process plugin was triggered
by the Attach of the IDispatcher, however since this registration actively pushes all current open
channels to the subscriber we ended up in a deadlock:
COMRPC classical deadlock:
Thunder -> (IDispatcher::Attach)   ThunderPlugin --+
                                                   |
 +-------   IShell::Register(sink) <---------------+
 |
 +------> INotification::Open(id) [Oops, in and outbound on COMRPC are already in progress!!!!]

Solved by pushing the sink back on the Attach (and later Detach for symetry) and have Thunder Register
this sink on behalf of the ouot of process plugin !

Tested sofar calling JSONRPC method on the out-of-process plugin (JSONRPCPlugin in examples of ThunderNanoServices,
which worked flawless and did receive already notification that are send by default to the system (inbound
events)

More testing needs to be done!

* Update CMakeLists.txt

Seems we need an additional search path for this to work.

* Update after review

Addressing review remarks of @MFransen69

* Update PluginServer.h

Oops collateral damage of the search :-)

* Update PluginServer.h

* Update CMakeLists.txt

Oops

* Update PluginServer.h

* Update IDispatcher.h

---------

Co-authored-by: MFransen69 <[email protected]>
  • Loading branch information
pwielders and MFransen69 authored Jun 27, 2024
1 parent cb226b3 commit a53cd99
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 30 deletions.
9 changes: 9 additions & 0 deletions Source/Thunder/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ namespace PluginHost {
, IdleTime(0)
, SoftKillCheckWaitTime(10)
, HardKillCheckWaitTime(4)
, OutOfProcessWaitTime(3)
, IPV6(false)
, LegacyInitialize(false)
, DefaultMessagingCategories(false)
Expand Down Expand Up @@ -510,6 +511,7 @@ namespace PluginHost {
Add(_T("idletime"), &IdleTime);
Add(_T("softkillcheckwaittime"), &SoftKillCheckWaitTime);
Add(_T("hardkillcheckwaittime"), &HardKillCheckWaitTime);
Add(_T("outofprocesswaittime"), &OutOfProcessWaitTime);
Add(_T("ipv6"), &IPV6);
Add(_T("legacyinitialize"), &LegacyInitialize);
Add(_T("messaging"), &DefaultMessagingCategories);
Expand Down Expand Up @@ -555,6 +557,7 @@ namespace PluginHost {
Core::JSON::DecUInt16 IdleTime;
Core::JSON::DecUInt8 SoftKillCheckWaitTime;
Core::JSON::DecUInt8 HardKillCheckWaitTime;
Core::JSON::DecUInt8 OutOfProcessWaitTime;
Core::JSON::Boolean IPV6;
Core::JSON::Boolean LegacyInitialize;
Core::JSON::String DefaultMessagingCategories;
Expand Down Expand Up @@ -725,6 +728,7 @@ namespace PluginHost {
, _idleTime(180)
, _softKillCheckWaitTime(3)
, _hardKillCheckWaitTime(10)
, _outOfProcessWaitTime(3000)
, _stackSize(0)
, _inputInfo()
, _processInfo()
Expand Down Expand Up @@ -773,6 +777,7 @@ namespace PluginHost {
_idleTime = config.IdleTime.Value();
_softKillCheckWaitTime = config.SoftKillCheckWaitTime.Value();
_hardKillCheckWaitTime = config.HardKillCheckWaitTime.Value();
_outOfProcessWaitTime = config.OutOfProcessWaitTime.Value() * 1000; // Move to milliseconds
_IPV6 = config.IPV6.Value();
_legacyInitialize = config.LegacyInitialize.Value();
_binding = config.Binding.Value();
Expand Down Expand Up @@ -975,6 +980,9 @@ POP_WARNING()
inline uint8_t HardKillCheckWaitTime() const {
return _hardKillCheckWaitTime;
}
inline uint16_t OutOfProcessWaitTime() const {
return _outOfProcessWaitTime;
}
inline const string& URL() const {
return (_URL);
}
Expand Down Expand Up @@ -1156,6 +1164,7 @@ POP_WARNING()
uint16_t _idleTime;
uint8_t _softKillCheckWaitTime;
uint8_t _hardKillCheckWaitTime;
uint16_t _outOfProcessWaitTime;
uint32_t _stackSize;
InputInfo _inputInfo;
ProcessInfo _processInfo;
Expand Down
5 changes: 4 additions & 1 deletion Source/Thunder/ExampleConfigWindows.json
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,10 @@
"callsign": "JSONRPCPlugin",
"locator": "libJSONRPCPlugin.so",
"classname": "JSONRPCPlugin",
"startmode": "Deactivated"
"startmode": "Deactivated",
"root": {
"mode": "Local"
}
},
{
"callsign": "OpenCDMi",
Expand Down
105 changes: 83 additions & 22 deletions Source/Thunder/PluginServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ namespace PluginHost {
std::vector<PluginHost::ISubSystem::subsystem> _control;
string _versionHash;
};

static Core::NodeId PluginNodeId(const PluginHost::Config& config, const Plugin::Config& plugin) {
Core::NodeId result;
if (plugin.Communicator.IsSet() == true) {
Expand Down Expand Up @@ -1207,7 +1208,9 @@ namespace PluginHost {

void* result(_administrator.Instantiate(object, waitTime, sessionId, DataPath(), PersistentPath(), VolatilePath()));

_connection = _administrator.RemoteConnection(sessionId);
if (result != nullptr) {
_connection = _administrator.RemoteConnection(sessionId);
}

return (result);
}
Expand Down Expand Up @@ -1378,16 +1381,17 @@ namespace PluginHost {
if (progressedState == 0) {
ErrorMessage(_T("library does not exist"));
}
else if (progressedState == 2) {
else if (progressedState == 1) {
ErrorMessage(_T("library could not be loaded"));
}
else if (progressedState == 3) {
else if (progressedState == 2) {
ErrorMessage(_T("library does not contain the right methods"));
}
}

return (result);
}

void AcquireInterfaces()
{
ASSERT((State() == DEACTIVATED) || (State() == PRECONDITION));
Expand All @@ -1406,13 +1410,35 @@ namespace PluginHost {
}
} else {
_library = LoadLibrary(locator);
if (_library.IsLoaded() == false) {
ErrorMessage(_T("Library could not be loaded"));
}
else {
if ((newIF = Core::ServiceAdministrator::Instance().Instantiate<IPlugin>(_library, className, version)) == nullptr) {
ErrorMessage(_T("class definitions/version does not exist"));
_library = Core::Library();
if (_library.IsLoaded() == true) {
if ((PluginHost::Service::Configuration().Root.IsSet() == false) || (PluginHost::Service::Configuration().Root.Mode.Value() == Plugin::Config::RootConfig::ModeType::OFF)) {
if ((newIF = Core::ServiceAdministrator::Instance().Instantiate<IPlugin>(_library, className, version)) == nullptr) {
ErrorMessage(_T("class definitions/version does not exist"));
Core::ServiceAdministrator::Instance().ReleaseLibrary(std::move(_library));
}
}
else {
uint32_t pid;
Core::ServiceAdministrator::Instance().ReleaseLibrary(std::move(_library));

RPC::Object definition(locator,
classNameString,
Callsign(),
IPlugin::ID,
version,
PluginHost::Service::Configuration().Root.User.Value(),
PluginHost::Service::Configuration().Root.Group.Value(),
PluginHost::Service::Configuration().Root.Threads.Value(),
PluginHost::Service::Configuration().Root.Priority.Value(),
PluginHost::Service::Configuration().Root.HostType(),
SystemRootPath(),
PluginHost::Service::Configuration().Root.RemoteAddress.Value(),
PluginHost::Service::Configuration().Root.Configuration.Value());

newIF = reinterpret_cast<IPlugin*>(Instantiate(definition, _administrator.Configuration().OutOfProcessWaitTime(), pid));
if (newIF == nullptr) {
ErrorMessage(_T("could not start the plugin in a detached mode"));
}
}
}
}
Expand Down Expand Up @@ -1859,7 +1885,7 @@ namespace PluginHost {
};

private:
class CommunicatorServer : public RPC::Communicator {
class CommunicatorServer : public RPC::Communicator {
private:
using Observers = std::vector<IShell::ICOMLink::INotification*>;
using Proxy = std::pair<uint32_t, const Core::IUnknown*>;
Expand Down Expand Up @@ -2518,6 +2544,8 @@ namespace PluginHost {
string _observerPath;
};

using Channels = std::vector<uint32_t>;

public:
ServiceMap() = delete;
ServiceMap(ServiceMap&&) = delete;
Expand Down Expand Up @@ -2553,6 +2581,9 @@ namespace PluginHost {
, _configObserver(*this, server._config.PluginConfigPath())
, _shellObservers()
, _channelObservers()
, _opened()
, _closed()
, _job(*this)
{
if (server._config.PluginConfigPath().empty() == true) {
SYSLOG(Logging::Startup, (_T("Dynamic configs disabled.")));
Expand Down Expand Up @@ -3109,23 +3140,25 @@ namespace PluginHost {

void Opened(const uint32_t id)
{
_notificationLock.Lock();

for (auto& sink : _channelObservers) {
sink->Opened(id);
}
_adminLock.Lock();
_opened.push_back(id);
_adminLock.Unlock();

_notificationLock.Unlock();
_job.Submit();
}
void Closed(const uint32_t id)
{
_notificationLock.Lock();

for (auto& sink : _channelObservers) {
sink->Closed(id);
_adminLock.Lock();
Channels::iterator index(std::find(_opened.begin(), _opened.end(), id));
if (index != _opened.end()) {
_opened.erase(index);
}
else {
_closed.push_back(id);
}
_adminLock.Unlock();

_notificationLock.Unlock();
_job.Submit();
}

private:
Expand Down Expand Up @@ -3250,6 +3283,31 @@ namespace PluginHost {
return (_server.WorkerPool());
}

friend class Core::ThreadPool::JobType<ServiceMap&>;
void Dispatch()
{
_adminLock.Lock();
Channels opened; opened.swap(_opened);
Channels closed; closed.swap(_closed);
_adminLock.Unlock();

_notificationLock.Lock();

for (uint32_t id : opened) {
for (auto& sink : _channelObservers) {
sink->Opened(id);
}
}

for (uint32_t id : closed) {
for (auto& sink : _channelObservers) {
sink->Closed(id);
}
}

_notificationLock.Unlock();
}

private:
Server& _server;
mutable Core::CriticalSection _adminLock;
Expand All @@ -3264,6 +3322,9 @@ namespace PluginHost {
ConfigObserver _configObserver;
ShellNotifiers _shellObservers;
ChannelObservers _channelObservers;
Channels _opened;
Channels _closed;
Core::ThreadPool::JobType<ServiceMap&> _job;
};

// Connection handler is the listening socket and keeps track of all open
Expand Down
13 changes: 11 additions & 2 deletions Source/plugins/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ namespace Plugin {
, StartupOrder(50)
, StartMode(PluginHost::IShell::startmode::ACTIVATED)
, Communicator()
, Root()
{
Add(_T("callsign"), &Callsign);
Add(_T("locator"), &Locator);
Expand All @@ -268,6 +269,7 @@ namespace Plugin {
Add(_T("startuporder"), &StartupOrder);
Add(_T("startmode"), &StartMode);
Add(_T("communicator"), &Communicator);
Add(_T("root"), &Root);
}
Config(const Config& copy)
: Core::JSON::Container()
Expand All @@ -286,6 +288,7 @@ namespace Plugin {
, StartupOrder(copy.StartupOrder)
, StartMode(copy.StartMode)
, Communicator(copy.Communicator)
, Root(copy.Root)
{
Add(_T("callsign"), &Callsign);
Add(_T("locator"), &Locator);
Expand All @@ -302,8 +305,9 @@ namespace Plugin {
Add(_T("startuporder"), &StartupOrder);
Add(_T("startmode"), &StartMode);
Add(_T("communicator"), &Communicator);
Add(_T("root"), &Root);
}
Config(Config&& move)
Config(Config&& move) noexcept
: Core::JSON::Container()
, Callsign(std::move(move.Callsign))
, Locator(std::move(move.Locator))
Expand All @@ -320,6 +324,7 @@ namespace Plugin {
, StartupOrder(std::move(move.StartupOrder))
, StartMode(std::move(move.StartMode))
, Communicator(std::move(move.Communicator))
, Root(std::move(move.Root))
{
Add(_T("callsign"), &Callsign);
Add(_T("locator"), &Locator);
Expand All @@ -336,6 +341,7 @@ namespace Plugin {
Add(_T("startuporder"), &StartupOrder);
Add(_T("startmode"), &StartMode);
Add(_T("communicator"), &Communicator);
Add(_T("root"), &Root);
}

~Config() override = default;
Expand All @@ -357,11 +363,12 @@ namespace Plugin {
StartupOrder = RHS.StartupOrder;
StartMode = RHS.StartMode;
Communicator = RHS.Communicator;
Root = RHS.Root;

return (*this);
}

Config& operator=(Config&& move)
Config& operator=(Config&& move) noexcept
{
Callsign = std::move(move.Callsign);
Locator = std::move(move.Locator);
Expand All @@ -378,6 +385,7 @@ namespace Plugin {
StartupOrder = std::move(move.StartupOrder);
StartMode = std::move(move.StartMode);
Communicator = std::move(move.Communicator);
Root = std::move(move.Root);

return (*this);
}
Expand Down Expand Up @@ -410,6 +418,7 @@ namespace Plugin {
Core::JSON::DecUInt32 StartupOrder;
Core::JSON::EnumType<PluginHost::IShell::startmode> StartMode;
Core::JSON::String Communicator;
RootConfig Root;

static Core::NodeId IPV4UnicastNode(const string& ifname);

Expand Down
2 changes: 0 additions & 2 deletions Source/plugins/JSONRPC.h
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,6 @@ namespace PluginHost {
return (Core::ERROR_NONE);
}

// Inherited via IDispatcher::ICallback
// ---------------------------------------------------------------------------------
void Dropped(const IDispatcher::ICallback* callback) override {
_adminLock.Lock();

Expand Down
3 changes: 0 additions & 3 deletions Source/plugins/Shell.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ namespace PluginHost
} else {
ICOMLink* handler(QueryInterface<ICOMLink>());

// This method can only be used in the main process. Only this process, can instantiate a new process
ASSERT(handler != nullptr);

if (handler != nullptr) {
string locator(rootConfig.Locator.Value());
if (locator.empty() == true) {
Expand Down

0 comments on commit a53cd99

Please sign in to comment.