Skip to content

Commit

Permalink
Specify connection when creating Allocation/Admin session for Glacier2 (
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Oct 30, 2024
1 parent 2bb052a commit 5407a60
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 59 deletions.
20 changes: 13 additions & 7 deletions cpp/src/IceGrid/AdminSessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,15 @@ AdminSessionFactory::AdminSessionFactory(
}

Glacier2::SessionPrx
AdminSessionFactory::createGlacier2Session(const string& sessionId, const optional<Glacier2::SessionControlPrx>& ctl)
AdminSessionFactory::createGlacier2Session(
const string& sessionId,
const optional<Glacier2::SessionControlPrx>& ctl,
const Ice::ConnectionPtr& con)
{
assert(_servantManager);

auto session = createSessionServant(sessionId);
auto proxy = session->_register(_servantManager, nullptr);
auto proxy = session->_register(_servantManager, con);

if (ctl)
{
Expand All @@ -489,7 +492,7 @@ AdminSessionFactory::createGlacier2Session(const string& sessionId, const option
// We can't use a non-0 timeout such as the Glacier2 session timeout. As of Ice 3.8, heartbeats may not be sent
// at all on a busy connection. Furthermore, as of Ice 3.8, Glacier2 no longer "converts" heartbeats into
// keepAlive requests.
_reaper->add(make_shared<SessionReapable<AdminSessionI>>(_database->getTraceLevels()->logger, session), 0s);
_reaper->add(make_shared<SessionReapable<AdminSessionI>>(_database->getTraceLevels()->logger, session), 0s, con);
return Ice::uncheckedCast<Glacier2::SessionPrx>(proxy);
}

Expand All @@ -508,15 +511,18 @@ AdminSessionFactory::getTraceLevels() const
AdminSessionManagerI::AdminSessionManagerI(const shared_ptr<AdminSessionFactory>& factory) : _factory(factory) {}

optional<Glacier2::SessionPrx>
AdminSessionManagerI::create(string userId, optional<Glacier2::SessionControlPrx> ctl, const Ice::Current&)
AdminSessionManagerI::create(string userId, optional<Glacier2::SessionControlPrx> ctl, const Ice::Current& current)
{
return _factory->createGlacier2Session(std::move(userId), std::move(ctl));
return _factory->createGlacier2Session(std::move(userId), std::move(ctl), current.con);
}

AdminSSLSessionManagerI::AdminSSLSessionManagerI(const shared_ptr<AdminSessionFactory>& factory) : _factory(factory) {}

optional<Glacier2::SessionPrx>
AdminSSLSessionManagerI::create(Glacier2::SSLInfo info, optional<Glacier2::SessionControlPrx> ctl, const Ice::Current&)
AdminSSLSessionManagerI::create(
Glacier2::SSLInfo info,
optional<Glacier2::SessionControlPrx> ctl,
const Ice::Current& current)
{
string userDN;
if (!info.certs.empty()) // TODO: Require userDN?
Expand All @@ -535,5 +541,5 @@ AdminSSLSessionManagerI::create(Glacier2::SSLInfo info, optional<Glacier2::Sessi
}
}

return _factory->createGlacier2Session(std::move(userDN), std::move(ctl));
return _factory->createGlacier2Session(std::move(userDN), std::move(ctl), current.con);
}
7 changes: 5 additions & 2 deletions cpp/src/IceGrid/AdminSessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ namespace IceGrid
const std::shared_ptr<ReapThread>&,
const std::shared_ptr<RegistryI>&);

Glacier2::SessionPrx
createGlacier2Session(const std::string&, const std::optional<Glacier2::SessionControlPrx>&);
Glacier2::SessionPrx createGlacier2Session(
const std::string& sessionId,
const std::optional<Glacier2::SessionControlPrx>& ctl,
const Ice::ConnectionPtr& con);

std::shared_ptr<AdminSessionI> createSessionServant(const std::string&);

const std::shared_ptr<TraceLevels>& getTraceLevels() const;
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/IceGrid/InternalRegistryI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ InternalRegistryI::registerNode(
try
{
auto session = NodeSessionI::create(_database, std::move(*node), info, _nodeSessionTimeout, load);
_reaper->add(make_shared<SessionReapable<NodeSessionI>>(logger, session), _nodeSessionTimeout);

// nullptr for the connection parameter, meaning the reaper won't destroy the session if the connection is
// closed.
_reaper->add(make_shared<SessionReapable<NodeSessionI>>(logger, session), _nodeSessionTimeout, nullptr);
return session->getProxy();
}
catch (const Ice::ObjectAdapterDestroyedException&)
Expand Down Expand Up @@ -93,7 +96,10 @@ InternalRegistryI::registerReplica(
try
{
auto s = ReplicaSessionI::create(_database, _wellKnownObjects, info, std::move(*prx), _replicaSessionTimeout);
_reaper->add(make_shared<SessionReapable<ReplicaSessionI>>(logger, s), _replicaSessionTimeout);

// nullptr for the connection parameter, meaning the reaper won't destroy the session if the connection is
// closed.
_reaper->add(make_shared<SessionReapable<ReplicaSessionI>>(logger, s), _replicaSessionTimeout, nullptr);
return s->getProxy();
}
catch (const Ice::ObjectAdapterDestroyedException&)
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/IceGrid/NodeSessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ NodeSessionI::destroy(const Ice::Current&)
destroyImpl(false);
}

chrono::steady_clock::time_point
NodeSessionI::timestamp() const
optional<chrono::steady_clock::time_point>
NodeSessionI::timestamp() const noexcept
{
lock_guard lock(_mutex);
if (_destroy)
{
throw Ice::ObjectNotExistException{__FILE__, __LINE__};
return nullopt;
}
return _timestamp;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/IceGrid/NodeSessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace IceGrid
const Ice::Current&) const final;
void destroy(const Ice::Current&) final;

std::chrono::steady_clock::time_point timestamp() const;
std::optional<std::chrono::steady_clock::time_point> timestamp() const noexcept;
void shutdown();

const NodePrx& getNode() const;
Expand Down
18 changes: 7 additions & 11 deletions cpp/src/IceGrid/ReapThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,19 @@ ReapThread::run()
auto p = _sessions.begin();
while (p != _sessions.end())
{
try
if (auto timestamp = p->item->timestamp())
{
auto timestamp = p->item->timestamp(); // throws ONE if the reapable is destroyed.

if (p->timeout > 0s && (chrono::steady_clock::now() - timestamp > p->timeout))
if (p->timeout > 0s && (chrono::steady_clock::now() - *timestamp > p->timeout))
{
reap.push_back(*p);
// and go to the code after the catch block
}
else
{
++p;
continue;
continue; // while loop
}
}
catch (const Ice::ObjectNotExistException&)
{
// already destroyed
}
// else session is already destroyed and we clean-up

// Remove the reapable
if (p->connection)
Expand Down Expand Up @@ -141,7 +135,7 @@ ReapThread::add(const shared_ptr<Reapable>& reapable, chrono::seconds timeout, c
}

// NOTE: registering a reapable with a 0s timeout is allowed. The reapable is reaped only when the reaper thread is
// shutdown or the connection is closed.
// shutdown or the connection is closed (when connection is not null).

//
// 10 seconds is the minimum permissable timeout (for non-zero timeouts).
Expand All @@ -155,6 +149,8 @@ ReapThread::add(const shared_ptr<Reapable>& reapable, chrono::seconds timeout, c

if (connection)
{
assert(timeout == 0s);

auto p = _connections.find(connection);
if (p == _connections.end())
{
Expand Down
25 changes: 22 additions & 3 deletions cpp/src/IceGrid/ReapThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ namespace IceGrid
public:
virtual ~Reapable() = default;

virtual std::chrono::steady_clock::time_point timestamp() const = 0;
// Returns the timestamp of the most recent keepAlive call, or nullopt if the session is destroyed.
virtual std::optional<std::chrono::steady_clock::time_point> timestamp() const noexcept = 0;
virtual void destroy(bool) = 0;
};

Expand All @@ -32,7 +33,10 @@ namespace IceGrid
{
}

std::chrono::steady_clock::time_point timestamp() const final { return _session->timestamp(); }
std::optional<std::chrono::steady_clock::time_point> timestamp() const noexcept final
{
return _session->timestamp();
}

void destroy(bool shutdown) final
{
Expand Down Expand Up @@ -62,14 +66,29 @@ namespace IceGrid
const std::shared_ptr<T> _session;
};

// A shared monitoring/reaping mechanism that destroys session servants. It's used for all session servants hosted
// by the IceGrid registry: admin sessions, client sessions (aka resource allocation sessions), internal node
// sessions and internal replica sessions.
// It supports two modes:
// - 0s timeout + non-null connection: the session is bound to the connection and is destroyed/reaped either
// explicitly (via a call to a Slice-defined destroy operation) or when the connection is closed
// - a null connection with usually a non-0 timeout: the session's lifetime is independent of the connection, and
// the reaper destroys the session when it doesn't receive a keepAlive call within timeout. If the timeout is 0s,
// the reaper only reaps the session when it's destroyed explicitly, or when the IceGrid registry shuts down.
class ReapThread final
{
public:
ReapThread();

void terminate();
void join();
void add(const std::shared_ptr<Reapable>&, std::chrono::seconds, const Ice::ConnectionPtr& = nullptr);

// Add a session (wrapped in a Reapable object), with a specified timeout (can be 0s) and connection (can be
// null).
void
add(const std::shared_ptr<Reapable>& reapable,
std::chrono::seconds timeout,
const Ice::ConnectionPtr& connection);

void connectionClosed(const Ice::ConnectionPtr&);

Expand Down
18 changes: 7 additions & 11 deletions cpp/src/IceGrid/RegistryI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1073,20 +1073,16 @@ RegistryI::createAdminSessionFromSecureConnection(const Current& current)
int
RegistryI::getSessionTimeout(const Ice::Current&) const
{
// getSessionTimeout is called by clients that create sessions (resource allocation sessions aka client sessions and
// admin sessions) directly using the IceGrid::Registry interface. These sessions are hosted by the
// IceGrid.Registry.Client object adapter.

// A Glacier2 client that creates a session using the Glacier2::SessionManager can't call this operation since it
// doesn't have access to the IceGrid::Registry interface.
PropertiesPtr properties = _communicator->getProperties();

int serverIdleTimeout = properties->getIcePropertyAsInt("Ice.Connection.Server.IdleTimeout");
int adminSessionTimeout = properties->getPropertyAsIntWithDefault(
"IceGrid.Registry.AdminSessionManager.Connection.IdleTimeout",
serverIdleTimeout);
int sessionTimeout = properties->getPropertyAsIntWithDefault(
"IceGrid.Registry.SessionManager.Connection.IdleTimeout",
serverIdleTimeout);

// Users should not fine-tune the idle timeout so both timeouts are usually identical. In case they are different,
// we return the min to the caller (an old Ice client, with Ice version <= 3.7). The caller may then send keep alive
// more often than necessary (very minor).
return min(adminSessionTimeout, sessionTimeout);
return properties->getPropertyAsIntWithDefault("IceGrid.Registry.Client.Connection.IdleTimeout", serverIdleTimeout);
}

string
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/IceGrid/ReplicaSessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ ReplicaSessionI::destroy(const Ice::Current&)
destroyImpl(false);
}

chrono::steady_clock::time_point
ReplicaSessionI::timestamp() const
optional<chrono::steady_clock::time_point>
ReplicaSessionI::timestamp() const noexcept
{
lock_guard lock(_mutex);
if (_destroy)
{
throw Ice::ObjectNotExistException{__FILE__, __LINE__};
return nullopt;
}
return _timestamp;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/IceGrid/ReplicaSessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace IceGrid
void receivedUpdate(TopicName, int, std::string, const Ice::Current&) override;
void destroy(const Ice::Current&) override;

std::chrono::steady_clock::time_point timestamp() const;
std::optional<std::chrono::steady_clock::time_point> timestamp() const noexcept;
void shutdown();

const InternalRegistryPrx& getInternalRegistry() const;
Expand Down
24 changes: 13 additions & 11 deletions cpp/src/IceGrid/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,13 @@ BaseSessionI::destroyImpl(bool)
}
}

std::chrono::steady_clock::time_point
BaseSessionI::timestamp() const
optional<chrono::steady_clock::time_point>
BaseSessionI::timestamp() const noexcept
{
lock_guard lock(_mutex);
if (_destroyed)
{
// Just a "marker" exception here.
throw Ice::ObjectNotExistException{__FILE__, __LINE__};
return nullopt;
}
return std::chrono::steady_clock::time_point::min(); // not used
}
Expand Down Expand Up @@ -277,12 +276,15 @@ ClientSessionFactory::ClientSessionFactory(
}

Glacier2::SessionPrx
ClientSessionFactory::createGlacier2Session(const string& sessionId, const optional<Glacier2::SessionControlPrx>& ctl)
ClientSessionFactory::createGlacier2Session(
const string& sessionId,
const optional<Glacier2::SessionControlPrx>& ctl,
const Ice::ConnectionPtr& con)
{
assert(_servantManager);

auto session = createSessionServant(sessionId);
auto proxy = session->_register(_servantManager, nullptr);
auto proxy = session->_register(_servantManager, con);

if (ctl)
{
Expand All @@ -308,7 +310,7 @@ ClientSessionFactory::createGlacier2Session(const string& sessionId, const optio
// We can't use a non-0 timeout such as the Glacier2 session timeout. As of Ice 3.8, heartbeats may not be sent
// at all on a busy connection. Furthermore, as of Ice 3.8, Glacier2 no longer "converts" heartbeats into
// keepAlive requests.
_reaper->add(make_shared<SessionReapable<SessionI>>(_database->getTraceLevels()->logger, session), 0s);
_reaper->add(make_shared<SessionReapable<SessionI>>(_database->getTraceLevels()->logger, session), 0s, con);
return Ice::uncheckedCast<Glacier2::SessionPrx>(proxy);
}

Expand All @@ -327,9 +329,9 @@ ClientSessionFactory::getTraceLevels() const
ClientSessionManagerI::ClientSessionManagerI(const shared_ptr<ClientSessionFactory>& factory) : _factory(factory) {}

std::optional<Glacier2::SessionPrx>
ClientSessionManagerI::create(string user, std::optional<Glacier2::SessionControlPrx> ctl, const Ice::Current&)
ClientSessionManagerI::create(string user, std::optional<Glacier2::SessionControlPrx> ctl, const Ice::Current& current)
{
return _factory->createGlacier2Session(std::move(user), std::move(ctl));
return _factory->createGlacier2Session(std::move(user), std::move(ctl), current.con);
}

ClientSSLSessionManagerI::ClientSSLSessionManagerI(const shared_ptr<ClientSessionFactory>& factory) : _factory(factory)
Expand All @@ -340,7 +342,7 @@ std::optional<Glacier2::SessionPrx>
ClientSSLSessionManagerI::create(
Glacier2::SSLInfo info,
std::optional<Glacier2::SessionControlPrx> ctl,
const Ice::Current&)
const Ice::Current& current)
{
string userDN;
if (!info.certs.empty()) // TODO: Require userDN?
Expand All @@ -360,5 +362,5 @@ ClientSSLSessionManagerI::create(
}
}

return _factory->createGlacier2Session(userDN, ctl);
return _factory->createGlacier2Session(userDN, ctl, current.con);
}
10 changes: 6 additions & 4 deletions cpp/src/IceGrid/SessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ namespace IceGrid
public:
virtual ~BaseSessionI() = default;

// Return value is never used. Just throws ONE when the session is destroyed.
std::chrono::steady_clock::time_point timestamp() const;
// Return value is never used. Just returns nullopt when the session is destroyed.
std::optional<std::chrono::steady_clock::time_point> timestamp() const noexcept;

void shutdown();
std::optional<Glacier2::IdentitySetPrx> getGlacier2IdentitySet();
Expand Down Expand Up @@ -99,8 +99,10 @@ namespace IceGrid
const IceInternal::TimerPtr&,
const std::shared_ptr<ReapThread>&);

Glacier2::SessionPrx
createGlacier2Session(const std::string&, const std::optional<Glacier2::SessionControlPrx>&);
Glacier2::SessionPrx createGlacier2Session(
const std::string& sessionId,
const std::optional<Glacier2::SessionControlPrx>& ctl,
const Ice::ConnectionPtr& con);

std::shared_ptr<SessionI> createSessionServant(const std::string&);

Expand Down

0 comments on commit 5407a60

Please sign in to comment.