Skip to content

Commit

Permalink
ConfigureStream unconditionally
Browse files Browse the repository at this point in the history
Signed-off-by: Vitaly Dzhitenov <[email protected]>
  • Loading branch information
dorjesinpo committed Mar 26, 2024
1 parent 4902091 commit 0b6adb1
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 123 deletions.
187 changes: 70 additions & 117 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,25 +183,18 @@ void fillDTSpanQueueBaggage(bmqpi::DTSpan::Baggage* baggage,
bool isConfigure(const bmqp_ctrlmsg::ControlMessage& request,
const bmqp_ctrlmsg::ControlMessage& response)
{
return bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION
? request.choice().isConfigureStreamValue() &&
response.choice().isConfigureStreamResponseValue()
: request.choice().isConfigureQueueStreamValue() &&
response.choice().isConfigureQueueStreamResponseValue();
return request.choice().isConfigureStreamValue() &&
response.choice().isConfigureStreamResponseValue();
}

bool isConfigure(const bmqp_ctrlmsg::ControlMessage& request)
{
return bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION
? request.choice().isConfigureStreamValue()
: request.choice().isConfigureQueueStreamValue();
return request.choice().isConfigureStreamValue();
}

bool isConfigureResponse(const bmqp_ctrlmsg::ControlMessage& request)
{
return bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION
? request.choice().isConfigureStreamResponseValue()
: request.choice().isConfigureQueueStreamResponseValue();
return request.choice().isConfigureStreamResponseValue();
}

void makeDeconfigure(bmqp_ctrlmsg::ControlMessage* request)
Expand Down Expand Up @@ -5194,127 +5187,87 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue,
}
context->setGroupId(grId);

if (bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION) {
// Make ConfigureStream request
bmqp_ctrlmsg::ConfigureStream& configureStream =
context->request().choice().makeConfigureStream();
configureStream.qId() = queue->id();
// Make ConfigureStream request
bmqp_ctrlmsg::ConfigureStream& configureStream =
context->request().choice().makeConfigureStream();
configureStream.qId() = queue->id();

// Populate the request's streamParameters
bmqp_ctrlmsg::StreamParameters& streamParams =
configureStream.streamParameters();
// Populate the request's streamParameters
bmqp_ctrlmsg::StreamParameters& streamParams =
configureStream.streamParameters();

if (!queue->hasDefaultSubQueueId()) {
streamParams.appId() = queue->uri().id();
} // else "__default"
if (!queue->hasDefaultSubQueueId()) {
streamParams.appId() = queue->uri().id();
} // else "__default"

if (isDeconfigure) {
// Empty Subscriptions
return context; // RETURN
}
if (isDeconfigure) {
// Empty Subscriptions
return context; // RETURN
}

bmqt::QueueOptions::SubscriptionsSnapshot snapshot(d_allocator_p);
options.loadSubscriptions(&snapshot);

for (bmqt::QueueOptions::SubscriptionsSnapshot::const_iterator cit =
snapshot.begin();
cit != snapshot.end();
++cit) {
bmqp_ctrlmsg::ConsumerInfo ci;
const bmqt::Subscription& from = cit->second;
if (from.hasMaxUnconfirmedMessages()) {
ci.maxUnconfirmedMessages() = from.maxUnconfirmedMessages();
}
else {
ci.maxUnconfirmedMessages() = options.maxUnconfirmedMessages();
}
if (from.hasMaxUnconfirmedBytes()) {
ci.maxUnconfirmedBytes() = from.maxUnconfirmedBytes();
}
else {
ci.maxUnconfirmedBytes() = options.maxUnconfirmedBytes();
}
if (from.hasConsumerPriority()) {
ci.consumerPriority() = from.consumerPriority();
}
else {
ci.consumerPriority() = options.consumerPriority();
}
bmqt::QueueOptions::SubscriptionsSnapshot snapshot(d_allocator_p);
options.loadSubscriptions(&snapshot);

ci.consumerPriorityCount() = 1;
for (bmqt::QueueOptions::SubscriptionsSnapshot::const_iterator cit =
snapshot.begin();
cit != snapshot.end();
++cit) {
bmqp_ctrlmsg::ConsumerInfo ci;
const bmqt::Subscription& from = cit->second;
if (from.hasMaxUnconfirmedMessages()) {
ci.maxUnconfirmedMessages() = from.maxUnconfirmedMessages();
}
else {
ci.maxUnconfirmedMessages() = options.maxUnconfirmedMessages();
}
if (from.hasMaxUnconfirmedBytes()) {
ci.maxUnconfirmedBytes() = from.maxUnconfirmedBytes();
}
else {
ci.maxUnconfirmedBytes() = options.maxUnconfirmedBytes();
}
if (from.hasConsumerPriority()) {
ci.consumerPriority() = from.consumerPriority();
}
else {
ci.consumerPriority() = options.consumerPriority();
}

bmqp_ctrlmsg::Subscription subscription(d_allocator_p);
ci.consumerPriorityCount() = 1;

const unsigned int internalSubscriptionId =
++d_nextInternalSubscriptionId;
bmqp_ctrlmsg::Subscription subscription(d_allocator_p);

subscription.sId() = internalSubscriptionId;
// Using unique id instead of 'SubscriptionHandle::id()'
const unsigned int internalSubscriptionId =
++d_nextInternalSubscriptionId;

subscription.consumers().emplace_back(ci);
subscription.sId() = internalSubscriptionId;
// Using unique id instead of 'SubscriptionHandle::id()'

bmqp_ctrlmsg::ExpressionVersion::Value version;
subscription.consumers().emplace_back(ci);

switch (from.expression().version()) {
case bmqt::SubscriptionExpression::e_NONE:
version = bmqp_ctrlmsg::ExpressionVersion::E_UNDEFINED;
break;
case bmqt::SubscriptionExpression::e_VERSION_1:
version = bmqp_ctrlmsg::ExpressionVersion::E_VERSION_1;
break;
default:
BSLS_ASSERT_SAFE(false);
version = bmqp_ctrlmsg::ExpressionVersion::E_UNDEFINED;
break;
}
subscription.expression().version() = version;
subscription.expression().text() = from.expression().text();
bmqp_ctrlmsg::ExpressionVersion::Value version;

streamParams.subscriptions().emplace_back(subscription);
queue->registerInternalSubscriptionId(internalSubscriptionId,
cit->first.id(),
cit->first.correlationId());
switch (from.expression().version()) {
case bmqt::SubscriptionExpression::e_NONE:
version = bmqp_ctrlmsg::ExpressionVersion::E_UNDEFINED;
break;
case bmqt::SubscriptionExpression::e_VERSION_1:
version = bmqp_ctrlmsg::ExpressionVersion::E_VERSION_1;
break;
default:
BSLS_ASSERT_SAFE(false);
version = bmqp_ctrlmsg::ExpressionVersion::E_UNDEFINED;
break;
}
return context; // RETURN
}

// Make ConfigureQueueStream request
bmqp_ctrlmsg::ConfigureQueueStream& configureQueueStream =
context->request().choice().makeConfigureQueueStream();
configureQueueStream.qId() = queue->id();
subscription.expression().version() = version;
subscription.expression().text() = from.expression().text();

// Populate the request's streamParameters
bmqp_ctrlmsg::QueueStreamParameters& streamParams =
configureQueueStream.streamParameters();

// Set the SubQueueIdInfo if non-default subQueueId
if (!queue->hasDefaultSubQueueId()) {
bmqp_ctrlmsg::SubQueueIdInfo& sqidInfo =
streamParams.subIdInfo().makeValueInplace();
sqidInfo.appId() = queue->uri().id();
sqidInfo.subId() = queue->subQueueId();
streamParams.subscriptions().emplace_back(subscription);
queue->registerInternalSubscriptionId(internalSubscriptionId,
cit->first.id(),
cit->first.correlationId());
}

streamParams.maxUnconfirmedMessages() = options.maxUnconfirmedMessages();
streamParams.maxUnconfirmedBytes() = options.maxUnconfirmedBytes();
streamParams.consumerPriority() = options.consumerPriority();

// Set consumerPriority and consumerPriorityCount
if (isDeconfigure) {
streamParams.consumerPriority() =
bmqp::Protocol::k_CONSUMER_PRIORITY_INVALID;
streamParams.consumerPriorityCount() = 0;
}
else {
streamParams.consumerPriority() = options.consumerPriority();
streamParams.consumerPriorityCount() = 1;

queue->registerInternalSubscriptionId(queue->subQueueId(),
queue->subQueueId(),
bmqt::CorrelationId());
}

return context;
return context; // RETURN
}

BrokerSession::RequestManagerType::RequestSp
Expand Down
4 changes: 1 addition & 3 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,7 @@ bool waitRealTime(bslmt::TimedSemaphore* sem)

bool isConfigure(const bmqp_ctrlmsg::ControlMessage& request)
{
return bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION
? request.choice().isConfigureStreamValue()
: request.choice().isConfigureQueueStreamValue();
return request.choice().isConfigureStreamValue();
}

void makeResponse(bmqp_ctrlmsg::ControlMessage* response,
Expand Down
8 changes: 5 additions & 3 deletions src/groups/mqb/mqba/mqba_clientsession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2584,9 +2584,11 @@ int main(int argc, char* argv[])
mwcsys::Time::initialize(s_allocator_p);

mqbcfg::AppConfig brokerConfig(s_allocator_p);
brokerConfig.brokerVersion() = 999999; // required for test case 8
// to convert msg properties
// from v1 to v2

// Assuming brokerConfig.messagePropertiesV2().MessagePropertiesV2()
// .advertiseV2Support() == true by default.
//
// Required for test case 8 to convert msg properties from v1 to v2
mqbcfg::BrokerConfig::set(brokerConfig);

bsl::shared_ptr<mwcst::StatContext> statContext =
Expand Down

0 comments on commit 0b6adb1

Please sign in to comment.