diff --git a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp index 98851f08d..764cce80d 100644 --- a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp +++ b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp @@ -168,6 +168,10 @@ void loadBrokerIdentity(bmqp_ctrlmsg::ClientIdentity* identity, { bool shouldExtendMessageProperties = false; + // TODO: make this unconditional. Currently, 'V2' is controlled by config + // as a means to prevent SDK from generating 'V2'. + // Regardless of SDK, brokers now decompress MPs and send ConfigureStream. + if (mqbcfg::BrokerConfig::get().brokerVersion() == 999999) { // Always advertise v2 (EX) support in test build (developer workflow, // CI, Jenkins, etc). @@ -546,6 +550,11 @@ SessionNegotiator::onClientIdentityMessage(bsl::ostream& errorDescription, else { bool shouldExtendMessageProperties = false; + // TODO: make this unconditional. Currently, 'V2' is controlled by + // config as a means to prevent SDK from generating 'V2'. + // Regardless of SDK, brokers now decompress MPs and send + // ConfigureStream. + if (mqbcfg::BrokerConfig::get().brokerVersion() == 999999) { // Always advertise v2 (EX) support in test build (developer // workflow, CI, Jenkins, etc). diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index fa9b77562..48b689e9c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -3166,7 +3166,8 @@ bool ClusterQueueHelper::sendConfigureQueueRequest( // TODO: Replace with 'ConfigureStream' once all brokers recognize it const mqbcfg::AppConfig& brkrCfg = mqbcfg::BrokerConfig::get(); - if (brkrCfg.brokerVersion() == bmqp::Protocol::k_DEV_VERSION) { + if (brkrCfg.brokerVersion() == bmqp::Protocol::k_DEV_VERSION || + brkrCfg.configureStream()) { bmqp_ctrlmsg::ConfigureStream& qs = request->request().choice().makeConfigureStream(); diff --git a/src/groups/mqb/mqbblp/mqbblp_routers.cpp b/src/groups/mqb/mqbblp/mqbblp_routers.cpp index 10a462d78..a21d656f9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_routers.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_routers.cpp @@ -288,7 +288,8 @@ void Routers::AppContext::load( if (!itGroup) { const mqbcfg::AppConfig& brkrCfg = mqbcfg::BrokerConfig::get(); - if (brkrCfg.brokerVersion() == bmqp::Protocol::k_DEV_VERSION) { + if (brkrCfg.brokerVersion() == bmqp::Protocol::k_DEV_VERSION || + brkrCfg.configureStream()) { // This must be the same as in // 'ClusterQueueHelper::sendConfigureQueueRequest' diff --git a/src/groups/mqb/mqbcfg/mqbcfg.xsd b/src/groups/mqb/mqbcfg/mqbcfg.xsd index ba0f5e86d..ca9ba8396 100644 --- a/src/groups/mqb/mqbcfg/mqbcfg.xsd +++ b/src/groups/mqb/mqbcfg/mqbcfg.xsd @@ -75,6 +75,7 @@ bmqconfConfig........: configuration for bmqconf plugins..............: configuration for the plugins msgPropertiesSupport.: information about if/how to advertise support for v2 message properties + configureStream......: send new ConfigureStream instead of old ConfigureQueue/> @@ -95,6 +96,7 @@ + diff --git a/src/groups/mqb/mqbcfg/mqbcfg_messages.cpp b/src/groups/mqb/mqbcfg/mqbcfg_messages.cpp index cf1cadcc1..bb8eba117 100644 --- a/src/groups/mqb/mqbcfg/mqbcfg_messages.cpp +++ b/src/groups/mqb/mqbcfg/mqbcfg_messages.cpp @@ -5667,6 +5667,8 @@ const char AppConfig::CLASS_NAME[] = "AppConfig"; const char AppConfig::DEFAULT_INITIALIZER_LATENCY_MONITOR_DOMAIN[] = "bmq.sys.latemon.latency"; +const bool AppConfig::DEFAULT_INITIALIZER_CONFIGURE_STREAM = false; + const bdlat_AttributeInfo AppConfig::ATTRIBUTE_INFO_ARRAY[] = { {ATTRIBUTE_ID_BROKER_INSTANCE_NAME, "brokerInstanceName", @@ -5747,14 +5749,19 @@ const bdlat_AttributeInfo AppConfig::ATTRIBUTE_INFO_ARRAY[] = { "messagePropertiesV2", sizeof("messagePropertiesV2") - 1, "", - bdlat_FormattingMode::e_DEFAULT}}; + bdlat_FormattingMode::e_DEFAULT}, + {ATTRIBUTE_ID_CONFIGURE_STREAM, + "configureStream", + sizeof("configureStream") - 1, + "", + bdlat_FormattingMode::e_TEXT}}; // CLASS METHODS const bdlat_AttributeInfo* AppConfig::lookupAttributeInfo(const char* name, int nameLength) { - for (int i = 0; i < 16; ++i) { + for (int i = 0; i < 17; ++i) { const bdlat_AttributeInfo& attributeInfo = AppConfig::ATTRIBUTE_INFO_ARRAY[i]; @@ -5802,6 +5809,8 @@ const bdlat_AttributeInfo* AppConfig::lookupAttributeInfo(int id) return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_PLUGINS]; case ATTRIBUTE_ID_MESSAGE_PROPERTIES_V2: return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2]; + case ATTRIBUTE_ID_CONFIGURE_STREAM: + return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]; default: return 0; } } @@ -5826,6 +5835,7 @@ AppConfig::AppConfig(bslma::Allocator* basicAllocator) , d_configVersion() , d_logsObserverMaxSize() , d_isRunningOnDev() +, d_configureStream(DEFAULT_INITIALIZER_CONFIGURE_STREAM) { } @@ -5847,6 +5857,7 @@ AppConfig::AppConfig(const AppConfig& original, , d_configVersion(original.d_configVersion) , d_logsObserverMaxSize(original.d_logsObserverMaxSize) , d_isRunningOnDev(original.d_isRunningOnDev) +, d_configureStream(original.d_configureStream) { } @@ -5868,7 +5879,8 @@ AppConfig::AppConfig(AppConfig&& original) noexcept d_brokerVersion(bsl::move(original.d_brokerVersion)), d_configVersion(bsl::move(original.d_configVersion)), d_logsObserverMaxSize(bsl::move(original.d_logsObserverMaxSize)), - d_isRunningOnDev(bsl::move(original.d_isRunningOnDev)) + d_isRunningOnDev(bsl::move(original.d_isRunningOnDev)), + d_configureStream(bsl::move(original.d_configureStream)) { } @@ -5891,6 +5903,7 @@ AppConfig::AppConfig(AppConfig&& original, bslma::Allocator* basicAllocator) , d_configVersion(bsl::move(original.d_configVersion)) , d_logsObserverMaxSize(bsl::move(original.d_logsObserverMaxSize)) , d_isRunningOnDev(bsl::move(original.d_isRunningOnDev)) +, d_configureStream(bsl::move(original.d_configureStream)) { } #endif @@ -5920,6 +5933,7 @@ AppConfig& AppConfig::operator=(const AppConfig& rhs) d_bmqconfConfig = rhs.d_bmqconfConfig; d_plugins = rhs.d_plugins; d_messagePropertiesV2 = rhs.d_messagePropertiesV2; + d_configureStream = rhs.d_configureStream; } return *this; @@ -5946,6 +5960,7 @@ AppConfig& AppConfig::operator=(AppConfig&& rhs) d_bmqconfConfig = bsl::move(rhs.d_bmqconfConfig); d_plugins = bsl::move(rhs.d_plugins); d_messagePropertiesV2 = bsl::move(rhs.d_messagePropertiesV2); + d_configureStream = bsl::move(rhs.d_configureStream); } return *this; @@ -5970,6 +5985,7 @@ void AppConfig::reset() bdlat_ValueTypeFunctions::reset(&d_bmqconfConfig); bdlat_ValueTypeFunctions::reset(&d_plugins); bdlat_ValueTypeFunctions::reset(&d_messagePropertiesV2); + d_configureStream = DEFAULT_INITIALIZER_CONFIGURE_STREAM; } // ACCESSORS @@ -5996,6 +6012,7 @@ AppConfig::print(bsl::ostream& stream, int level, int spacesPerLevel) const printer.printAttribute("bmqconfConfig", this->bmqconfConfig()); printer.printAttribute("plugins", this->plugins()); printer.printAttribute("messagePropertiesV2", this->messagePropertiesV2()); + printer.printAttribute("configureStream", this->configureStream()); printer.end(); return stream; } @@ -6313,7 +6330,7 @@ Configuration::print(bsl::ostream& stream, int level, int spacesPerLevel) const } // close package namespace } // close enterprise namespace -// GENERATED BY BLP_BAS_CODEGEN_2023.10.25 +// GENERATED BY BLP_BAS_CODEGEN_2023.10.07 // USING bas_codegen.pl -m msg --noAggregateConversion --noExternalization // --noIdent --package mqbcfg --msgComponent messages mqbcfg.xsd // ---------------------------------------------------------------------------- diff --git a/src/groups/mqb/mqbcfg/mqbcfg_messages.h b/src/groups/mqb/mqbcfg/mqbcfg_messages.h index 7702fff3f..15e81234e 100644 --- a/src/groups/mqb/mqbcfg/mqbcfg_messages.h +++ b/src/groups/mqb/mqbcfg/mqbcfg_messages.h @@ -8009,6 +8009,8 @@ class AppConfig { // bmqconfConfig........: configuration for bmqconf plugins..............: // configuration for the plugins msgPropertiesSupport.: information about // if/how to advertise support for v2 message properties + // configureStream......: send new ConfigureStream instead of old + // ConfigureQueue/> // INSTANCE DATA bsl::string d_brokerInstanceName; @@ -8027,6 +8029,7 @@ class AppConfig { int d_configVersion; int d_logsObserverMaxSize; bool d_isRunningOnDev; + bool d_configureStream; public: // TYPES @@ -8046,10 +8049,11 @@ class AppConfig { ATTRIBUTE_ID_NETWORK_INTERFACES = 12, ATTRIBUTE_ID_BMQCONF_CONFIG = 13, ATTRIBUTE_ID_PLUGINS = 14, - ATTRIBUTE_ID_MESSAGE_PROPERTIES_V2 = 15 + ATTRIBUTE_ID_MESSAGE_PROPERTIES_V2 = 15, + ATTRIBUTE_ID_CONFIGURE_STREAM = 16 }; - enum { NUM_ATTRIBUTES = 16 }; + enum { NUM_ATTRIBUTES = 17 }; enum { ATTRIBUTE_INDEX_BROKER_INSTANCE_NAME = 0, @@ -8067,7 +8071,8 @@ class AppConfig { ATTRIBUTE_INDEX_NETWORK_INTERFACES = 12, ATTRIBUTE_INDEX_BMQCONF_CONFIG = 13, ATTRIBUTE_INDEX_PLUGINS = 14, - ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2 = 15 + ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2 = 15, + ATTRIBUTE_INDEX_CONFIGURE_STREAM = 16 }; // CONSTANTS @@ -8075,6 +8080,8 @@ class AppConfig { static const char DEFAULT_INITIALIZER_LATENCY_MONITOR_DOMAIN[]; + static const bool DEFAULT_INITIALIZER_CONFIGURE_STREAM; + static const bdlat_AttributeInfo ATTRIBUTE_INFO_ARRAY[]; public: @@ -8230,6 +8237,10 @@ class AppConfig { // Return a reference to the modifiable "MessagePropertiesV2" attribute // of this object. + bool& configureStream(); + // Return a reference to the modifiable "ConfigureStream" attribute of + // this object. + // ACCESSORS bsl::ostream& print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const; @@ -8333,6 +8344,9 @@ class AppConfig { const MessagePropertiesV2& messagePropertiesV2() const; // Return a reference offering non-modifiable access to the // "MessagePropertiesV2" attribute of this object. + + bool configureStream() const; + // Return the value of the "ConfigureStream" attribute of this object. }; // FREE OPERATORS @@ -15948,6 +15962,12 @@ int AppConfig::manipulateAttributes(t_MANIPULATOR& manipulator) return ret; } + ret = manipulator(&d_configureStream, + ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]); + if (ret) { + return ret; + } + return 0; } @@ -16032,6 +16052,11 @@ int AppConfig::manipulateAttribute(t_MANIPULATOR& manipulator, int id) &d_messagePropertiesV2, ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2]); } + case ATTRIBUTE_ID_CONFIGURE_STREAM: { + return manipulator( + &d_configureStream, + ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]); + } default: return NOT_FOUND; } } @@ -16132,6 +16157,11 @@ inline MessagePropertiesV2& AppConfig::messagePropertiesV2() return d_messagePropertiesV2; } +inline bool& AppConfig::configureStream() +{ + return d_configureStream; +} + // ACCESSORS template int AppConfig::accessAttributes(t_ACCESSOR& accessor) const @@ -16234,6 +16264,12 @@ int AppConfig::accessAttributes(t_ACCESSOR& accessor) const return ret; } + ret = accessor(d_configureStream, + ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]); + if (ret) { + return ret; + } + return 0; } @@ -16314,6 +16350,11 @@ int AppConfig::accessAttribute(t_ACCESSOR& accessor, int id) const d_messagePropertiesV2, ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2]); } + case ATTRIBUTE_ID_CONFIGURE_STREAM: { + return accessor( + d_configureStream, + ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM]); + } default: return NOT_FOUND; } } @@ -16414,6 +16455,11 @@ inline const MessagePropertiesV2& AppConfig::messagePropertiesV2() const return d_messagePropertiesV2; } +inline bool AppConfig::configureStream() const +{ + return d_configureStream; +} + // ------------------------ // class ClustersDefinition // ------------------------ @@ -17895,7 +17941,8 @@ inline bool mqbcfg::operator==(const mqbcfg::AppConfig& lhs, lhs.networkInterfaces() == rhs.networkInterfaces() && lhs.bmqconfConfig() == rhs.bmqconfConfig() && lhs.plugins() == rhs.plugins() && - lhs.messagePropertiesV2() == rhs.messagePropertiesV2(); + lhs.messagePropertiesV2() == rhs.messagePropertiesV2() && + lhs.configureStream() == rhs.configureStream(); } inline bool mqbcfg::operator!=(const mqbcfg::AppConfig& lhs, @@ -17931,6 +17978,7 @@ void mqbcfg::hashAppend(t_HASH_ALGORITHM& hashAlg, hashAppend(hashAlg, object.bmqconfConfig()); hashAppend(hashAlg, object.plugins()); hashAppend(hashAlg, object.messagePropertiesV2()); + hashAppend(hashAlg, object.configureStream()); } inline bool mqbcfg::operator==(const mqbcfg::ClustersDefinition& lhs, @@ -17999,7 +18047,7 @@ void mqbcfg::hashAppend(t_HASH_ALGORITHM& hashAlg, } // close enterprise namespace #endif -// GENERATED BY BLP_BAS_CODEGEN_2023.10.25 +// GENERATED BY BLP_BAS_CODEGEN_2023.10.07 // USING bas_codegen.pl -m msg --noAggregateConversion --noExternalization // --noIdent --package mqbcfg --msgComponent messages mqbcfg.xsd // ----------------------------------------------------------------------------