Skip to content

Commit

Permalink
#152 Subscriptions: send configureStream requests internally
Browse files Browse the repository at this point in the history
Signed-off-by: Vitaly Dzhitenov <[email protected]>
  • Loading branch information
dorjesinpo authored Dec 5, 2023
1 parent 1ec897a commit 180c3e8
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 11 deletions.
9 changes: 9 additions & 0 deletions src/groups/mqb/mqba/mqba_sessionnegotiator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ void loadBrokerIdentity(bmqp_ctrlmsg::ClientIdentity* identity,
{
bool shouldExtendMessageProperties = false;

// TODO: make this unconditional. Currently, 'V2' is controlled by config
// as a means to prevent SDK from generating 'V2'.
// Regardless of SDK, brokers now decompress MPs and send ConfigureStream.

if (mqbcfg::BrokerConfig::get().brokerVersion() == 999999) {
// Always advertise v2 (EX) support in test build (developer workflow,
// CI, Jenkins, etc).
Expand Down Expand Up @@ -546,6 +550,11 @@ SessionNegotiator::onClientIdentityMessage(bsl::ostream& errorDescription,
else {
bool shouldExtendMessageProperties = false;

// TODO: make this unconditional. Currently, 'V2' is controlled by
// config as a means to prevent SDK from generating 'V2'.
// Regardless of SDK, brokers now decompress MPs and send
// ConfigureStream.

if (mqbcfg::BrokerConfig::get().brokerVersion() == 999999) {
// Always advertise v2 (EX) support in test build (developer
// workflow, CI, Jenkins, etc).
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3166,7 +3166,8 @@ bool ClusterQueueHelper::sendConfigureQueueRequest(
// TODO: Replace with 'ConfigureStream' once all brokers recognize it

const mqbcfg::AppConfig& brkrCfg = mqbcfg::BrokerConfig::get();
if (brkrCfg.brokerVersion() == bmqp::Protocol::k_DEV_VERSION) {
if (brkrCfg.brokerVersion() == bmqp::Protocol::k_DEV_VERSION ||
brkrCfg.configureStream()) {
bmqp_ctrlmsg::ConfigureStream& qs =
request->request().choice().makeConfigureStream();

Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_routers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ void Routers::AppContext::load(

if (!itGroup) {
const mqbcfg::AppConfig& brkrCfg = mqbcfg::BrokerConfig::get();
if (brkrCfg.brokerVersion() == bmqp::Protocol::k_DEV_VERSION) {
if (brkrCfg.brokerVersion() == bmqp::Protocol::k_DEV_VERSION ||
brkrCfg.configureStream()) {
// This must be the same as in
// 'ClusterQueueHelper::sendConfigureQueueRequest'

Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbcfg/mqbcfg.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
bmqconfConfig........: configuration for bmqconf
plugins..............: configuration for the plugins
msgPropertiesSupport.: information about if/how to advertise support for v2 message properties
configureStream......: send new ConfigureStream instead of old ConfigureQueue/>
</documentation>
</annotation>
<sequence>
Expand All @@ -95,6 +96,7 @@
<element name='bmqconfConfig' type='tns:BmqconfConfig'/>
<element name='plugins' type='tns:Plugins'/>
<element name='messagePropertiesV2' type='tns:MessagePropertiesV2'/>
<element name='configureStream' type='boolean' default='false'/>
</sequence>
</complexType>

Expand Down
25 changes: 21 additions & 4 deletions src/groups/mqb/mqbcfg/mqbcfg_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5667,6 +5667,8 @@ const char AppConfig::CLASS_NAME[] = "AppConfig";
const char AppConfig::DEFAULT_INITIALIZER_LATENCY_MONITOR_DOMAIN[] =
"bmq.sys.latemon.latency";

const bool AppConfig::DEFAULT_INITIALIZER_CONFIGURE_STREAM = false;

const bdlat_AttributeInfo AppConfig::ATTRIBUTE_INFO_ARRAY[] = {
{ATTRIBUTE_ID_BROKER_INSTANCE_NAME,
"brokerInstanceName",
Expand Down Expand Up @@ -5747,14 +5749,19 @@ const bdlat_AttributeInfo AppConfig::ATTRIBUTE_INFO_ARRAY[] = {
"messagePropertiesV2",
sizeof("messagePropertiesV2") - 1,
"",
bdlat_FormattingMode::e_DEFAULT}};
bdlat_FormattingMode::e_DEFAULT},
{ATTRIBUTE_ID_CONFIGURE_STREAM,
"configureStream",
sizeof("configureStream") - 1,
"",
bdlat_FormattingMode::e_TEXT}};

// CLASS METHODS

const bdlat_AttributeInfo* AppConfig::lookupAttributeInfo(const char* name,
int nameLength)
{
for (int i = 0; i < 16; ++i) {
for (int i = 0; i < 17; ++i) {
const bdlat_AttributeInfo& attributeInfo =
AppConfig::ATTRIBUTE_INFO_ARRAY[i];

Expand Down Expand Up @@ -5802,6 +5809,8 @@ const bdlat_AttributeInfo* AppConfig::lookupAttributeInfo(int id)
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_PLUGINS];
case ATTRIBUTE_ID_MESSAGE_PROPERTIES_V2:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2];
case ATTRIBUTE_ID_CONFIGURE_STREAM:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM];
default: return 0;
}
}
Expand All @@ -5826,6 +5835,7 @@ AppConfig::AppConfig(bslma::Allocator* basicAllocator)
, d_configVersion()
, d_logsObserverMaxSize()
, d_isRunningOnDev()
, d_configureStream(DEFAULT_INITIALIZER_CONFIGURE_STREAM)
{
}

Expand All @@ -5847,6 +5857,7 @@ AppConfig::AppConfig(const AppConfig& original,
, d_configVersion(original.d_configVersion)
, d_logsObserverMaxSize(original.d_logsObserverMaxSize)
, d_isRunningOnDev(original.d_isRunningOnDev)
, d_configureStream(original.d_configureStream)
{
}

Expand All @@ -5868,7 +5879,8 @@ AppConfig::AppConfig(AppConfig&& original) noexcept
d_brokerVersion(bsl::move(original.d_brokerVersion)),
d_configVersion(bsl::move(original.d_configVersion)),
d_logsObserverMaxSize(bsl::move(original.d_logsObserverMaxSize)),
d_isRunningOnDev(bsl::move(original.d_isRunningOnDev))
d_isRunningOnDev(bsl::move(original.d_isRunningOnDev)),
d_configureStream(bsl::move(original.d_configureStream))
{
}

Expand All @@ -5891,6 +5903,7 @@ AppConfig::AppConfig(AppConfig&& original, bslma::Allocator* basicAllocator)
, d_configVersion(bsl::move(original.d_configVersion))
, d_logsObserverMaxSize(bsl::move(original.d_logsObserverMaxSize))
, d_isRunningOnDev(bsl::move(original.d_isRunningOnDev))
, d_configureStream(bsl::move(original.d_configureStream))
{
}
#endif
Expand Down Expand Up @@ -5920,6 +5933,7 @@ AppConfig& AppConfig::operator=(const AppConfig& rhs)
d_bmqconfConfig = rhs.d_bmqconfConfig;
d_plugins = rhs.d_plugins;
d_messagePropertiesV2 = rhs.d_messagePropertiesV2;
d_configureStream = rhs.d_configureStream;
}

return *this;
Expand All @@ -5946,6 +5960,7 @@ AppConfig& AppConfig::operator=(AppConfig&& rhs)
d_bmqconfConfig = bsl::move(rhs.d_bmqconfConfig);
d_plugins = bsl::move(rhs.d_plugins);
d_messagePropertiesV2 = bsl::move(rhs.d_messagePropertiesV2);
d_configureStream = bsl::move(rhs.d_configureStream);
}

return *this;
Expand All @@ -5970,6 +5985,7 @@ void AppConfig::reset()
bdlat_ValueTypeFunctions::reset(&d_bmqconfConfig);
bdlat_ValueTypeFunctions::reset(&d_plugins);
bdlat_ValueTypeFunctions::reset(&d_messagePropertiesV2);
d_configureStream = DEFAULT_INITIALIZER_CONFIGURE_STREAM;
}

// ACCESSORS
Expand All @@ -5996,6 +6012,7 @@ AppConfig::print(bsl::ostream& stream, int level, int spacesPerLevel) const
printer.printAttribute("bmqconfConfig", this->bmqconfConfig());
printer.printAttribute("plugins", this->plugins());
printer.printAttribute("messagePropertiesV2", this->messagePropertiesV2());
printer.printAttribute("configureStream", this->configureStream());
printer.end();
return stream;
}
Expand Down Expand Up @@ -6313,7 +6330,7 @@ Configuration::print(bsl::ostream& stream, int level, int spacesPerLevel) const
} // close package namespace
} // close enterprise namespace

// GENERATED BY BLP_BAS_CODEGEN_2023.10.25
// GENERATED BY BLP_BAS_CODEGEN_2023.10.07
// USING bas_codegen.pl -m msg --noAggregateConversion --noExternalization
// --noIdent --package mqbcfg --msgComponent messages mqbcfg.xsd
// ----------------------------------------------------------------------------
Expand Down
58 changes: 53 additions & 5 deletions src/groups/mqb/mqbcfg/mqbcfg_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -8009,6 +8009,8 @@ class AppConfig {
// bmqconfConfig........: configuration for bmqconf plugins..............:
// configuration for the plugins msgPropertiesSupport.: information about
// if/how to advertise support for v2 message properties
// configureStream......: send new ConfigureStream instead of old
// ConfigureQueue/>

// INSTANCE DATA
bsl::string d_brokerInstanceName;
Expand All @@ -8027,6 +8029,7 @@ class AppConfig {
int d_configVersion;
int d_logsObserverMaxSize;
bool d_isRunningOnDev;
bool d_configureStream;

public:
// TYPES
Expand All @@ -8046,10 +8049,11 @@ class AppConfig {
ATTRIBUTE_ID_NETWORK_INTERFACES = 12,
ATTRIBUTE_ID_BMQCONF_CONFIG = 13,
ATTRIBUTE_ID_PLUGINS = 14,
ATTRIBUTE_ID_MESSAGE_PROPERTIES_V2 = 15
ATTRIBUTE_ID_MESSAGE_PROPERTIES_V2 = 15,
ATTRIBUTE_ID_CONFIGURE_STREAM = 16
};

enum { NUM_ATTRIBUTES = 16 };
enum { NUM_ATTRIBUTES = 17 };

enum {
ATTRIBUTE_INDEX_BROKER_INSTANCE_NAME = 0,
Expand All @@ -8067,14 +8071,17 @@ class AppConfig {
ATTRIBUTE_INDEX_NETWORK_INTERFACES = 12,
ATTRIBUTE_INDEX_BMQCONF_CONFIG = 13,
ATTRIBUTE_INDEX_PLUGINS = 14,
ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2 = 15
ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2 = 15,
ATTRIBUTE_INDEX_CONFIGURE_STREAM = 16
};

// CONSTANTS
static const char CLASS_NAME[];

static const char DEFAULT_INITIALIZER_LATENCY_MONITOR_DOMAIN[];

static const bool DEFAULT_INITIALIZER_CONFIGURE_STREAM;

static const bdlat_AttributeInfo ATTRIBUTE_INFO_ARRAY[];

public:
Expand Down Expand Up @@ -8230,6 +8237,10 @@ class AppConfig {
// Return a reference to the modifiable "MessagePropertiesV2" attribute
// of this object.

bool& configureStream();
// Return a reference to the modifiable "ConfigureStream" attribute of
// this object.

// ACCESSORS
bsl::ostream&
print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
Expand Down Expand Up @@ -8333,6 +8344,9 @@ class AppConfig {
const MessagePropertiesV2& messagePropertiesV2() const;
// Return a reference offering non-modifiable access to the
// "MessagePropertiesV2" attribute of this object.

bool configureStream() const;
// Return the value of the "ConfigureStream" attribute of this object.
};

// FREE OPERATORS
Expand Down Expand Up @@ -15948,6 +15962,12 @@ int AppConfig::manipulateAttributes(t_MANIPULATOR& manipulator)
return ret;
}

ret = manipulator(&d_configureStream,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]);
if (ret) {
return ret;
}

return 0;
}

Expand Down Expand Up @@ -16032,6 +16052,11 @@ int AppConfig::manipulateAttribute(t_MANIPULATOR& manipulator, int id)
&d_messagePropertiesV2,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2]);
}
case ATTRIBUTE_ID_CONFIGURE_STREAM: {
return manipulator(
&d_configureStream,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]);
}
default: return NOT_FOUND;
}
}
Expand Down Expand Up @@ -16132,6 +16157,11 @@ inline MessagePropertiesV2& AppConfig::messagePropertiesV2()
return d_messagePropertiesV2;
}

inline bool& AppConfig::configureStream()
{
return d_configureStream;
}

// ACCESSORS
template <typename t_ACCESSOR>
int AppConfig::accessAttributes(t_ACCESSOR& accessor) const
Expand Down Expand Up @@ -16234,6 +16264,12 @@ int AppConfig::accessAttributes(t_ACCESSOR& accessor) const
return ret;
}

ret = accessor(d_configureStream,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]);
if (ret) {
return ret;
}

return 0;
}

Expand Down Expand Up @@ -16314,6 +16350,11 @@ int AppConfig::accessAttribute(t_ACCESSOR& accessor, int id) const
d_messagePropertiesV2,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2]);
}
case ATTRIBUTE_ID_CONFIGURE_STREAM: {
return accessor(
d_configureStream,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]);
}
default: return NOT_FOUND;
}
}
Expand Down Expand Up @@ -16414,6 +16455,11 @@ inline const MessagePropertiesV2& AppConfig::messagePropertiesV2() const
return d_messagePropertiesV2;
}

inline bool AppConfig::configureStream() const
{
return d_configureStream;
}

// ------------------------
// class ClustersDefinition
// ------------------------
Expand Down Expand Up @@ -17895,7 +17941,8 @@ inline bool mqbcfg::operator==(const mqbcfg::AppConfig& lhs,
lhs.networkInterfaces() == rhs.networkInterfaces() &&
lhs.bmqconfConfig() == rhs.bmqconfConfig() &&
lhs.plugins() == rhs.plugins() &&
lhs.messagePropertiesV2() == rhs.messagePropertiesV2();
lhs.messagePropertiesV2() == rhs.messagePropertiesV2() &&
lhs.configureStream() == rhs.configureStream();
}

inline bool mqbcfg::operator!=(const mqbcfg::AppConfig& lhs,
Expand Down Expand Up @@ -17931,6 +17978,7 @@ void mqbcfg::hashAppend(t_HASH_ALGORITHM& hashAlg,
hashAppend(hashAlg, object.bmqconfConfig());
hashAppend(hashAlg, object.plugins());
hashAppend(hashAlg, object.messagePropertiesV2());
hashAppend(hashAlg, object.configureStream());
}

inline bool mqbcfg::operator==(const mqbcfg::ClustersDefinition& lhs,
Expand Down Expand Up @@ -17999,7 +18047,7 @@ void mqbcfg::hashAppend(t_HASH_ALGORITHM& hashAlg,
} // close enterprise namespace
#endif

// GENERATED BY BLP_BAS_CODEGEN_2023.10.25
// GENERATED BY BLP_BAS_CODEGEN_2023.10.07
// USING bas_codegen.pl -m msg --noAggregateConversion --noExternalization
// --noIdent --package mqbcfg --msgComponent messages mqbcfg.xsd
// ----------------------------------------------------------------------------
Expand Down

0 comments on commit 180c3e8

Please sign in to comment.