Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generating unique Subscription ids for each request #135

Merged
merged 3 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/applications/bmqtool/m_bmqtool_inpututil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ void InputUtil::verifyProperties(

bsl::unordered_set<bsl::string> pairs;

pairs.insert("pairs_");

while (it.hasNext()) {
bsl::string name = it.name();

Expand Down Expand Up @@ -201,7 +203,7 @@ void InputUtil::verifyProperties(
break;
}
case bmqt::PropertyType::e_SHORT: {
BSLS_ASSERT_SAFE(it.getAsShort()() ==
BSLS_ASSERT_SAFE(it.getAsShort() ==
in.getPropertyAsShort(name));
break;
}
Expand Down
33 changes: 23 additions & 10 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3531,14 +3531,20 @@ void BrokerSession::processPushEvent(const bmqp::Event& event)
citer != sIds.end();
++citer) {
bmqt::CorrelationId correlationId;
unsigned int subscriptionHandleId;
const QueueManager::QueueSp queue =
d_queueManager.observePushEvent(&correlationId, *citer);
d_queueManager.observePushEvent(&correlationId,
&subscriptionHandleId,
*citer);

BSLS_ASSERT(queue);
queueEvent->insertQueue(citer->d_subscriptionId, queue);

queueEvent->addCorrelationId(correlationId,
citer->d_subscriptionId);
// Use 'subscriptionHandle' instead of the internal
// 'citer->d_subscriptionId' so that
// 'bmqimp::Event::subscriptionId()' returns 'subscriptionHandle'

queueEvent->addCorrelationId(correlationId, subscriptionHandleId);
}

// Update event bytes
Expand Down Expand Up @@ -5239,7 +5245,12 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue,

bmqp_ctrlmsg::Subscription subscription(d_allocator_p);

subscription.sId() = cit->first.id();
const unsigned int internalSubscriptionId =
++d_nextInternalSubscriptionId;

subscription.sId() = internalSubscriptionId;
// Using unique id instead of 'SubscriptionHandle::id()'

subscription.consumers().emplace_back(ci);

bmqp_ctrlmsg::ExpressionVersion::Value version;
Expand All @@ -5260,9 +5271,9 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue,
subscription.expression().text() = from.expression().text();

streamParams.subscriptions().emplace_back(subscription);
d_queueManager.registerSubscription(queue,
cit->first.id(),
cit->first.correlationId());
queue->registerInternalSubscriptionId(internalSubscriptionId,
cit->first.id(),
cit->first.correlationId());
}
return context; // RETURN
}
Expand Down Expand Up @@ -5298,9 +5309,9 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue,
streamParams.consumerPriority() = options.consumerPriority();
streamParams.consumerPriorityCount() = 1;

d_queueManager.registerSubscription(queue,
queue->subQueueId(),
bmqt::CorrelationId());
queue->registerInternalSubscriptionId(queue->subQueueId(),
queue->subQueueId(),
bmqt::CorrelationId());
}

return context;
Expand Down Expand Up @@ -5624,6 +5635,7 @@ BrokerSession::BrokerSession(
, d_messageExpirationTimeoutHandle()
, d_nextRequestGroupId(k_NON_BUFFERED_REQUEST_GROUP_ID)
, d_queueRetransmissionTimeoutMap(allocator)
, d_nextInternalSubscriptionId(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_scheduler_p->clockType() ==
Expand Down Expand Up @@ -6123,6 +6135,7 @@ void BrokerSession::onConfigureQueueResponse(
res == bmqt::GenericResult::e_NOT_CONNECTED ||
res == bmqt::GenericResult::e_NOT_SUPPORTED);

(void)res;
BALL_LOG_INFO << "Ignore cancelled request: "
<< context->request();
return; // RETURN
Expand Down
3 changes: 3 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.h
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,9 @@ class BrokerSession BSLS_CPP11_FINAL {
// retransmission timeout provided by
// the broker

unsigned int d_nextInternalSubscriptionId;
// Assists generating unique ids for Configure requests.

private:
// NOT IMPLEMENTED
BrokerSession(const BrokerSession&);
Expand Down
13 changes: 7 additions & 6 deletions src/groups/bmq/bmqimp/bmqimp_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ class Event {
/// undefined unless 0 <= 'position' < numCorrrelationIds(), and event's
/// type() is MESSAGEEVENT, 'messageEventMode()' is READ and the
/// underlying raw event is of type PUSH.
const unsigned int subscriptionId(int position) const;
unsigned int subscriptionId(int position) const;

// MANIPULATORS

Expand Down Expand Up @@ -500,8 +500,8 @@ class Event {
/// event's type() is MESSAGEEVENT, 'messageEventMode()' is READ and the
/// underlying raw event is of type ACK, PUT or PUSH.
void addCorrelationId(const bmqt::CorrelationId& correlationId,
unsigned int subscriptionId =
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID);
unsigned int subscriptionHandleId =
bmqt::SubscriptionHandle::k_INVALID_HANDLE_ID);

/// Insert the specified `queue` to the queues and the specified
/// `corrId` to the list of correlationIds associated with this event.
Expand Down Expand Up @@ -746,7 +746,7 @@ inline const bmqt::CorrelationId& Event::correlationId(int position) const
return d_correlationIds[position].first;
}

inline const unsigned int Event::subscriptionId(int position) const
inline unsigned int Event::subscriptionId(int position) const
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(type() == EventType::e_MESSAGE);
Expand Down Expand Up @@ -798,7 +798,7 @@ inline bmqp::PutEventBuilder* Event::putEventBuilder()
}

inline void Event::addCorrelationId(const bmqt::CorrelationId& correlationId,
unsigned int subscriptionId)
unsigned int subscriptionHandleId)
{
// TODO: when ACK event is created locally we have to fill d_correlationIds
// before the raw ACK 'bmqp::Event' is created and may be used to
Expand All @@ -810,7 +810,8 @@ inline void Event::addCorrelationId(const bmqt::CorrelationId& correlationId,
// BSLS_ASSERT_SAFE(messageEventMode() == MessageEventMode::e_READ);
// BSLS_ASSERT_SAFE(d_rawEvent.isAckEvent());

d_correlationIds.push_back(bsl::make_pair(correlationId, subscriptionId));
d_correlationIds.push_back(
bsl::make_pair(correlationId, subscriptionHandleId));
}

} // close package namespace
Expand Down
9 changes: 6 additions & 3 deletions src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,16 @@ void MessageDumper::dumpPushEvent(bsl::ostream& out, const bmqp::Event& event)
unsigned int subscriptionId;
bmqp::RdaInfo rdaInfo;
bmqt::CorrelationId correlationId;
unsigned int subscriptionHandleId;

iter.extractQueueInfo(&qId, &subscriptionId, &rdaInfo);

QueueManager::QueueSp queue =
d_queueManager_p->lookupQueueBySubscriptionId(&correlationId,
qId,
subscriptionId);
d_queueManager_p->lookupQueueBySubscriptionId(
&correlationId,
&subscriptionHandleId,
qId,
subscriptionId);
BSLS_ASSERT_SAFE(queue);

out << "PUSH Message #" << ++msgNum << ": "
Expand Down
4 changes: 3 additions & 1 deletion src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,9 @@ void Tester::registerSubscription(const bslstl::StringRef& uri,

BSLS_ASSERT_SAFE(queue);

d_queueManager.registerSubscription(queue, subscriptionId, correlationId);
queue->registerInternalSubscriptionId(subscriptionId,
subscriptionId,
correlationId);
}

void Tester::updateSubscriptions(const bslstl::StringRef& uri,
Expand Down
1 change: 1 addition & 0 deletions src/groups/bmq/bmqimp/bmqimp_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ Queue::Queue(bslma::Allocator* allocator)
, d_schemaLearner(allocator)
, d_schemaLearnerContext(d_schemaLearner.createContext())
, d_config(allocator)
, d_registeredInternalSubscriptionIds(allocator)
{
d_handleParameters.uri() = "";
d_handleParameters.flags() = bmqt::QueueFlagsUtil::empty();
Expand Down
53 changes: 53 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ struct QueueStatsUtil {

/// Representation of a Queue (properties, stats, state, ...)
class Queue {
public:
// PUBLIC TYPES
typedef bsl::pair<unsigned int, bmqt::CorrelationId> SubscriptionHandle;
// Not using private 'bmqt::SubscriptionHandle' ctor

private:
// DATA
bslma::Allocator* d_allocator_p;
Expand Down Expand Up @@ -256,6 +261,13 @@ class Queue {

bmqp_ctrlmsg::StreamParameters d_config;

bsl::unordered_map<unsigned int, SubscriptionHandle>
d_registeredInternalSubscriptionIds;
// This keeps SubscriptionHandle (id and CorrelationId) for Configure
// response processing.
// Supporting multiple concurrent Configure requests.
// TODO: This should go into ConfigureRequest context.

private:
// NOT IMPLEMENTED

Expand Down Expand Up @@ -325,6 +337,20 @@ class Queue {
/// reinitialize the state before a new start).
void clearStatContext();

void
registerInternalSubscriptionId(unsigned int internalSubscriptionId,
unsigned int subscriptionHandleId,
const bmqt::CorrelationId& correlationId);
// Keep the specified 'subscriptionHandleId' and 'correlationId'
// associated with the specified 'internalSubscriptionId' between
// Configure request and Configure response (until
// 'extractSubscriptionHandle').

SubscriptionHandle
extractSubscriptionHandle(unsigned int internalSubscriptionId);
// Lookup, copy, erase, and return the copy of what was registered
// by 'registerInternalSubscriptionId'.

// ACCESSORS

/// Return true if this Queue object has a SubQueueId having the default
Expand Down Expand Up @@ -528,6 +554,33 @@ inline Queue& Queue::setConfig(const bmqp_ctrlmsg::StreamParameters& value)
return *this;
}

inline void
Queue::registerInternalSubscriptionId(unsigned int internalSubscriptionId,
unsigned int subscriptionHandleId,
const bmqt::CorrelationId& correlationId)
{
d_registeredInternalSubscriptionIds.emplace(
internalSubscriptionId,
SubscriptionHandle(subscriptionHandleId, correlationId));
}

inline Queue::SubscriptionHandle
Queue::extractSubscriptionHandle(unsigned int internalSubscriptionId)
{
bsl::unordered_map<unsigned int, SubscriptionHandle>::const_iterator cit =
d_registeredInternalSubscriptionIds.find(internalSubscriptionId);

if (cit == d_registeredInternalSubscriptionIds.end()) {
return {internalSubscriptionId, bmqt::CorrelationId()}; // RETURN
}

SubscriptionHandle result(cit->second);

d_registeredInternalSubscriptionIds.erase(cit);
Copy link
Collaborator

@678098 678098 Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct that Queue is accessed from one thread only? So modifications of d_registeredInternalSubscriptionIds container do not break anything?

Copy link
Collaborator Author

@dorjesinpo dorjesinpo Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is the FSM thread. So is createConfigureQueueContext! So, there is no need for atomic or Once (or static).
Will commit new changes.


return result;
}

// ACCESSORS
inline QueueState::Enum Queue::state() const
{
Expand Down
Loading
Loading