Skip to content

Commit

Permalink
[JSONRPC] Add IConnectionServer to IShell (rdkcentral#1561)
Browse files Browse the repository at this point in the history
* [JSONRPC] Add IJSONRPCLink to IShell

* [JSONRPC] Unify context and index to one template set

* Omit proxystubs for IJSONRPCLink

* Unregister notification if dropped channel removed all observers

* Remove assert

* Remove COMLink and JSONRPCLink accessors

* ChannelClosed is private

* Restore assert on Register

* Report initial set on Register

* Don't limit on JSONRPC anymore

---------

Co-authored-by: Pierre Wielders <[email protected]>
  • Loading branch information
sebaszm and pwielders authored Apr 17, 2024
1 parent c9e5173 commit 9a4f15f
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 165 deletions.
11 changes: 9 additions & 2 deletions Source/WPEFramework/PluginServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ namespace PluginHost {
_processAdministrator.Destroy();
}

/* virtual */ void* Server::Service::QueryInterface(const uint32_t id)
void* Server::Service::QueryInterface(const uint32_t id) /* override */
{
void* result = nullptr;
if (id == Core::IUnknown::ID) {
Expand All @@ -290,8 +290,15 @@ namespace PluginHost {
AddRef();
result = static_cast<PluginHost::IShell*>(this);
}
else if (id == PluginHost::IShell::ICOMLink::ID) {
AddRef();
result = static_cast<PluginHost::IShell::ICOMLink*>(this);
}
else if (id == PluginHost::IShell::IConnectionServer::ID) {
AddRef();
result = static_cast<PluginHost::IShell::IConnectionServer*>(this);
}
else {

_pluginHandling.Lock();

if (_handler != nullptr) {
Expand Down
145 changes: 96 additions & 49 deletions Source/WPEFramework/PluginServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ namespace PluginHost {
std::string _text;
};

class Service : public IShell::ICOMLink, public PluginHost::Service {
class Service : public IShell::ICOMLink, public IShell::IConnectionServer, public PluginHost::Service {
public:
enum mode {
CONFIGURED,
Expand Down Expand Up @@ -1216,10 +1216,6 @@ namespace PluginHost {
result.ToString(info);
return (Core::ERROR_NONE);
}
// Use the base framework (webbridge) to start/stop processes and the service in side of the given binary.
IShell::ICOMLink* COMLink() override {
return (this);
}
void* Instantiate(const RPC::Object& object, const uint32_t waitTime, uint32_t& sessionId) override
{
ASSERT(_connection == nullptr);
Expand All @@ -1238,6 +1234,14 @@ namespace PluginHost {
{
_administrator.Unregister(sink);
}
void Register(IShell::IConnectionServer::INotification* sink) override
{
_administrator.Register(sink);
}
void Unregister(const IShell::IConnectionServer::INotification* sink) override
{
_administrator.Unregister(sink);
}
void Register(IShell::ICOMLink::INotification* sink)
{
_administrator.Register(sink);
Expand All @@ -1251,10 +1255,6 @@ namespace PluginHost {
return (_administrator.RemoteConnection(connectionId));
}

void Closed(const uint32_t /*id */)
{
}

// Methods to Activate and Deactivate the aggregated Plugin to this shell.
// These are Blocking calls!!!!!
Core::hresult Activate(const reason) override;
Expand Down Expand Up @@ -1778,6 +1778,7 @@ namespace PluginHost {
using Notifiers = std::vector<PluginHost::IPlugin::INotification*>;
using RemoteInstantiators = std::unordered_map<string, IRemoteInstantiation*>;
using ShellNotifiers = std::vector< Exchange::Controller::IShells::INotification*>;
using ChannelObservers = std::vector<IShell::IConnectionServer::INotification*>;

class Iterator {
public:
Expand Down Expand Up @@ -2567,6 +2568,8 @@ namespace PluginHost {
, _subSystems(this)
, _authenticationHandler(nullptr)
, _configObserver(*this, server._config.PluginConfigPath())
, _shellObservers()
, _channelObservers()
{
if (server._config.PluginConfigPath().empty() == true) {
SYSLOG(Logging::Startup, (_T("Dynamic configs disabled.")));
Expand Down Expand Up @@ -2818,6 +2821,40 @@ namespace PluginHost {
{
_processAdministrator.Unregister(sink);
}
void Register(IShell::IConnectionServer::INotification* sink)
{
_notificationLock.Lock();

ASSERT(std::find(_channelObservers.begin(), _channelObservers.end(), sink) == _channelObservers.end());

_channelObservers.push_back(sink);

ASSERT(sink != nullptr);
sink->AddRef();

_server.Visit([sink](const Channel& channel) {
if (channel.IsOpen() == true) {
sink->Opened(channel.Id());
}
});

_notificationLock.Unlock();
}
void Unregister(const IShell::IConnectionServer::INotification* sink)
{
_notificationLock.Lock();

ChannelObservers::iterator index(std::find(_channelObservers.begin(), _channelObservers.end(), sink));

ASSERT(index != _channelObservers.end());

if (index != _channelObservers.end()) {
(*index)->Release();
_channelObservers.erase(index);
}

_notificationLock.Unlock();
}
void Register(Exchange::Controller::IShells::INotification* sink) {
_notificationLock.Lock();

Expand Down Expand Up @@ -2855,24 +2892,10 @@ namespace PluginHost {

_notificationLock.Unlock();
}

RPC::IRemoteConnection* RemoteConnection(const uint32_t connectionId)
{
return (connectionId != 0 ? _processAdministrator.Connection(connectionId) : nullptr);
}
void Closed(const uint32_t id) {
_adminLock.Lock();

// First stop all services running ...
Plugins::iterator index(_services.begin());

while (index != _services.end()) {
index->second->Closed(id);
++index;
}

_adminLock.Unlock();
}
inline Core::ProxyType<Service> Insert(const Plugin::Config& configuration, const Service::mode mode)
{
// Whatever plugin is needse, we at least have our Metadata plugin available (as the first entry :-).
Expand All @@ -2889,7 +2912,6 @@ namespace PluginHost {

return (newService);
}

inline uint32_t Clone(const Core::ProxyType<IShell>& originalShell, const string& newCallsign, Core::ProxyType<IShell>& newService)
{
uint32_t result = Core::ERROR_GENERAL;
Expand Down Expand Up @@ -2924,7 +2946,6 @@ namespace PluginHost {

return (result);
}

inline void Destroy(const string& callSign)
{
_adminLock.Lock();
Expand Down Expand Up @@ -3114,6 +3135,27 @@ namespace PluginHost {
void Close();
void Destroy();

void Opened(const uint32_t id)
{
_notificationLock.Lock();

for (auto& sink : _channelObservers) {
sink->Opened(id);
}

_notificationLock.Unlock();
}
void Closed(const uint32_t id)
{
_notificationLock.Lock();

for (auto& sink : _channelObservers) {
sink->Closed(id);
}

_notificationLock.Unlock();
}

private:
void Dangling(const Core::IUnknown* source, const uint32_t interfaceId) {
if (interfaceId == RPC::IRemoteConnection::INotification::ID)
Expand Down Expand Up @@ -3249,6 +3291,7 @@ namespace PluginHost {
IAuthenticate* _authenticationHandler;
ConfigObserver _configObserver;
ShellNotifiers _shellObservers;
ChannelObservers _channelObservers;
};

// Connection handler is the listening socket and keeps track of all open
Expand Down Expand Up @@ -4098,6 +4141,8 @@ namespace PluginHost {

State(CLOSED, false);

_parent.Operational(Id(), false);

} else if (IsUpgrading() == true) {

ASSERT(_service.IsValid() == false);
Expand Down Expand Up @@ -4135,6 +4180,9 @@ namespace PluginHost {
}
}
}
else if ((IsOpen() == true) && (IsWebSocket() == false)) {
_parent.Operational(Id(), true);
}
}

friend class Core::SocketServerType<Channel>;
Expand Down Expand Up @@ -4225,19 +4273,9 @@ namespace PluginHost {
_job.Revoke();

// Start by closing the server thread..
BaseClass::Close(waitTime);

// Kill all open connections, we are shutting down !!!
BaseClass::Iterator index(BaseClass::Clients());

while (index.Next() == true) {
// Oops nothing hapened for a long time, kill the connection
// give it 100ms to actually close, if not do it forcefully !!
index.Client()->Close(waitTime);
}

// Cleanup the closed sockets we created..
ValidateConnections();
BaseClass::Close(waitTime);
BaseClass::Cleanup();

return (Core::ERROR_NONE);
}
Expand Down Expand Up @@ -4285,7 +4323,7 @@ namespace PluginHost {
// Next Clean all Id's from JSONRPC nolonger available
//
// First check and clear, closed connections
ValidateConnections();
BaseClass::Cleanup();

if (_connectionCheckTimer != 0) {
// Now suspend those that have no activity.
Expand All @@ -4310,17 +4348,6 @@ namespace PluginHost {
_job.Reschedule(NextTick);
}
}
void ValidateConnections() {
BaseClass::Iterator index(BaseClass::Clients());

while (index.Next() == true) {
if (index.Client()->IsOpen() == false) {
TRACE(Activity, (_T("Client closed, that is reported closed"), index.Client()->Id()));
_parent.Services().Closed(index.Client()->Id());
}
}
BaseClass::Cleanup();
}

private:
Server& _parent;
Expand Down Expand Up @@ -4445,6 +4472,26 @@ namespace PluginHost {
return (infoBlob.Load());
}

void Visit(const std::function<void(const Channel&)>& handler)
{
ChannelMap::Iterator it = _connections.Clients();

while (it.Next() == true) {
handler(*it.Client());
}
}

private:
void Operational(const uint32_t id, const bool upAndRunning)
{
if (upAndRunning == true) {
Services().Opened(id);
}
else {
Services().Closed(id);
}
}

private:
Core::ProxyType<Service> Controller()
{
Expand Down
32 changes: 17 additions & 15 deletions Source/com/Ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,24 @@ namespace RPC {
ID_DISPATCHER_CALLBACK = (ID_OFFSET_INTERNAL + 0x002E),

ID_SHELL = (ID_OFFSET_INTERNAL + 0x0030),
ID_STATECONTROL = (ID_OFFSET_INTERNAL + 0x0031),
ID_STATECONTROL_NOTIFICATION = (ID_OFFSET_INTERNAL + 0x0032),
ID_SUBSYSTEM = (ID_OFFSET_INTERNAL + 0x0033),
ID_SUBSYSTEM_NOTIFICATION = (ID_OFFSET_INTERNAL + 0x0034),
ID_SUBSYSTEM_INTERNET = (ID_OFFSET_INTERNAL + 0x0035),
ID_SUBSYSTEM_LOCATION = (ID_OFFSET_INTERNAL + 0x0036),
ID_SUBSYSTEM_IDENTIFIER = (ID_OFFSET_INTERNAL + 0x0037),
ID_SUBSYSTEM_TIME = (ID_OFFSET_INTERNAL + 0x0038),
ID_SUBSYSTEM_SECURITY = (ID_OFFSET_INTERNAL + 0x0039),
ID_SUBSYSTEM_PROVISIONING = (ID_OFFSET_INTERNAL + 0x003A),
ID_SUBSYSTEM_DECRYPTION = (ID_OFFSET_INTERNAL + 0x003B),
ID_REMOTE_INSTANTIATION = (ID_OFFSET_INTERNAL + 0x003C),
ID_COMREQUEST_NOTIFICATION = (ID_OFFSET_INTERNAL + 0x003D),
ID_SYSTEM_METADATA = (ID_OFFSET_INTERNAL + 0x003E),
ID_SHELL_COMLINK = (ID_OFFSET_INTERNAL + 0x0031),
ID_SHELL_CONNECTIONSERVER = (ID_OFFSET_INTERNAL + 0x0032),
ID_SHELL_CONNECTIONSERVER_NOTIFICATION = (ID_OFFSET_INTERNAL + 0x0033),
ID_STATECONTROL = (ID_OFFSET_INTERNAL + 0x0034),
ID_STATECONTROL_NOTIFICATION = (ID_OFFSET_INTERNAL + 0x0035),
ID_SUBSYSTEM = (ID_OFFSET_INTERNAL + 0x0036),
ID_SUBSYSTEM_NOTIFICATION = (ID_OFFSET_INTERNAL + 0x0037),
ID_SUBSYSTEM_INTERNET = (ID_OFFSET_INTERNAL + 0x0038),
ID_SUBSYSTEM_LOCATION = (ID_OFFSET_INTERNAL + 0x0039),
ID_SUBSYSTEM_IDENTIFIER = (ID_OFFSET_INTERNAL + 0x003A),
ID_SUBSYSTEM_TIME = (ID_OFFSET_INTERNAL + 0x003B),
ID_SUBSYSTEM_SECURITY = (ID_OFFSET_INTERNAL + 0x003C),
ID_SUBSYSTEM_PROVISIONING = (ID_OFFSET_INTERNAL + 0x003D),
ID_SUBSYSTEM_DECRYPTION = (ID_OFFSET_INTERNAL + 0x003E),
ID_REMOTE_INSTANTIATION = (ID_OFFSET_INTERNAL + 0x003F),
ID_SYSTEM_METADATA = (ID_OFFSET_INTERNAL + 0x0040),

ID_EXTERNAL_INTERFACE_OFFSET = (ID_OFFSET_INTERNAL + 0x0040),
ID_EXTERNAL_INTERFACE_OFFSET = (ID_OFFSET_INTERNAL + 0x0080),
ID_EXTERNAL_QA_INTERFACE_OFFSET = (ID_OFFSET_INTERNAL + 0xA000)
};
}
Expand Down
Loading

0 comments on commit 9a4f15f

Please sign in to comment.