Skip to content

Commit

Permalink
Feat: track queue depth per appId
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Oct 2, 2024
1 parent f7067f8 commit 376d7b0
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 82 deletions.
16 changes: 10 additions & 6 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ void FileBackedStorage::setQueue(mqbi::Queue* queue)
d_virtualStorageCatalog.setQueue(queue);

// Update queue stats if a queue has been associated with the storage.

if (queue) {
d_virtualStorageCatalog.setQueueStats(queue->stats());

const bsls::Types::Int64 numMessage = numMessages(
mqbu::StorageKey::k_NULL_KEY);
const bsls::Types::Int64 numByte = numBytes(
Expand All @@ -263,6 +264,9 @@ void FileBackedStorage::setQueue(mqbi::Queue* queue)
<< mwcu::PrintUtil::prettyNumber(numByte)
<< " bytes of outstanding data.";
}
else {
d_virtualStorageCatalog.setQueueStats(NULL);
}
}

void FileBackedStorage::close()
Expand Down Expand Up @@ -381,8 +385,8 @@ FileBackedStorage::confirm(const bmqt::MessageGUID& msgGUID,
return mqbi::StorageResult::e_GUID_NOT_FOUND; // RETURN
}

mqbi::StorageResult::Enum rc = d_virtualStorageCatalog.confirm(msgGUID,
appKey);
const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.confirm(msgGUID, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
return rc; // RETURN
}
Expand All @@ -391,7 +395,7 @@ FileBackedStorage::confirm(const bmqt::MessageGUID& msgGUID,
BSLS_ASSERT_SAFE(!handles.empty());

DataStoreRecordHandle handle;
int writeResult = d_store_p->writeConfirmRecord(
const int writeResult = d_store_p->writeConfirmRecord(
&handle,
msgGUID,
d_queueKey,
Expand Down Expand Up @@ -830,8 +834,8 @@ void FileBackedStorage::processConfirmRecord(
--it->second.d_refCount; // Update outstanding refCount

if (!appKey.isNull()) {
mqbi::StorageResult::Enum rc = d_virtualStorageCatalog.confirm(guid,
appKey);
const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.confirm(guid, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
BALL_LOG_ERROR << "#STORAGE_INVALID_CONFIRM " << "Partition ["
<< partitionId() << "]"
Expand Down
9 changes: 7 additions & 2 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ void InMemoryStorage::setQueue(mqbi::Queue* queue)
// Update queue stats if a queue has been associated with the storage.

if (queue) {
d_virtualStorageCatalog.setQueueStats(queue->stats());

const bsls::Types::Int64 numMessage = numMessages(
mqbu::StorageKey::k_NULL_KEY);
const bsls::Types::Int64 numByte = numBytes(
Expand All @@ -128,6 +130,9 @@ void InMemoryStorage::setQueue(mqbi::Queue* queue)
<< mwcu::PrintUtil::prettyNumber(numByte)
<< " bytes of outstanding.";
}
else {
d_virtualStorageCatalog.setQueueStats(NULL);
}
}

void InMemoryStorage::close()
Expand Down Expand Up @@ -256,8 +261,8 @@ InMemoryStorage::confirm(const bmqt::MessageGUID& msgGUID,
}

if (!appKey.isNull()) {
mqbi::StorageResult::Enum rc = d_virtualStorageCatalog.confirm(msgGUID,
appKey);
const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.confirm(msgGUID, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
return rc; // RETURN
}
Expand Down
19 changes: 18 additions & 1 deletion src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ VirtualStorageCatalog::VirtualStorageCatalog(mqbi::Storage* storage,
, d_numMessages(0)
, d_defaultAppMessage(defaultAppMessage().d_rdaInfo)
, d_queue_p(0)
, d_stats_p(0)
, d_allocator_p(allocator)
{
// PRECONDITIONS
Expand All @@ -115,6 +116,10 @@ VirtualStorageCatalog::~VirtualStorageCatalog()
}

// MANIPULATORS
void VirtualStorageCatalog::setQueueStats(mqbstat::QueueStatsDomain* stats)
{
d_stats_p = stats;
}

VirtualStorageCatalog::DataStreamIterator
VirtualStorageCatalog::begin(const bmqt::MessageGUID& where)
Expand Down Expand Up @@ -270,8 +275,14 @@ VirtualStorageCatalog::confirm(const bmqt::MessageGUID& msgGUID,
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());

setup(&data->second);
const mqbi::StorageResult::Enum rc = it->value()->confirm(&data->second);
if (d_stats_p && mqbi::StorageResult::Enum::e_SUCCESS == rc) {
d_stats_p->onEvent(mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
data->second.d_size,
it->key1());
}

return it->value()->confirm(&data->second);
return rc;
}

mqbi::StorageResult::Enum
Expand Down Expand Up @@ -365,6 +376,12 @@ VirtualStorageCatalog::removeAll(const mqbu::StorageKey& appKey)
++itData;
}
}

if (d_stats_p) {
d_stats_p->onEvent(mqbstat::QueueStatsDomain::EventType::e_PURGE,
0,
itVs->key1());
}
}
else {
for (VirtualStoragesIter it = d_virtualStorages.begin();
Expand Down
37 changes: 22 additions & 15 deletions src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
// storages associated with a queue.

// MQB

#include <mqbi_storage.h>
#include <mqbs_virtualstorage.h>
#include <mqbstat_queuestats.h>
#include <mqbu_storagekey.h>

// MWC
Expand Down Expand Up @@ -107,37 +107,39 @@ class VirtualStorageCatalog {

private:
// DATA
mqbi::Storage* d_storage_p; // Physical storage underlying all
// virtual storages known to this
// object
/// Physical storage underlying all virtual storages known to this object
mqbi::Storage* d_storage_p;

/// Map of appKey to corresponding virtual storage
VirtualStorages d_virtualStorages;
// Map of appKey to corresponding
// virtual storage

/// Available ordinal values for virtual storages.
AvailableOrdinals d_availableOrdinals;
// available ordinal values for Virtual Storages.

/// Monotonically increasing value to generate new ordinal.
Ordinal d_nextOrdinal;
// Monotonically increasing value to generate new ordinal.

/// The DataStream tracking all Apps states.
VirtualStorage::DataStream d_dataStream;
// The DataStream tracking all Apps states.

/// Cumulative count of all bytes.
bsls::Types::Int64 d_totalBytes;
// Cumulative count of all bytes.

/// Cumulative count of all messages.
bsls::Types::Int64 d_numMessages;
// Cumulative count of all messages.

/// The default App state
mqbi::AppMessage d_defaultAppMessage;
// The default App state

/// This could be null if a local or remote
/// queue instance has not been created.
mqbi::Queue* d_queue_p;
// This could be null if a local or remote
// queue instance has not been created.

bslma::Allocator* d_allocator_p; // Allocator to use
/// Optional queue stats to update. Held, not owned
mqbstat::QueueStatsDomain* d_stats_p;

/// Allocator to use
bslma::Allocator* d_allocator_p;

private:
// NOT IMPLEMENTED
Expand All @@ -161,6 +163,11 @@ class VirtualStorageCatalog {
~VirtualStorageCatalog();

// MANIPULATORS
/// Use the specified `stats` to report metrics on message add, delete or
/// storage purge. The provided NULL value is also valid and means that no
/// stats updates required. Typically `stats` should be set on PRIMARY,
/// and unset in REPLICA and PROXY.
void setQueueStats(mqbstat::QueueStatsDomain* stats);

/// If the specified 'where' is unset, return reference to the beginning of
/// the DataStream. Otherwise, return reference to the corresponding item
Expand Down
46 changes: 42 additions & 4 deletions src/groups/mqb/mqbstat/mqbstat_jsonprinter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,51 @@ struct ConversionUtils {

populateMetric(&values, ctx, Stat::e_NB_PRODUCER);
populateMetric(&values, ctx, Stat::e_NB_CONSUMER);

populateMetric(&values, ctx, Stat::e_MESSAGES_CURRENT);
populateMetric(&values, ctx, Stat::e_MESSAGES_MAX);
populateMetric(&values, ctx, Stat::e_BYTES_CURRENT);
populateMetric(&values, ctx, Stat::e_BYTES_MAX);

populateMetric(&values, ctx, Stat::e_PUT_MESSAGES_DELTA);
populateMetric(&values, ctx, Stat::e_PUT_BYTES_DELTA);
populateMetric(&values, ctx, Stat::e_PUT_MESSAGES_ABS);
populateMetric(&values, ctx, Stat::e_PUT_BYTES_ABS);

populateMetric(&values, ctx, Stat::e_PUSH_MESSAGES_DELTA);
populateMetric(&values, ctx, Stat::e_PUSH_BYTES_DELTA);
populateMetric(&values, ctx, Stat::e_PUSH_MESSAGES_ABS);
populateMetric(&values, ctx, Stat::e_PUSH_BYTES_ABS);

populateMetric(&values, ctx, Stat::e_ACK_DELTA);
populateMetric(&values, ctx, Stat::e_ACK_ABS);
populateMetric(&values, ctx, Stat::e_ACK_TIME_AVG);
populateMetric(&values, ctx, Stat::e_ACK_TIME_MAX);

populateMetric(&values, ctx, Stat::e_NACK_DELTA);
populateMetric(&values, ctx, Stat::e_NACK_ABS);

populateMetric(&values, ctx, Stat::e_CONFIRM_DELTA);
populateMetric(&values, ctx, Stat::e_CONFIRM_ABS);
populateMetric(&values, ctx, Stat::e_CONFIRM_TIME_AVG);
populateMetric(&values, ctx, Stat::e_CONFIRM_TIME_MAX);

populateMetric(&values, ctx, Stat::e_REJECT_ABS);
populateMetric(&values, ctx, Stat::e_REJECT_DELTA);

populateMetric(&values, ctx, Stat::e_QUEUE_TIME_AVG);
populateMetric(&values, ctx, Stat::e_QUEUE_TIME_MAX);

populateMetric(&values, ctx, Stat::e_GC_MSGS_DELTA);
populateMetric(&values, ctx, Stat::e_GC_MSGS_ABS);

populateMetric(&values, ctx, Stat::e_ROLE);

populateMetric(&values, ctx, Stat::e_CFG_MSGS);
populateMetric(&values, ctx, Stat::e_CFG_BYTES);

populateMetric(&values, ctx, Stat::e_NO_SC_MSGS_DELTA);
populateMetric(&values, ctx, Stat::e_NO_SC_MSGS_ABS);
}

inline static void populateOneDomainStats(bdljsn::JsonObject* domainObject,
Expand Down Expand Up @@ -196,10 +230,14 @@ class JsonPrinter::JsonPrinterImpl {
inline JsonPrinter::JsonPrinterImpl::JsonPrinterImpl(
const StatContextsMap& statContextsMap,
bslma::Allocator* allocator)
: d_opsCompact(bdljsn::WriteOptions().setSpacesPerLevel(0).setStyle(
bdljsn::WriteStyle::e_COMPACT))
, d_opsPretty(bdljsn::WriteOptions().setSpacesPerLevel(4).setStyle(
bdljsn::WriteStyle::e_PRETTY))
: d_opsCompact(bdljsn::WriteOptions()
.setSpacesPerLevel(0)
.setStyle(bdljsn::WriteStyle::e_COMPACT)
.setSortMembers(true))
, d_opsPretty(bdljsn::WriteOptions()
.setSpacesPerLevel(4)
.setStyle(bdljsn::WriteStyle::e_PRETTY)
.setSortMembers(true))
, d_contexts(statContextsMap, allocator)
{
// NOTHING
Expand Down
34 changes: 30 additions & 4 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,14 @@ void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value)
case EventType::e_ADD_MESSAGE: {
d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_BYTES, value);
d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1);
if (!d_subContextsHolder.empty()) {
bsl::list<StatSubContextMp>::iterator it = d_subContextsHolder.begin();
while (it != d_subContextsHolder.end()) {
it->get()->adjustValue(DomainQueueStats::e_STAT_BYTES, value);
it->get()->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1);
++it;
}
}
} break;
case EventType::e_DEL_MESSAGE: {
d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_BYTES, -value);
Expand All @@ -557,6 +565,14 @@ void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value)
// the stat to get rates
d_statContext_mp->setValue(DomainQueueStats::e_STAT_BYTES, 0);
d_statContext_mp->setValue(DomainQueueStats::e_STAT_MESSAGES, 0);
if (!d_subContextsHolder.empty()) {
bsl::list<StatSubContextMp>::iterator it = d_subContextsHolder.begin();
while (it != d_subContextsHolder.end()) {
it->get()->setValue(DomainQueueStats::e_STAT_BYTES, 0);
it->get()->setValue(DomainQueueStats::e_STAT_MESSAGES, 0);
++it;
}
}
} break;
case EventType::e_CHANGE_ROLE: {
d_statContext_mp->setValue(DomainQueueStats::e_STAT_ROLE, value);
Expand Down Expand Up @@ -613,10 +629,23 @@ void QueueStatsDomain::onEvent(EventType::Enum type,
appIdContext->reportValue(DomainQueueStats::e_STAT_CONFIRM_TIME,
value);
} break;

case EventType::e_QUEUE_TIME: {
appIdContext->reportValue(DomainQueueStats::e_STAT_QUEUE_TIME, value);
} break;
case EventType::e_ADD_MESSAGE: {
appIdContext->adjustValue(DomainQueueStats::e_STAT_BYTES, value);
appIdContext->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1);
} break;
case EventType::e_DEL_MESSAGE: {
appIdContext->adjustValue(DomainQueueStats::e_STAT_BYTES, -value);
appIdContext->adjustValue(DomainQueueStats::e_STAT_MESSAGES, -1);
} break;
case EventType::e_PURGE: {
// NOTE: Setting the value like that will cause weird results if using
// the stat to get rates
appIdContext->setValue(DomainQueueStats::e_STAT_BYTES, 0);
appIdContext->setValue(DomainQueueStats::e_STAT_MESSAGES, 0);
} break;

// Some of these event types make no sense per appId and should be reported
// per entire queue instead
Expand All @@ -627,10 +656,7 @@ void QueueStatsDomain::onEvent(EventType::Enum type,
case EventType::e_REJECT: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_PUSH: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_PUT: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_ADD_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_DEL_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_GC_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_PURGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CHANGE_ROLE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CFG_MSGS: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CFG_BYTES: BSLS_ANNOTATION_FALLTHROUGH;
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
// 'mqbstat::QueueStatsUtil' is a utility namespace exposing methods to
// initialize the stat contexts and associated objects.

// MQB

// BMQ
#include <bmqt_uri.h>

Expand Down
2 changes: 2 additions & 0 deletions src/integration-tests/test_admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ def test_queue_stats(single_node: Cluster) -> None:

expect_same_structure(queue_stats, dt.TEST_QUEUE_STATS_AFTER_CONFIRM)

# also check purge

consumer_foo.close(f"{task.uri}?id=foo")
consumer_bar.close(f"{task.uri}?id=bar")
consumer_baz.close(f"{task.uri}?id=baz")
Expand Down
Loading

0 comments on commit 376d7b0

Please sign in to comment.