Skip to content

Commit

Permalink
Merge branch 'main' into add_sanitizers_to_ci
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-e1off authored Jul 15, 2024
2 parents 673e020 + a145cd1 commit 9bdc208
Show file tree
Hide file tree
Showing 30 changed files with 1,295 additions and 549 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/formatting-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ jobs:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: black style check
run: |
sudo apt-get install black
pip3 install git+https://github.com/psf/black
black .
git diff -q | tee format_diff.txt
if [ -s format_diff.txt ]; then exit 1; fi
4 changes: 3 additions & 1 deletion src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,9 @@ int Application::processCommand(const bslstl::StringRef& source,
}
else if (command.isStatValue()) {
mqbcmd::StatResult statResult;
d_statController_mp->processCommand(&statResult, command.stat());
d_statController_mp->processCommand(&statResult,
command.stat(),
commandWithOptions.encoding());
if (statResult.isErrorValue()) {
cmdResult.makeError(statResult.error());
}
Expand Down
45 changes: 35 additions & 10 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2816,16 +2816,41 @@ void ClientSession::initiateShutdown(const ShutdownCb& callback,

BALL_LOG_INFO << description() << ": initiateShutdown";

dispatcher()->execute(
bdlf::BindUtil::bind(
bdlf::MemFnUtil::memFn(&ClientSession::initiateShutdownDispatched,
d_self.acquire()),
callback,
timeout),
this,
mqbi::DispatcherEventType::e_DISPATCHER);
// Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling
// 'd_flushList'
// The 'd_self.acquire()' return 'shared_ptr<ClientSession>' but that does
// not relate to the 'shared_ptr<mqbnet::Session>' acquired by
// 'mqbnet::TransportManagerIterator'. The latter is bound to the
// 'initiateShutdown'. The former can be null after 'd_self.invalidate()'
// call. ('invalidate()' waits for all _acquired_ 'shared_ptr' references
// to drop).
//
// We have a choice, either 1) bind the latter to 'initiateShutdown' to
// make sure 'd_self.acquire()' returns not null, or 2) invoke the
// 'callback' earlier if fail to 'acquire()' because of 'invalidate()', or
// 3) bind _acquired_ 'shared_ptr' to 'initiateShutdown'.
//
// Choosing 2), assuming that calling the (completion) callback from a
// thread other than the *CLIENT* dispatcher thread is ok. The
// 'mwcu::OperationChainLink' expects the completion callback from multiple
// sessions anyway.

bsl::shared_ptr<ClientSession> ptr = d_self.acquire();

if (!ptr) {
callback();
}
else {
dispatcher()->execute(
bdlf::BindUtil::bind(
bdlf::MemFnUtil::memFn(
&ClientSession::initiateShutdownDispatched,
d_self.acquire()),
callback,
timeout),
this,
mqbi::DispatcherEventType::e_DISPATCHER);
// Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling
// 'd_flushList'
}
}

void ClientSession::invalidate()
Expand Down
20 changes: 14 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2633,12 +2633,20 @@ void ClusterQueueHelper::notifyQueue(QueueContext* queueContext,
}

if (isOpen) {
queue->dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream,
queue,
generationCount,
upstreamSubQueueId),
queue);
if (generationCount == 0) {
BALL_LOG_INFO << d_cluster_p->description()
<< ": has deconfigured queue ["
<< queueContext->uri() << "], subStream id ["
<< upstreamSubQueueId << "]";
}
else {
queue->dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream,
queue,
generationCount,
upstreamSubQueueId),
queue);
}
}
else {
queue->dispatcher()->execute(
Expand Down
270 changes: 270 additions & 0 deletions src/groups/mqb/mqbstat/mqbstat_jsonprinter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
// Copyright 2024 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// mqbstat_jsonprinter.cpp -*-C++-*-
#include <mqbstat_jsonprinter.h>

#include <mqbscm_version.h>

// MQB
#include <mqbstat_queuestats.h>

// MWC
#include <mwcu_memoutstream.h>

// BDE
#include <ball_log.h>
#include <bdljsn_json.h>
#include <bdljsn_jsonutil.h>
#include <bsls_assert.h>

namespace BloombergLP {
namespace mqbstat {

namespace {

// ---------------------
// class JsonPrinterImpl
// ---------------------

/// The implementation class for JsonPrinter, containing all the cached options
/// for printing statistics as JSON. This implementation exists and is hidden
/// from the package include for the following reasons:
/// - Don't want to expose `bdljsn` names and symbols to the outer scope.
/// - Member fields and functions defined for this implementation are used only
/// locally, so there is no reason to make it visible.
class JsonPrinterImpl {
private:
// CLASS-SCOPE CATEGORY
BALL_LOG_SET_CLASS_CATEGORY("MQBSTAT.JSONPRINTERIMPL");

private:
// PRIVATE TYPES
typedef JsonPrinter::StatContextsMap StatContextsMap;

private:
// DATA
/// Options for printing a compact JSON
const bdljsn::WriteOptions d_opsCompact;

/// Options for printing a pretty JSON
const bdljsn::WriteOptions d_opsPretty;

/// StatContext-s map
const StatContextsMap d_contexts;

private:
// NOT IMPLEMENTED
JsonPrinterImpl(const JsonPrinterImpl& other) BSLS_CPP11_DELETED;
JsonPrinterImpl&
operator=(const JsonPrinterImpl& other) BSLS_CPP11_DELETED;

// ACCESSORS

/// "domainQueues" stat context:
/// Populate the specified `bdljsn::JsonObject*` with the values
/// from the specified `ctx`.
static void populateAllDomainsStats(bdljsn::JsonObject* parent,
const mwcst::StatContext& ctx);
static void populateOneDomainStats(bdljsn::JsonObject* domainObject,
const mwcst::StatContext& ctx);
static void populateQueueStats(bdljsn::JsonObject* queueObject,
const mwcst::StatContext& ctx);
static void populateMetric(bdljsn::JsonObject* metricsObject,
const mwcst::StatContext& ctx,
mqbstat::QueueStatsDomain::Stat::Enum metric);

public:
// CREATORS

/// Create a new `JsonPrinterImpl` object, using the specified
/// `statContextsMap` and the specified `allocator`.
explicit JsonPrinterImpl(const StatContextsMap& statContextsMap,
bslma::Allocator* allocator);

// ACCESSORS

/// Print the JSON-encoded stats to the specified `out`.
/// If the specified `compact` flag is `true`, the JSON is printed in a
/// compact form, otherwise the JSON is printed in a pretty form.
/// Return `0` on success, and non-zero return code on failure.
///
/// THREAD: This method is called in the *StatController scheduler* thread.
int printStats(bsl::string* out, bool compact) const;
};

inline JsonPrinterImpl::JsonPrinterImpl(const StatContextsMap& statContextsMap,
bslma::Allocator* allocator)
: d_opsCompact(bdljsn::WriteOptions().setSpacesPerLevel(0).setStyle(
bdljsn::WriteStyle::e_COMPACT))
, d_opsPretty(bdljsn::WriteOptions().setSpacesPerLevel(4).setStyle(
bdljsn::WriteStyle::e_PRETTY))
, d_contexts(statContextsMap, allocator)
{
// NOTHING
}

inline void
JsonPrinterImpl::populateMetric(bdljsn::JsonObject* metricsObject,
const mwcst::StatContext& ctx,
mqbstat::QueueStatsDomain::Stat::Enum metric)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(metricsObject);

const bsls::Types::Int64 value =
mqbstat::QueueStatsDomain::getValue(ctx, -1, metric);

(*metricsObject)[mqbstat::QueueStatsDomain::Stat::toString(metric)]
.makeNumber() = value;
}

inline void
JsonPrinterImpl::populateQueueStats(bdljsn::JsonObject* queueObject,
const mwcst::StatContext& ctx)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(queueObject);

if (ctx.numValues() == 0) {
// Prefer to omit an empty "values" object
return; // RETURN
}

bdljsn::JsonObject& values = (*queueObject)["values"].makeObject();

typedef mqbstat::QueueStatsDomain::Stat Stat;

populateMetric(&values, ctx, Stat::e_NB_PRODUCER);
populateMetric(&values, ctx, Stat::e_NB_CONSUMER);
populateMetric(&values, ctx, Stat::e_PUT_MESSAGES_DELTA);
populateMetric(&values, ctx, Stat::e_PUT_BYTES_DELTA);
populateMetric(&values, ctx, Stat::e_PUSH_MESSAGES_DELTA);
populateMetric(&values, ctx, Stat::e_PUSH_BYTES_DELTA);
populateMetric(&values, ctx, Stat::e_ACK_DELTA);
populateMetric(&values, ctx, Stat::e_ACK_TIME_AVG);
populateMetric(&values, ctx, Stat::e_ACK_TIME_MAX);
populateMetric(&values, ctx, Stat::e_NACK_DELTA);
populateMetric(&values, ctx, Stat::e_CONFIRM_DELTA);
populateMetric(&values, ctx, Stat::e_CONFIRM_TIME_AVG);
populateMetric(&values, ctx, Stat::e_CONFIRM_TIME_MAX);
}

inline void
JsonPrinterImpl::populateOneDomainStats(bdljsn::JsonObject* domainObject,
const mwcst::StatContext& ctx)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(domainObject);

for (mwcst::StatContextIterator queueIt = ctx.subcontextIterator();
queueIt;
++queueIt) {
bdljsn::JsonObject& queueObj =
(*domainObject)[queueIt->name()].makeObject();
populateQueueStats(&queueObj, *queueIt);

if (queueIt->numSubcontexts() > 0) {
bdljsn::JsonObject& appIdsObject = queueObj["appIds"].makeObject();

// Add metrics per appId, if any
for (mwcst::StatContextIterator appIdIt =
queueIt->subcontextIterator();
appIdIt;
++appIdIt) {
// Do not expect another nested StatContext within appId
BSLS_ASSERT_SAFE(0 == appIdIt->numSubcontexts());

populateQueueStats(&appIdsObject[appIdIt->name()].makeObject(),
*appIdIt);
}
}
}
}

inline void
JsonPrinterImpl::populateAllDomainsStats(bdljsn::JsonObject* parent,
const mwcst::StatContext& ctx)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(parent);

bdljsn::JsonObject& nodes = (*parent)["domains"].makeObject();
for (mwcst::StatContextIterator domainIt = ctx.subcontextIterator();
domainIt;
++domainIt) {
populateOneDomainStats(&nodes[domainIt->name()].makeObject(),
*domainIt);
}
}

inline int JsonPrinterImpl::printStats(bsl::string* out, bool compact) const
{
// executed by *StatController scheduler* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(out);

bdljsn::Json json;
bdljsn::JsonObject& obj = json.makeObject();

{
const mwcst::StatContext& ctx =
*d_contexts.find("domainQueues")->second;
bdljsn::JsonObject& domainQueuesObj = obj["domainQueues"].makeObject();

populateAllDomainsStats(&domainQueuesObj, ctx);
}

const bdljsn::WriteOptions& ops = compact ? d_opsCompact : d_opsPretty;

mwcu::MemOutStream os;
const int rc = bdljsn::JsonUtil::write(os, json, ops);
if (0 != rc) {
BALL_LOG_ERROR << "Failed to encode stats JSON, rc = " << rc;
return rc; // RETURN
}
(*out) = os.str();
return 0;
}

} // close unnamed namespace

// -----------------
// class JsonPrinter
// -----------------

JsonPrinter::JsonPrinter(const StatContextsMap& statContextsMap,
bslma::Allocator* allocator)
{
bslma::Allocator* alloc = bslma::Default::allocator(allocator);

d_impl_mp.load(new (*alloc) JsonPrinterImpl(statContextsMap, alloc),
alloc);
}

int JsonPrinter::printStats(bsl::string* out, bool compact) const
{
// executed by *StatController scheduler* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(out);
BSLS_ASSERT_SAFE(d_impl_mp);

return d_impl_mp->printStats(out, compact);
}

} // close package namespace
} // close enterprise namespace
Loading

0 comments on commit 9bdc208

Please sign in to comment.