Skip to content

Commit

Permalink
fixup! Support TLS listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
hallfox committed Jan 6, 2025
1 parent cb68f2a commit a5ff6ec
Show file tree
Hide file tree
Showing 8 changed files with 625 additions and 291 deletions.
4 changes: 2 additions & 2 deletions src/applications/bmqbrkr/etc/bmqbrkrcfg.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"rotationBytes": 268435456,
"logfileFormat": "%d (%t) %s %F:%l %m\n\n",
"consoleFormat": "%d (%t) %s %F:%l %m\n",
"loggingVerbosity": "TRACE",
"consoleSeverityThreshold": "TRACE",
"loggingVerbosity": "INFO",
"consoleSeverityThreshold": "INFO",
"categories": [
"BMQBRKR:INFO:green",
"BMQ*:INFO:green",
Expand Down
43 changes: 11 additions & 32 deletions src/groups/bmq/bmqio/bmqio_ntcchannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,6 @@ void NtcChannel::processConnect(

d_peerUri = d_streamSocket_sp->remoteEndpoint().text();

if (d_encryptionClient_sp) {
this->upgrade(d_encryptionClient_sp,
ntca::UpgradeOptions(),
d_upgradeCallback);
}

lock.release()->unlock();

if (resultCallback) {
Expand Down Expand Up @@ -977,32 +971,7 @@ NtcChannel::NtcChannel(
, d_watermarkSignaler(basicAllocator)
, d_closeSignaler(basicAllocator)
, d_resultCallback(bsl::allocator_arg, basicAllocator, resultCallback)
, d_encryptionClient_sp()
, d_upgradeCallback(bsl::allocator_arg, basicAllocator)
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
}

NtcChannel::NtcChannel(
const bsl::shared_ptr<ntci::Interface>& interface,
const bmqio::ChannelFactory::ResultCallback& resultCallback,
const ntci::UpgradeFunction& upgradeCallback,
bslma::Allocator* basicAllocator)
: d_mutex()
, d_interface_sp(interface)
, d_streamSocket_sp()
, d_readQueue(basicAllocator)
, d_readCache(basicAllocator)
, d_channelId(0)
, d_peerUri(basicAllocator)
, d_state(e_STATE_DEFAULT)
, d_options(basicAllocator)
, d_properties(basicAllocator)
, d_watermarkSignaler(basicAllocator)
, d_closeSignaler(basicAllocator)
, d_resultCallback(bsl::allocator_arg, basicAllocator, resultCallback)
, d_encryptionClient_sp()
, d_upgradeCallback(bsl::allocator_arg, basicAllocator, upgradeCallback)
, d_upgradable()
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
}
Expand Down Expand Up @@ -1961,5 +1930,15 @@ void NtcListenerUtil::fail(Status* status,
}
}

const bsl::shared_ptr<ntci::Upgradable>& NtcChannel::upgradable() const
{
return d_upgradable;
}

bsl::shared_ptr<ntci::Upgradable>& NtcChannel::upgradable()
{
return d_upgradable;
}

} // close package namespace
} // close enterprise namespace
41 changes: 19 additions & 22 deletions src/groups/bmq/bmqio/bmqio_ntcchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,8 @@ class NtcChannel : public bmqio::Channel,
bdlmt::Signaler<WatermarkFnType> d_watermarkSignaler;
bdlmt::Signaler<CloseFnType> d_closeSignaler;
bmqio::ChannelFactory::ResultCallback d_resultCallback;
bsl::shared_ptr<ntci::EncryptionClient> d_encryptionClient_sp;
ntci::UpgradeFunction d_upgradeCallback;
bslma::Allocator* d_allocator_p;
bsl::shared_ptr<ntci::Upgradable> d_upgradable;
bslma::Allocator* d_allocator_p;

private:
// NOT IMPLEMENTED
Expand Down Expand Up @@ -312,16 +311,6 @@ class NtcChannel : public bmqio::Channel,
const bmqio::ChannelFactory::ResultCallback& resultCallback,
bslma::Allocator* basicAllocator = 0);

/// Create a new channel implemented by the specified `interface`.
/// Optionally specify a `basicAllocator` used to supply memory. Initialize
/// this channel's upgrade callback with `upgradeCallback`. If
/// 'basicAllocator is 0, the currently installed default allocator is
/// used.
NtcChannel(const bsl::shared_ptr<ntci::Interface>& interface,
const bmqio::ChannelFactory::ResultCallback& resultCallback,
const ntci::UpgradeFunction& upgradeCallback,
bslma::Allocator* basicAllocator = 0);

/// Destroy this object.
~NtcChannel() BSLS_KEYWORD_OVERRIDE;

Expand Down Expand Up @@ -431,6 +420,9 @@ class NtcChannel : public bmqio::Channel,
/// Set the write queue high watermark to the specified `highWatermark`.
void setWriteQueueHighWatermark(int highWatermark);

/// Set the upgradable handle if this channel has been upgraded.
void setUpgradable(const bsl::shared_ptr<ntci::Upgradable>& upgradable);

/// Assume the TLS server role and begin upgrading the socket from
/// being unencrypted to being encrypted with TLS. Invoke the specified
/// `upgradeCallback` when the socket has completed upgrading to TLS.
Expand Down Expand Up @@ -472,6 +464,11 @@ class NtcChannel : public bmqio::Channel,
/// Return the socket interface for this channel. This function is
/// undefined unless the channel has succesfully established a connection.
const ntci::StreamSocket& streamSocket() const;

/// Return the upgradable handle for the channel.
const bsl::shared_ptr<ntci::Upgradable>& upgradable() const;

bsl::shared_ptr<ntci::Upgradable>& upgradable();
};

// =====================
Expand Down Expand Up @@ -529,15 +526,15 @@ class NtcListener : public bmqio::ChannelFactoryOperationHandle,
};

// INSTANCE DATA
bslmt::Mutex d_mutex;
bsl::shared_ptr<ntci::Interface> d_interface_sp;
bsl::shared_ptr<ntci::ListenerSocket> d_listenerSocket_sp;
bsl::string d_localUri;
State d_state;
bmqio::ListenOptions d_options;
bmqvt::PropertyBag d_properties;
bdlmt::Signaler<CloseFnType> d_closeSignaler;
bmqio::ChannelFactory::ResultCallback d_resultCallback;
bslmt::Mutex d_mutex;
bsl::shared_ptr<ntci::Interface> d_interface_sp;
bsl::shared_ptr<ntci::ListenerSocket> d_listenerSocket_sp;
bsl::string d_localUri;
State d_state;
bmqio::ListenOptions d_options;
bmqvt::PropertyBag d_properties;
bdlmt::Signaler<CloseFnType> d_closeSignaler;
bmqio::ChannelFactory::ResultCallback d_resultCallback;
bsl::shared_ptr<ntci::EncryptionServer> d_encryptionServer_sp;
ntci::UpgradeFunction d_upgradeCallback;
bslma::Allocator* d_allocator_p;
Expand Down
82 changes: 65 additions & 17 deletions src/groups/bmq/bmqio/bmqio_ntcchannelfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,23 @@ void NtcChannelFactory::processListenerResult(
<< AddressFormatter(alias.get()) << " to "
<< alias->peerUri() << " registered"
<< BALL_LOG_END;

// Check if we need to upgrade the connection to TLS
if (d_encryptionServer) {
alias->upgrade(d_encryptionServer,
ntca::UpgradeOptions(),
bdlf::BindUtil::bindS(
d_allocator_p,
&NtcChannelFactory::processUpgrade,
this,
event,
status,
channel,
bdlf::PlaceHolders::_1,
bdlf::PlaceHolders::_2,
callback));
return; // RETURN
}
}
}
}
Expand Down Expand Up @@ -145,6 +162,7 @@ void NtcChannelFactory::processChannelResult(
const bsl::shared_ptr<bmqio::Channel>& channel,
const bmqio::ChannelFactory::ResultCallback& callback)
{
// Result callback for connect
BALL_LOG_TRACE << "NTC factory event " << event << " status " << status
<< BALL_LOG_END;

Expand All @@ -155,6 +173,23 @@ void NtcChannelFactory::processChannelResult(
if (alias) {
d_createSignaler(alias, alias);
}

// Check if we need to upgrade the connection to TLS
if (d_encryptionClient) {
alias->upgrade(
d_encryptionClient,
ntca::UpgradeOptions(),
bdlf::BindUtil::bindS(d_allocator_p,
&NtcChannelFactory::processUpgrade,
this,
event,
status,
channel,
bdlf::PlaceHolders::_1,
bdlf::PlaceHolders::_2,
callback));
return; // RETURN
}
}
}

Expand Down Expand Up @@ -184,12 +219,25 @@ void NtcChannelFactory::processChannelClosed(int handle)
}

void NtcChannelFactory::processUpgrade(
const bsl::shared_ptr<ntci::Upgradable>& upgradable,
const ntca::UpgradeEvent& event,
const UpgradeCallback& onUpgrade)
bmqio::ChannelFactoryEvent::Enum event,
const bmqio::Status& status,
const bsl::shared_ptr<bmqio::NtcChannel>& channel,
const bsl::shared_ptr<ntci::Upgradable>& upgradable,
const ntca::UpgradeEvent& upgradeEvent,
const bmqio::ChannelFactory::ResultCallback& callback)
{
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
onUpgrade(upgradable, event);
if (upgradeEvent.isError()) {
BALL_LOG_ERROR << "Received error during TLS negotiation: " << event;
bmqio::Status st(bmqio::StatusCategory::e_GENERIC_ERROR,
d_allocator_p);
channel->close(st);
callback(ChannelFactoryEvent::e_CONNECT_FAILED, st, channel);
return; // RETURN
}

channel->setUpgradable(upgradable);

callback(event, status, channel);
}

// CREATORS
Expand All @@ -206,6 +254,8 @@ NtcChannelFactory::NtcChannelFactory(
, d_stateMutex()
, d_stateCondition()
, d_state(e_STATE_DEFAULT)
, d_encryptionServer()
, d_encryptionClient()
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
}
Expand All @@ -223,6 +273,8 @@ NtcChannelFactory::NtcChannelFactory(
, d_stateMutex()
, d_stateCondition()
, d_state(e_STATE_DEFAULT)
, d_encryptionServer()
, d_encryptionClient()
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
bsl::shared_ptr<bdlbb::BlobBufferFactory> blobBufferFactory_sp(
Expand Down Expand Up @@ -469,22 +521,18 @@ int NtcChannelFactory::lookupChannel(
return d_channels.find(channelId, result);
}

ntsa::Error NtcChannelFactory::createEncryptionServer(
bsl::shared_ptr<ntci::EncryptionServer>* result,
const ntca::EncryptionServerOptions& options)
NtcChannelFactory& NtcChannelFactory::setEncryptionServer(
const bsl::shared_ptr<ntci::EncryptionServer>& encryptionServer)
{
return d_interface_sp->createEncryptionServer(result,
options,
d_allocator_p);
d_encryptionServer = encryptionServer;
return *this;
}

ntsa::Error NtcChannelFactory::createEncryptionClient(
bsl::shared_ptr<ntci::EncryptionClient>* result,
const ntca::EncryptionClientOptions& options)
NtcChannelFactory& NtcChannelFactory::setEncryptionClient(
const bsl::shared_ptr<ntci::EncryptionClient>& encryptionClient)
{
return d_interface_sp->createEncryptionClient(result,
options,
d_allocator_p);
d_encryptionClient = encryptionClient;
return *this;
}

NtcCertificateLoader NtcChannelFactory::createCertificateLoader()
Expand Down
47 changes: 22 additions & 25 deletions src/groups/bmq/bmqio/bmqio_ntcchannelfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,18 @@ class NtcChannelFactory : public bmqio::ChannelFactory {
};

// INSTANCE DATA
bsl::shared_ptr<ntci::Interface> d_interface_sp;
ListenerCatalog d_listeners;
ChannelCatalog d_channels;
bdlmt::Signaler<CreateFnType> d_createSignaler;
bdlmt::Signaler<LimitFnType> d_limitSignaler;
bool d_owned;
bslmt::Mutex d_stateMutex;
bslmt::Condition d_stateCondition;
State d_state;
bslma::Allocator* d_allocator_p;
bsl::shared_ptr<ntci::Interface> d_interface_sp;
ListenerCatalog d_listeners;
ChannelCatalog d_channels;
bdlmt::Signaler<CreateFnType> d_createSignaler;
bdlmt::Signaler<LimitFnType> d_limitSignaler;
bool d_owned;
bslmt::Mutex d_stateMutex;
bslmt::Condition d_stateCondition;
State d_state;
bsl::shared_ptr<ntci::EncryptionServer> d_encryptionServer;
bsl::shared_ptr<ntci::EncryptionClient> d_encryptionClient;
bslma::Allocator* d_allocator_p;

private:
// NOT IMPLEMENTED
Expand Down Expand Up @@ -155,15 +157,12 @@ class NtcChannelFactory : public bmqio::ChannelFactory {
void processChannelClosed(int handle);

/// Process a TLS upgrade
void processUpgrade(const bsl::shared_ptr<ntci::Upgradable>& upgradable,
const ntca::UpgradeEvent& event,
const UpgradeCallback& callback);

/// Upgrade the channel to a TLS connection as a listener.
void upgradeListener(bmqio::NtcChannel* channel);

/// Upgrade the channel to a TLS connection as a client.
void upgradeChannel(bmqio::NtcChannel* channel);
void processUpgrade(bmqio::ChannelFactoryEvent::Enum event,
const bmqio::Status& status,
const bsl::shared_ptr<bmqio::NtcChannel>& channel,
const bsl::shared_ptr<ntci::Upgradable>& upgradable,
const ntca::UpgradeEvent& upgradeEvent,
const bmqio::ChannelFactory::ResultCallback& callback);

public:
// PUBLIC TYPES
Expand Down Expand Up @@ -267,19 +266,17 @@ class NtcChannelFactory : public bmqio::ChannelFactory {
/// Load into the specified `result` a new encryption server with the
/// specified `options`. Optionally specify a `basicAllocator` used to
/// supply memory. Return the error.
ntsa::Error
createEncryptionServer(bsl::shared_ptr<ntci::EncryptionServer>* result,
const ntca::EncryptionServerOptions& options);
NtcChannelFactory& setEncryptionServer(
const bsl::shared_ptr<ntci::EncryptionServer>& encryptionServer);

/// @brief Create an encryption server using this channel factory's
/// interface.
///
/// Load into the specified `result` a new encryption client with the
/// specified `options`. Optionally specify a `basicAllocator` used to
/// supply memory. Return the error.
ntsa::Error
createEncryptionClient(bsl::shared_ptr<ntci::EncryptionClient>* result,
const ntca::EncryptionClientOptions& options);
NtcChannelFactory& setEncryptionClient(
const bsl::shared_ptr<ntci::EncryptionClient>& encryptionServer);

/// @brief Create a certificate loader based on the underlying interface.
NtcCertificateLoader createCertificateLoader();
Expand Down
4 changes: 2 additions & 2 deletions src/groups/bmq/bmqio/bmqio_ntcchannelfactory.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ EncryptionPair makeEncryption(ntci::Interface* interface,

ntca::EncryptionServerOptions encryptionServerOptions;
encryptionServerOptions.setMinMethod(ntca::EncryptionMethod::e_TLS_V1_3);
encryptionServerOptions.setMaxMethod(ntca::EncryptionMethod::e_TLS_V1_3);
encryptionServerOptions.setMaxMethod(ntca::EncryptionMethod::e_DEFAULT);
encryptionServerOptions.setAuthentication(
ntca::EncryptionAuthentication::e_NONE);

Expand Down Expand Up @@ -203,7 +203,7 @@ EncryptionPair makeEncryption(ntci::Interface* interface,

ntca::EncryptionClientOptions encryptionClientOptions;
encryptionClientOptions.setMinMethod(ntca::EncryptionMethod::e_TLS_V1_3);
encryptionClientOptions.setMaxMethod(ntca::EncryptionMethod::e_TLS_V1_3);
encryptionClientOptions.setMaxMethod(ntca::EncryptionMethod::e_DEFAULT);
encryptionClientOptions.setAuthentication(
ntca::EncryptionAuthentication::e_VERIFY);

Expand Down
Loading

0 comments on commit a5ff6ec

Please sign in to comment.