From 82c3a71355fd77f7d1bafb7d8e02327de450c1cb Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Tue, 2 Jul 2024 12:24:53 -0400 Subject: [PATCH 1/5] mqb: fix ClientSession::initiateShutdown crash (#349) Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/mqb/mqba/mqba_clientsession.cpp | 45 +++++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index 6a5024caf..d6d8d29f5 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -2816,16 +2816,41 @@ void ClientSession::initiateShutdown(const ShutdownCb& callback, BALL_LOG_INFO << description() << ": initiateShutdown"; - dispatcher()->execute( - bdlf::BindUtil::bind( - bdlf::MemFnUtil::memFn(&ClientSession::initiateShutdownDispatched, - d_self.acquire()), - callback, - timeout), - this, - mqbi::DispatcherEventType::e_DISPATCHER); - // Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling - // 'd_flushList' + // The 'd_self.acquire()' return 'shared_ptr' but that does + // not relate to the 'shared_ptr' acquired by + // 'mqbnet::TransportManagerIterator'. The latter is bound to the + // 'initiateShutdown'. The former can be null after 'd_self.invalidate()' + // call. ('invalidate()' waits for all _acquired_ 'shared_ptr' references + // to drop). + // + // We have a choice, either 1) bind the latter to 'initiateShutdown' to + // make sure 'd_self.acquire()' returns not null, or 2) invoke the + // 'callback' earlier if fail to 'acquire()' because of 'invalidate()', or + // 3) bind _acquired_ 'shared_ptr' to 'initiateShutdown'. + // + // Choosing 2), assuming that calling the (completion) callback from a + // thread other than the *CLIENT* dispatcher thread is ok. The + // 'mwcu::OperationChainLink' expects the completion callback from multiple + // sessions anyway. + + bsl::shared_ptr ptr = d_self.acquire(); + + if (!ptr) { + callback(); + } + else { + dispatcher()->execute( + bdlf::BindUtil::bind( + bdlf::MemFnUtil::memFn( + &ClientSession::initiateShutdownDispatched, + d_self.acquire()), + callback, + timeout), + this, + mqbi::DispatcherEventType::e_DISPATCHER); + // Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling + // 'd_flushList' + } } void ClientSession::invalidate() From c71d6c598dc386a5886f16abe03600a9d3068fdc Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Tue, 2 Jul 2024 12:25:12 -0400 Subject: [PATCH 2/5] wait for unconfirmed before buffering confirms (#336) Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 20 +++++++++++++------ .../test_puts_retransmission.py | 14 +++++++++++-- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 8640c21f3..9d3999b7e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -2633,12 +2633,20 @@ void ClusterQueueHelper::notifyQueue(QueueContext* queueContext, } if (isOpen) { - queue->dispatcher()->execute( - bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream, - queue, - generationCount, - upstreamSubQueueId), - queue); + if (generationCount == 0) { + BALL_LOG_INFO << d_cluster_p->description() + << ": has deconfigured queue [" + << queueContext->uri() << "], subStream id [" + << upstreamSubQueueId << "]"; + } + else { + queue->dispatcher()->execute( + bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream, + queue, + generationCount, + upstreamSubQueueId), + queue); + } } else { queue->dispatcher()->execute( diff --git a/src/integration-tests/test_puts_retransmission.py b/src/integration-tests/test_puts_retransmission.py index 829d9860b..b27b08bd1 100644 --- a/src/integration-tests/test_puts_retransmission.py +++ b/src/integration-tests/test_puts_retransmission.py @@ -508,7 +508,12 @@ def test_shutdown_primary_keep_replica(self, multi_node: Cluster): # If shutting down primary, the replica needs to wait for new primary. self.active_node.wait_status(wait_leader=True, wait_ready=False) - self.inspect_results(allow_duplicates=False) + # Do allow duplicates for the scenario when a CONFIRM had passed Proxy + # but did not reach the replication. New Primary then redelivers and + # the Proxy cannot detect the duplicate because it had removed the GUID + # upon the first CONFIRM + + self.inspect_results(allow_duplicates=True) def test_shutdown_replica(self, multi_node: Cluster): self.setup_cluster_fanout(multi_node) @@ -521,7 +526,12 @@ def test_shutdown_replica(self, multi_node: Cluster): # Because the quorum is 3, cluster is still healthy after shutting down # replica. - self.inspect_results(allow_duplicates=False) + # Do allow duplicates for the scenario when a CONFIRM had passed Proxy + # but did not reach the replication. New Primary then redelivers and + # the Proxy cannot detect the duplicate because it had removed the GUID + # upon the first CONFIRM + + self.inspect_results(allow_duplicates=True) def test_kill_primary_convert_replica(self, multi_node: Cluster): self.setup_cluster_fanout(multi_node) From 346b9c305e74f56841eee93c3562dfae91089b91 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Fri, 5 Jul 2024 20:00:06 +0300 Subject: [PATCH 3/5] MQB: structured stats support, out of order stats snapshot (#335) * MQB: structured stats support, out of order stats snapshot Signed-off-by: Evgeny Malygin --- src/groups/mqb/mqba/mqba_application.cpp | 4 +- .../mqb/mqbstat/mqbstat_jsonprinter.cpp | 270 ++++++++++++++++++ src/groups/mqb/mqbstat/mqbstat_jsonprinter.h | 88 ++++++ src/groups/mqb/mqbstat/mqbstat_printer.cpp | 1 - src/groups/mqb/mqbstat/mqbstat_printer.h | 9 +- src/groups/mqb/mqbstat/mqbstat_queuestats.cpp | 70 ++++- src/groups/mqb/mqbstat/mqbstat_queuestats.h | 7 +- .../mqb/mqbstat/mqbstat_statcontroller.cpp | 145 +++++++--- .../mqb/mqbstat/mqbstat_statcontroller.h | 103 ++++--- src/groups/mqb/mqbstat/package/mqbstat.mem | 1 + src/groups/mwc/mwcst/mwcst_statvalue.h | 2 +- src/integration-tests/test_admin_client.py | 128 ++++++++- src/integration-tests/test_appids.py | 2 +- src/integration-tests/test_breathing.py | 2 +- .../test_graceful_shutdown.py | 4 +- src/integration-tests/test_queue_close.py | 6 +- src/integration-tests/test_queue_reopen.py | 2 +- src/integration-tests/test_subscriptions.py | 4 +- .../blazingmq/dev/configurator/__init__.py | 1 + src/python/blazingmq/dev/it/data/README.md | 5 + src/python/blazingmq/dev/it/data/__init__.py | 14 + .../blazingmq/dev/it/data/data_metrics.py | 200 +++++++++++++ src/python/blazingmq/dev/it/process/client.py | 9 +- .../blazingmq/dev/it/tweaks/generated.py | 6 + src/python/blazingmq/schemas/mqbcfg.py | 10 + 25 files changed, 988 insertions(+), 105 deletions(-) create mode 100644 src/groups/mqb/mqbstat/mqbstat_jsonprinter.cpp create mode 100644 src/groups/mqb/mqbstat/mqbstat_jsonprinter.h create mode 100644 src/python/blazingmq/dev/it/data/README.md create mode 100644 src/python/blazingmq/dev/it/data/__init__.py create mode 100644 src/python/blazingmq/dev/it/data/data_metrics.py diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index 120867fa9..3b92ef9a3 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -551,7 +551,9 @@ int Application::processCommand(const bslstl::StringRef& source, } else if (command.isStatValue()) { mqbcmd::StatResult statResult; - d_statController_mp->processCommand(&statResult, command.stat()); + d_statController_mp->processCommand(&statResult, + command.stat(), + commandWithOptions.encoding()); if (statResult.isErrorValue()) { cmdResult.makeError(statResult.error()); } diff --git a/src/groups/mqb/mqbstat/mqbstat_jsonprinter.cpp b/src/groups/mqb/mqbstat/mqbstat_jsonprinter.cpp new file mode 100644 index 000000000..717e1cebe --- /dev/null +++ b/src/groups/mqb/mqbstat/mqbstat_jsonprinter.cpp @@ -0,0 +1,270 @@ +// Copyright 2024 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// mqbstat_jsonprinter.cpp -*-C++-*- +#include + +#include + +// MQB +#include + +// MWC +#include + +// BDE +#include +#include +#include +#include + +namespace BloombergLP { +namespace mqbstat { + +namespace { + +// --------------------- +// class JsonPrinterImpl +// --------------------- + +/// The implementation class for JsonPrinter, containing all the cached options +/// for printing statistics as JSON. This implementation exists and is hidden +/// from the package include for the following reasons: +/// - Don't want to expose `bdljsn` names and symbols to the outer scope. +/// - Member fields and functions defined for this implementation are used only +/// locally, so there is no reason to make it visible. +class JsonPrinterImpl { + private: + // CLASS-SCOPE CATEGORY + BALL_LOG_SET_CLASS_CATEGORY("MQBSTAT.JSONPRINTERIMPL"); + + private: + // PRIVATE TYPES + typedef JsonPrinter::StatContextsMap StatContextsMap; + + private: + // DATA + /// Options for printing a compact JSON + const bdljsn::WriteOptions d_opsCompact; + + /// Options for printing a pretty JSON + const bdljsn::WriteOptions d_opsPretty; + + /// StatContext-s map + const StatContextsMap d_contexts; + + private: + // NOT IMPLEMENTED + JsonPrinterImpl(const JsonPrinterImpl& other) BSLS_CPP11_DELETED; + JsonPrinterImpl& + operator=(const JsonPrinterImpl& other) BSLS_CPP11_DELETED; + + // ACCESSORS + + /// "domainQueues" stat context: + /// Populate the specified `bdljsn::JsonObject*` with the values + /// from the specified `ctx`. + static void populateAllDomainsStats(bdljsn::JsonObject* parent, + const mwcst::StatContext& ctx); + static void populateOneDomainStats(bdljsn::JsonObject* domainObject, + const mwcst::StatContext& ctx); + static void populateQueueStats(bdljsn::JsonObject* queueObject, + const mwcst::StatContext& ctx); + static void populateMetric(bdljsn::JsonObject* metricsObject, + const mwcst::StatContext& ctx, + mqbstat::QueueStatsDomain::Stat::Enum metric); + + public: + // CREATORS + + /// Create a new `JsonPrinterImpl` object, using the specified + /// `statContextsMap` and the specified `allocator`. + explicit JsonPrinterImpl(const StatContextsMap& statContextsMap, + bslma::Allocator* allocator); + + // ACCESSORS + + /// Print the JSON-encoded stats to the specified `out`. + /// If the specified `compact` flag is `true`, the JSON is printed in a + /// compact form, otherwise the JSON is printed in a pretty form. + /// Return `0` on success, and non-zero return code on failure. + /// + /// THREAD: This method is called in the *StatController scheduler* thread. + int printStats(bsl::string* out, bool compact) const; +}; + +inline JsonPrinterImpl::JsonPrinterImpl(const StatContextsMap& statContextsMap, + bslma::Allocator* allocator) +: d_opsCompact(bdljsn::WriteOptions().setSpacesPerLevel(0).setStyle( + bdljsn::WriteStyle::e_COMPACT)) +, d_opsPretty(bdljsn::WriteOptions().setSpacesPerLevel(4).setStyle( + bdljsn::WriteStyle::e_PRETTY)) +, d_contexts(statContextsMap, allocator) +{ + // NOTHING +} + +inline void +JsonPrinterImpl::populateMetric(bdljsn::JsonObject* metricsObject, + const mwcst::StatContext& ctx, + mqbstat::QueueStatsDomain::Stat::Enum metric) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(metricsObject); + + const bsls::Types::Int64 value = + mqbstat::QueueStatsDomain::getValue(ctx, -1, metric); + + (*metricsObject)[mqbstat::QueueStatsDomain::Stat::toString(metric)] + .makeNumber() = value; +} + +inline void +JsonPrinterImpl::populateQueueStats(bdljsn::JsonObject* queueObject, + const mwcst::StatContext& ctx) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(queueObject); + + if (ctx.numValues() == 0) { + // Prefer to omit an empty "values" object + return; // RETURN + } + + bdljsn::JsonObject& values = (*queueObject)["values"].makeObject(); + + typedef mqbstat::QueueStatsDomain::Stat Stat; + + populateMetric(&values, ctx, Stat::e_NB_PRODUCER); + populateMetric(&values, ctx, Stat::e_NB_CONSUMER); + populateMetric(&values, ctx, Stat::e_PUT_MESSAGES_DELTA); + populateMetric(&values, ctx, Stat::e_PUT_BYTES_DELTA); + populateMetric(&values, ctx, Stat::e_PUSH_MESSAGES_DELTA); + populateMetric(&values, ctx, Stat::e_PUSH_BYTES_DELTA); + populateMetric(&values, ctx, Stat::e_ACK_DELTA); + populateMetric(&values, ctx, Stat::e_ACK_TIME_AVG); + populateMetric(&values, ctx, Stat::e_ACK_TIME_MAX); + populateMetric(&values, ctx, Stat::e_NACK_DELTA); + populateMetric(&values, ctx, Stat::e_CONFIRM_DELTA); + populateMetric(&values, ctx, Stat::e_CONFIRM_TIME_AVG); + populateMetric(&values, ctx, Stat::e_CONFIRM_TIME_MAX); +} + +inline void +JsonPrinterImpl::populateOneDomainStats(bdljsn::JsonObject* domainObject, + const mwcst::StatContext& ctx) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(domainObject); + + for (mwcst::StatContextIterator queueIt = ctx.subcontextIterator(); + queueIt; + ++queueIt) { + bdljsn::JsonObject& queueObj = + (*domainObject)[queueIt->name()].makeObject(); + populateQueueStats(&queueObj, *queueIt); + + if (queueIt->numSubcontexts() > 0) { + bdljsn::JsonObject& appIdsObject = queueObj["appIds"].makeObject(); + + // Add metrics per appId, if any + for (mwcst::StatContextIterator appIdIt = + queueIt->subcontextIterator(); + appIdIt; + ++appIdIt) { + // Do not expect another nested StatContext within appId + BSLS_ASSERT_SAFE(0 == appIdIt->numSubcontexts()); + + populateQueueStats(&appIdsObject[appIdIt->name()].makeObject(), + *appIdIt); + } + } + } +} + +inline void +JsonPrinterImpl::populateAllDomainsStats(bdljsn::JsonObject* parent, + const mwcst::StatContext& ctx) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(parent); + + bdljsn::JsonObject& nodes = (*parent)["domains"].makeObject(); + for (mwcst::StatContextIterator domainIt = ctx.subcontextIterator(); + domainIt; + ++domainIt) { + populateOneDomainStats(&nodes[domainIt->name()].makeObject(), + *domainIt); + } +} + +inline int JsonPrinterImpl::printStats(bsl::string* out, bool compact) const +{ + // executed by *StatController scheduler* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE(out); + + bdljsn::Json json; + bdljsn::JsonObject& obj = json.makeObject(); + + { + const mwcst::StatContext& ctx = + *d_contexts.find("domainQueues")->second; + bdljsn::JsonObject& domainQueuesObj = obj["domainQueues"].makeObject(); + + populateAllDomainsStats(&domainQueuesObj, ctx); + } + + const bdljsn::WriteOptions& ops = compact ? d_opsCompact : d_opsPretty; + + mwcu::MemOutStream os; + const int rc = bdljsn::JsonUtil::write(os, json, ops); + if (0 != rc) { + BALL_LOG_ERROR << "Failed to encode stats JSON, rc = " << rc; + return rc; // RETURN + } + (*out) = os.str(); + return 0; +} + +} // close unnamed namespace + +// ----------------- +// class JsonPrinter +// ----------------- + +JsonPrinter::JsonPrinter(const StatContextsMap& statContextsMap, + bslma::Allocator* allocator) +{ + bslma::Allocator* alloc = bslma::Default::allocator(allocator); + + d_impl_mp.load(new (*alloc) JsonPrinterImpl(statContextsMap, alloc), + alloc); +} + +int JsonPrinter::printStats(bsl::string* out, bool compact) const +{ + // executed by *StatController scheduler* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE(out); + BSLS_ASSERT_SAFE(d_impl_mp); + + return d_impl_mp->printStats(out, compact); +} + +} // close package namespace +} // close enterprise namespace diff --git a/src/groups/mqb/mqbstat/mqbstat_jsonprinter.h b/src/groups/mqb/mqbstat/mqbstat_jsonprinter.h new file mode 100644 index 000000000..0089eee51 --- /dev/null +++ b/src/groups/mqb/mqbstat/mqbstat_jsonprinter.h @@ -0,0 +1,88 @@ +// Copyright 2024 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// mqbstat_jsonprinter.h -*-C++-*- +#ifndef INCLUDED_MQBSTAT_JSONPRINTER +#define INCLUDED_MQBSTAT_JSONPRINTER + +//@PURPOSE: Provide a mechanism to print statistics as a JSON +// +//@CLASSES: +// mqbstat::JsonPrinter: statistics printer to JSON +// +//@DESCRIPTION: 'mqbstat::JsonPrinter' handles the printing of the statistics +// as a compact or pretty JSON. It is responsible solely for printing, so any +// statistics updates (e.g. making a new snapshot of the used StatContexts) +// must be done before calling to this component. + +// MWC +#include + +// BDE +#include +#include +#include + +namespace BloombergLP { + +namespace mqbstat { + +// FORWARD DECLARATIONS +namespace { +class JsonPrinterImpl; +} // close unnamed namespace + +// ================= +// class JsonPrinter +// ================= + +class JsonPrinter { + private: + // DATA + /// Managed pointer to the printer implementation. + bslma::ManagedPtr d_impl_mp; + + private: + // NOT IMPLEMENTED + JsonPrinter(const JsonPrinter& other) BSLS_CPP11_DELETED; + JsonPrinter& operator=(const JsonPrinter& other) BSLS_CPP11_DELETED; + + public: + // PUBLIC TYPES + typedef bsl::unordered_map + StatContextsMap; + + // CREATORS + + /// Create a new `JsonPrinter` object, using the specified + /// `statContextsMap` and the optionally specified `allocator`. + explicit JsonPrinter(const StatContextsMap& statContextsMap, + bslma::Allocator* allocator = 0); + + // ACCESSORS + + /// Print the JSON-encoded stats to the specified `out`. + /// If the specified `compact` flag is `true`, the JSON is printed in + /// compact form, otherwise the JSON is printed in pretty form. + /// Return `0` on success, and non-zero return code on failure. + /// + /// THREAD: This method is called in the *StatController scheduler* thread. + int printStats(bsl::string* out, bool compact) const; +}; + +} // close package namespace +} // close enterprise namespace + +#endif diff --git a/src/groups/mqb/mqbstat/mqbstat_printer.cpp b/src/groups/mqb/mqbstat/mqbstat_printer.cpp index 89bb21517..62fbf33ac 100644 --- a/src/groups/mqb/mqbstat/mqbstat_printer.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_printer.cpp @@ -18,7 +18,6 @@ #include // MQB -#include #include // MWC diff --git a/src/groups/mqb/mqbstat/mqbstat_printer.h b/src/groups/mqb/mqbstat/mqbstat_printer.h index 187c788c1..36f90be58 100644 --- a/src/groups/mqb/mqbstat/mqbstat_printer.h +++ b/src/groups/mqb/mqbstat/mqbstat_printer.h @@ -26,7 +26,6 @@ // It holds the tables and table info providers which can be printed. // MQB - #include // MWC @@ -128,10 +127,10 @@ class Printer { /// Create a new `Printer` object, using the specified `config`, /// `eventScheduler`, `statContextsMap` and the specified `allocator` /// for memory allocation. - Printer(const mqbcfg::StatsConfig& config, - bdlmt::EventScheduler* eventScheduler, - const StatContextsMap& statContextsMap, - bslma::Allocator* allocator); + explicit Printer(const mqbcfg::StatsConfig& config, + bdlmt::EventScheduler* eventScheduler, + const StatContextsMap& statContextsMap, + bslma::Allocator* allocator); // MANIPULATORS diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp index f2491c79b..7350b1808 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp @@ -218,6 +218,61 @@ class ContextNameMatcher { } // close unnamed namespace +// ----------------------------- +// struct QueueStatsDomain::Stat +// ----------------------------- + +const char* QueueStatsDomain::Stat::toString(Stat::Enum value) +{ +#define MQBSTAT_CASE(VAL, DESC) \ + case (VAL): { \ + return (DESC); \ + } break; + + switch (value) { + MQBSTAT_CASE(e_NB_PRODUCER, "queue_producers_count") + MQBSTAT_CASE(e_NB_CONSUMER, "queue_consumers_count") + MQBSTAT_CASE(e_MESSAGES_CURRENT, "queue_msgs_current") + MQBSTAT_CASE(e_MESSAGES_MAX, "queue_content_msgs") + MQBSTAT_CASE(e_BYTES_CURRENT, "queue_bytes_current") + MQBSTAT_CASE(e_BYTES_MAX, "queue_content_bytes") + MQBSTAT_CASE(e_PUT_MESSAGES_DELTA, "queue_put_msgs") + MQBSTAT_CASE(e_PUT_BYTES_DELTA, "queue_put_bytes") + MQBSTAT_CASE(e_PUT_MESSAGES_ABS, "queue_put_msgs_abs") + MQBSTAT_CASE(e_PUT_BYTES_ABS, "queue_put_bytes_abs") + MQBSTAT_CASE(e_PUSH_MESSAGES_DELTA, "queue_push_msgs") + MQBSTAT_CASE(e_PUSH_BYTES_DELTA, "queue_push_bytes") + MQBSTAT_CASE(e_PUSH_MESSAGES_ABS, "queue_push_msgs_abs") + MQBSTAT_CASE(e_PUSH_BYTES_ABS, "queue_push_bytes_abs") + MQBSTAT_CASE(e_ACK_DELTA, "queue_ack_msgs") + MQBSTAT_CASE(e_ACK_ABS, "queue_ack_msgs_abs") + MQBSTAT_CASE(e_ACK_TIME_AVG, "queue_ack_time_avg") + MQBSTAT_CASE(e_ACK_TIME_MAX, "queue_ack_time_max") + MQBSTAT_CASE(e_NACK_DELTA, "queue_nack_msgs") + MQBSTAT_CASE(e_NACK_ABS, "queue_nack_msgs_abs") + MQBSTAT_CASE(e_CONFIRM_DELTA, "queue_confirm_msgs") + MQBSTAT_CASE(e_CONFIRM_ABS, "queue_confirm_msgs_abs") + MQBSTAT_CASE(e_CONFIRM_TIME_AVG, "queue_confirm_time_avg") + MQBSTAT_CASE(e_CONFIRM_TIME_MAX, "queue_confirm_time_max") + MQBSTAT_CASE(e_REJECT_ABS, "queue_reject_msgs_abs") + MQBSTAT_CASE(e_REJECT_DELTA, "queue_reject_msgs") + MQBSTAT_CASE(e_QUEUE_TIME_AVG, "queue_queue_time_avg") + MQBSTAT_CASE(e_QUEUE_TIME_MAX, "queue_queue_time_max") + MQBSTAT_CASE(e_GC_MSGS_DELTA, "queue_gc_msgs") + MQBSTAT_CASE(e_GC_MSGS_ABS, "queue_gc_msgs_abs") + MQBSTAT_CASE(e_ROLE, "queue_role") + MQBSTAT_CASE(e_CFG_MSGS, "queue_cfg_msgs") + MQBSTAT_CASE(e_CFG_BYTES, "queue_cfg_bytes") + MQBSTAT_CASE(e_NO_SC_MSGS_DELTA, "queue_nack_noquorum_msgs") + MQBSTAT_CASE(e_NO_SC_MSGS_ABS, "queue_nack_noquorum_msgs_abs") + } + + BSLS_ASSERT(!"invalid enumerator"); + return 0; + +#undef MQBSTAT_CASE +} + // ---------------------- // class QueueStatsDomain // ---------------------- @@ -229,8 +284,19 @@ QueueStatsDomain::getValue(const mwcst::StatContext& context, { // invoked from the SNAPSHOT thread + // PRECONDITIONS + BSLS_ASSERT_SAFE(snapshotId >= -1); // do not support other negatives yet + const mwcst::StatValue::SnapshotLocation latestSnapshot(0, 0); - const mwcst::StatValue::SnapshotLocation oldestSnapshot(0, snapshotId); + +#define OLDEST_SNAPSHOT(STAT) \ + (mwcst::StatValue::SnapshotLocation( \ + 0, \ + (snapshotId >= 0) \ + ? snapshotId \ + : (context.value(mwcst::StatContext::e_DIRECT_VALUE, (STAT)) \ + .historySize(0) - \ + 1))) #define STAT_SINGLE(OPERATION, STAT) \ mwcst::StatUtil::OPERATION( \ @@ -241,7 +307,7 @@ QueueStatsDomain::getValue(const mwcst::StatContext& context, mwcst::StatUtil::OPERATION( \ context.value(mwcst::StatContext::e_DIRECT_VALUE, STAT), \ latestSnapshot, \ - oldestSnapshot) + OLDEST_SNAPSHOT(STAT)) switch (stat) { case QueueStatsDomain::Stat::e_NB_PRODUCER: { diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.h b/src/groups/mqb/mqbstat/mqbstat_queuestats.h index e3f42a59b..81d281aea 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.h +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.h @@ -138,6 +138,10 @@ class QueueStatsDomain { e_NO_SC_MSGS_DELTA, e_NO_SC_MSGS_ABS }; + + /// Return the non-modifiable string description corresponding to + /// the specified enumeration `value`. + static const char* toString(Stat::Enum value); }; struct Role { @@ -198,7 +202,8 @@ class QueueStatsDomain { /// represented by its associated specified `context` as the difference /// between the latest snapshot-ed value (i.e., `snapshotId == 0`) and /// the value that was recorded at the specified `snapshotId` snapshots - /// ago. + /// ago. The negative `snapshotId == -1` means that the oldest available + /// snapshot should be used, while other negative values are not supported. /// /// THREAD: This method can only be invoked from the `snapshot` thread. static bsls::Types::Int64 getValue(const mwcst::StatContext& context, diff --git a/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp b/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp index adc57183d..aa499e8e9 100644 --- a/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #include #include #include @@ -71,6 +72,13 @@ namespace mqbstat { namespace { +const int k_MAX_INSTANT_MESSAGES = 1; +// Maximum messages logged with throttling in a short period of time. + +const bsls::Types::Int64 k_NS_PER_MESSAGE = 15 * + bdlt::TimeUnitRatio::k_NS_PER_M; +// Time interval between messages logged with throttling. + const char k_PUBLISHINTERVAL_SUFFIX[] = ".PUBLISHINTERVAL"; typedef bsl::unordered_set PluginFactories; @@ -233,26 +241,54 @@ void StatController::initializeStats() false))); } -void StatController::captureStats(mqbcmd::StatResult* result) +void StatController::captureStatsAndSemaphorePost( + mqbcmd::StatResult* result, + bslmt::Semaphore* semaphore, + const mqbcmd::EncodingFormat::Value& encoding) { - // This must execute in the *SCHEDULER* thread. + // executed by the *SCHEDULER* thread if (d_allocatorsStatContext_p) { // When using test allocator, we don't have a stat context d_allocatorsStatContext_p->snapshot(); } - mwcu::MemOutStream os; - d_printer_mp->printStats(os); - result->makeStats(os.str()); -} + switch (encoding) { + case mqbcmd::EncodingFormat::TEXT: { + mwcu::MemOutStream os; + d_printer_mp->printStats(os); + result->makeStats() = os.str(); + } break; // BREAK + + case mqbcmd::EncodingFormat::JSON_COMPACT: BSLS_ANNOTATION_FALLTHROUGH; + case mqbcmd::EncodingFormat::JSON_PRETTY: { + // Make an unscheduled snapshot, but do not notify stats consumers + // since it's not necessary. We typically use this code path to get + // the latests stats during integration tests, and this case we + // neither want to wait until the next scheduled snapshot nor get the + // outdated existing one. + const bool savedNextSnapshot = snapshot(); + if (savedNextSnapshot) { + const bool compact = (encoding == + mqbcmd::EncodingFormat::JSON_COMPACT); + const int rc = d_jsonPrinter_mp->printStats(&result->makeStats(), + compact); + if (0 != rc) { + result->makeError().message() = "Stats print to json failed"; + } + } + else { + result->makeError().message() = + "Cannot save the recent snapshot, trying to make snapshots " + "too often"; + } + } break; // BREAK -void StatController::captureStatsAndSemaphorePost(mqbcmd::StatResult* result, - bslmt::Semaphore* semaphore) -{ - // executed by the *SCHEDULER* thread + default: { + BSLS_ASSERT(!"invalid enumerator"); + } break; // BREAK + } - captureStats(result); semaphore->post(); } @@ -482,24 +518,20 @@ void StatController::listTunables(mqbcmd::StatResult* result, } } -void StatController::snapshot() +bool StatController::snapshot() { // executed by the *SCHEDULER* thread + const bsls::Types::Int64 now = mwcsys::Time::highResolutionTimer(); // Safeguard against too frequent invocation from the scheduler. - const bsls::Types::Int64 now = mwcsys::Time::highResolutionTimer(); - const bsls::Types::Int64 nsDelta = now - d_lastSnapshotTime; - const bsls::Types::Int64 minDelta = - mqbcfg::BrokerConfig::get().stats().snapshotInterval() * - bdlt::TimeUnitRatio::k_NS_PER_S / 2; - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(nsDelta < minDelta)) { - BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - if (d_lastSnapshotLogLimiter.requestPermission()) { - BALL_LOG_INFO << "snapshot invoked too frequently (delta = " - << mwcu::PrintUtil::prettyTimeInterval(nsDelta) - << "), skipping snapshot"; - } - return; // RETURN + if (!d_snapshotThrottle.requestPermission()) { + const bsls::Types::Int64 nsDelta = now - d_lastSnapshotTime; + BALL_LOGTHROTTLE_WARN(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + << "[THROTTLED] Snapshot invoked too frequently (delta = " + << mwcu::PrintUtil::prettyTimeInterval(nsDelta) + << "), skipping snapshot"; + + return false; // RETURN } d_lastSnapshotTime = now; @@ -518,21 +550,36 @@ void StatController::snapshot() // through snapshot d_systemStatMonitor_mp->snapshot(); - // StatConsumers will report all stats - bsl::vector::iterator it = d_statConsumers.begin(); - for (; it != d_statConsumers.end(); ++it) { - (*it)->onSnapshot(); - } - // Printer needs to be notified of every snapshot, but has an internal // action counter to know when it's time to print. Allocator stat context - // only has an history size of 2, so we need to snapshot only once, just + // only has a history size of 2, so we need to snapshot only once, just // before printing. + // TODO: adopt this code for `mqbstat::JsonPrinter` when we report + // allocator stats in json const bool willPrint = d_printer_mp->nextSnapshotWillPrint(); if (d_allocatorsStatContext_p && willPrint) { d_allocatorsStatContext_p->snapshot(); } + return true; +} + +void StatController::snapshotAndNotify() +{ + // executed by the *SCHEDULER* thread + + if (!snapshot()) { + // Trying to make snapshots too often + return; // RETURN + } + + // StatConsumers will report all stats + bsl::vector::iterator it = d_statConsumers.begin(); + for (; it != d_statConsumers.end(); ++it) { + (*it)->onSnapshot(); + } + + const bool willPrint = d_printer_mp->nextSnapshotWillPrint(); d_printer_mp->onSnapshot(); // Finally, perform cleanup of expired stat contexts if we have printed @@ -612,6 +659,7 @@ StatController::StatController(const CommandProcessorFn& commandProcessor, : d_allocators(allocator) , d_scheduler_mp(0) , d_lastSnapshotTime() +, d_snapshotThrottle() , d_allocatorsStatContext_p(allocatorsStatContext) , d_statContextsMap(allocator) , d_statContextChannelsLocal_mp(0) @@ -621,6 +669,7 @@ StatController::StatController(const CommandProcessorFn& commandProcessor, , d_bufferFactory_p(bufferFactory) , d_commandProcessorFn(bsl::allocator_arg, allocator, commandProcessor) , d_printer_mp(0) +, d_jsonPrinter_mp(0) , d_statConsumers(allocator) , d_statConsumerMaxPublishInterval(0) , d_eventScheduler_p(eventScheduler) @@ -630,9 +679,11 @@ StatController::StatController(const CommandProcessorFn& commandProcessor, BSLS_ASSERT_SAFE(eventScheduler->clockType() == bsls::SystemClockType::e_MONOTONIC); - d_lastSnapshotLogLimiter.initialize(1, - 15 * bdlt::TimeUnitRatio::k_NS_PER_M); - // Throttling of one maximum alarm per 15 minutes + // Have to initialize the throttle with default "allow none" parameters to + // avoid possible undefined behaviour according to its usage contract. + // The throttle should be reinitialized during 'start()' with normal + // values. + d_snapshotThrottle.initialize(0, 1); } int StatController::start(bsl::ostream& errorDescription) @@ -651,6 +702,13 @@ int StatController::start(bsl::ostream& errorDescription) return 0; // RETURN } + // Initialize the safeguard throttle against too often snapshots + const int k_MAX_SNAPSHOTS_PER_INTERVAL = 4; + d_snapshotThrottle.initialize(k_MAX_SNAPSHOTS_PER_INTERVAL, + brkrCfg.stats().snapshotInterval() * + bdlt::TimeUnitRatio::k_NS_PER_S / + k_MAX_SNAPSHOTS_PER_INTERVAL); + // Start the scheduler d_scheduler_mp.load(new (*d_allocator_p) bdlmt::TimerEventScheduler( bsls::SystemClockType::e_MONOTONIC, @@ -784,6 +842,11 @@ int StatController::start(bsl::ostream& errorDescription) errorStream.reset(); } + // Create the json printer + d_jsonPrinter_mp.load(new (*d_allocator_p) + JsonPrinter(ctxPtrMap, d_allocator_p), + d_allocator_p); + // Max value for the stat publish interval must be the minimum history size // of all stat contexts. d_statConsumerMaxPublishInterval = @@ -793,7 +856,7 @@ int StatController::start(bsl::ostream& errorDescription) // Start the clock d_scheduler_mp->startClock( bsls::TimeInterval(brkrCfg.stats().snapshotInterval()), - bdlf::BindUtil::bind(&StatController::snapshot, this)); + bdlf::BindUtil::bind(&StatController::snapshotAndNotify, this)); BALL_LOG_INFO << "Starting statistics [SnapshotInterval: " << brkrCfg.stats().snapshotInterval() << ", PrintInterval: " << brkrCfg.stats().printer().printInterval() << "]"; @@ -834,6 +897,7 @@ void StatController::stop() DESTROY_OBJ((*it), it->name()); } DESTROY_OBJ(d_printer_mp, "Printer"); + DESTROY_OBJ(d_jsonPrinter_mp, "JsonPrinter"); DESTROY_OBJ(d_systemStatMonitor_mp, "SystemStatMonitor"); DESTROY_OBJ(d_scheduler_mp, "Scheduler"); @@ -852,8 +916,10 @@ void StatController::loadStatContexts(StatContexts* contexts) } } -int StatController::processCommand(mqbcmd::StatResult* result, - const mqbcmd::StatCommand& command) +int StatController::processCommand( + mqbcmd::StatResult* result, + const mqbcmd::StatCommand& command, + const mqbcmd::EncodingFormat::Value& encoding) { if (command.isShowValue()) { bslmt::Semaphore semaphore; @@ -862,7 +928,8 @@ int StatController::processCommand(mqbcmd::StatResult* result, bdlf::BindUtil::bind(&StatController::captureStatsAndSemaphorePost, this, result, - &semaphore)); + &semaphore, + encoding)); semaphore.wait(); return 0; // RETURN } diff --git a/src/groups/mqb/mqbstat/mqbstat_statcontroller.h b/src/groups/mqb/mqbstat/mqbstat_statcontroller.h index ebc49b324..14a7fb60a 100644 --- a/src/groups/mqb/mqbstat/mqbstat_statcontroller.h +++ b/src/groups/mqb/mqbstat/mqbstat_statcontroller.h @@ -30,6 +30,7 @@ // MQB #include +#include #include // MWC @@ -129,6 +130,7 @@ class StatController { typedef bsl::shared_ptr StatContextSp; typedef bslma::ManagedPtr SystemStatMonitorMp; typedef bslma::ManagedPtr PrinterMp; + typedef bslma::ManagedPtr JsonPrinterMp; typedef bslma::ManagedPtr StatPublisherMp; typedef bslma::ManagedPtr StatConsumerMp; @@ -154,67 +156,72 @@ class StatController { StatContextDetailsMap; // DATA + /// Allocator store to spawn new allocators + /// for sub-components. mwcma::CountingAllocatorStore d_allocators; - // Allocator store to spawn new allocators - // for sub-components. + /// This component should use it's own + /// scheduler to not have stats interfere + /// with critical other parts. SchedulerMp d_scheduler_mp; - // This component should use it's own - // scheduler to not have stats interfere - // with critical other parts. + /// Time at which snapshot was last called. bsls::Types::Int64 d_lastSnapshotTime; - // Time at which snapshot was last called. - bdlmt::Throttle d_lastSnapshotLogLimiter; - // Throttler for alarming on excessive - // snapshots. + /// Throttler for safeguarding against too often snapshot invocations. + /// Note that snapshots are done periodically via EventScheduler, but + /// also it is possible to make urgent out-of-order snapshot with + /// admin command. + bdlmt::Throttle d_snapshotThrottle; + /// Stat context of the counting allocators, + /// if used. mwcst::StatContext* d_allocatorsStatContext_p; - // Stat context of the counting allocators, - // if used. + /// Map holding all the stat contexts StatContextDetailsMap d_statContextsMap; - // Map holding all the stat contexts + /// 'local' child stat context of the + /// 'channels' stat context StatContextMp d_statContextChannelsLocal_mp; - // 'local' child stat context of the - // 'channels' stat context + /// 'remote' child stat context of the + /// 'channels' stat context StatContextMp d_statContextChannelsRemote_mp; - // 'remote' child stat context of the - // 'channels' stat context + /// System stat monitor (for cpu and + /// memory). SystemStatMonitorMp d_systemStatMonitor_mp; - // System stat monitor (for cpu and - // memory). + /// Used to instantiate 'StatConsumer' + /// plugins at start-time. mqbplug::PluginManager* d_pluginManager_p; - // Used to instantiate 'StatConsumer' - // plugins at start-time. + /// Buffer factory for a StatsProvider if + /// provided as a plugin. bdlbb::BlobBufferFactory* d_bufferFactory_p; - // Buffer factory for a StatsProvider if - // provided as a plugin. + /// Function to invoke when receiving a command + /// from a command processor plugin. CommandProcessorFn d_commandProcessorFn; - // Function to invoke when receiving a - // command from a command processor plugin. + /// Console and log file stats printer PrinterMp d_printer_mp; - // Printer + /// JsonPrinter used for admin commands processing + JsonPrinterMp d_jsonPrinter_mp; + + /// Registered stat consumers bsl::vector d_statConsumers; + /// StatConsumer max publish interval int d_statConsumerMaxPublishInterval; - // StatConsumer max publish interval + /// Event scheduler passed in from application bdlmt::EventScheduler* d_eventScheduler_p; - // Event scheduler passed in from - // application + /// Allocator to use. bslma::Allocator* d_allocator_p; - // Allocator to use. private: // PRIVATE MANIPULATORS @@ -222,14 +229,16 @@ class StatController { /// Initialize all the stat contexts and associated Tables and TIPs. void initializeStats(); - /// Capture the stats and store the stats in the specified `result` - /// object. - void captureStats(mqbcmd::StatResult* result); - /// Capture the stats to the specified `result`' object and post on the - /// specified `semaphore` once done. - void captureStatsAndSemaphorePost(mqbcmd::StatResult* result, - bslmt::Semaphore* semaphore); + /// specified `semaphore` once done. The specified `encoding` parameter + /// controls whether the result should be in a text format or in a JSON. + /// Note that the JSON format is typically used within integration tests + /// so we make an out of order stats snapshot when compact or pretty JSON + /// encoding is passed. + void captureStatsAndSemaphorePost( + mqbcmd::StatResult* result, + bslmt::Semaphore* semaphore, + const mqbcmd::EncodingFormat::Value& encoding); /// Process specified `tunable` subcommand and load the result into the /// specified `result` and post on the optionally specified `semaphore` @@ -251,8 +260,14 @@ class StatController { void listTunables(mqbcmd::StatResult* result, bslmt::Semaphore* semaphore = 0); - /// Snapshot the stats. - void snapshot(); + /// Try to snapshot the stats. + /// Return `true` upon success and `false` otherwise. The attempt to make + /// a snapshot might fail if we try to call `snapshot()` too often. + bool snapshot(); + + /// Try to snapshot the stats and notify all the registered stat consumers + /// upon success. + void snapshotAndNotify(); // PRIVATE ACCESSORS @@ -298,11 +313,13 @@ class StatController { /// (allocators, systems, domainQueues, clients, ...). void loadStatContexts(StatContexts* contexts); - /// Process the specified `command`, and write the result to the - /// `result`' object. Return zero on success or a nonzero value - /// otherwise. - int processCommand(mqbcmd::StatResult* result, - const mqbcmd::StatCommand& command); + /// Process the specified `command`, and write the result to the `result` + /// object. Return zero on success or a nonzero value otherwise. + /// The specified `encoding` parameter controls whether the result should + /// be in a text format or in a JSON. + int processCommand(mqbcmd::StatResult* result, + const mqbcmd::StatCommand& command, + const mqbcmd::EncodingFormat::Value& encoding); /// Retrieve the domains top-level stat context. mwcst::StatContext* domainsStatContext(); diff --git a/src/groups/mqb/mqbstat/package/mqbstat.mem b/src/groups/mqb/mqbstat/package/mqbstat.mem index bb65c7810..0dd4cde8e 100644 --- a/src/groups/mqb/mqbstat/package/mqbstat.mem +++ b/src/groups/mqb/mqbstat/package/mqbstat.mem @@ -1,6 +1,7 @@ mqbstat_brokerstats mqbstat_clusterstats mqbstat_domainstats +mqbstat_jsonprinter mqbstat_printer mqbstat_queuestats mqbstat_statcontroller diff --git a/src/groups/mwc/mwcst/mwcst_statvalue.h b/src/groups/mwc/mwcst/mwcst_statvalue.h index 86bddf45c..7ab0d6ca8 100644 --- a/src/groups/mwc/mwcst/mwcst_statvalue.h +++ b/src/groups/mwc/mwcst/mwcst_statvalue.h @@ -339,7 +339,7 @@ class StatValue { /// Return the snapshot referred to by the specified `location`. The /// behavior is undefined unless /// `location.level() < numLevels()` and - /// `location.index() <= historySize(location.level())` + /// `location.index() < historySize(location.level())` const Snapshot& snapshot(const SnapshotLocation& location) const; /// Return the minimum value of this StatValue since creation. diff --git a/src/integration-tests/test_admin_client.py b/src/integration-tests/test_admin_client.py index bf108680f..3c944f93f 100644 --- a/src/integration-tests/test_admin_client.py +++ b/src/integration-tests/test_admin_client.py @@ -19,14 +19,16 @@ """ import dataclasses import json -from typing import Dict, Optional +from typing import Dict, Optional, Union import blazingmq.dev.it.testconstants as tc from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import Cluster, order, single_node, + tweak, ) +from blazingmq.dev.it.data import data_metrics as dt from blazingmq.dev.it.process.admin import AdminClient from blazingmq.dev.it.process.client import Client @@ -70,6 +72,53 @@ def post_n_msgs( posted[task.uri] = task +def extract_stats(admin_response: str) -> dict: + """ + Extracts the dictionary containing stats from the specified 'admin_response'. + Note that due to xsd schema limitations it's not possible to make a schema for a json + containing random keys. Due to this, the stats encoding is not ideal: + - The outer layer is a 'str' response from the admin session + - Next layer is a 'dict' containing "stats" field with 'str' text + - The last layer is another 'dict' containing the stats itself + """ + d1 = json.loads(admin_response) + d2 = json.loads(d1["stats"]) + return d2 + + +def expect_same_structure( + entry: Union[dict, list, str, int], + expected: Union[dict, list, str, int, dt.ValueConstraint], +) -> None: + """ + Check if the specified 'entry' has the same structure as the specified 'expected'. + Note that the 'expected' param could have fixed parameters as well as value constraint + placeholders that are used to represent a value non-fixed across different test launches. + Assert on failure. + """ + + if isinstance(expected, dict): + assert isinstance(entry, dict) + assert expected.keys() == entry.keys() + for key in expected: + expect_same_structure(entry[key], expected[key]) + elif isinstance(expected, list): + assert isinstance(entry, list) + assert len(expected) == len(entry) + for obj2, expected2 in zip(entry, expected): + expect_same_structure(obj2, expected2) + elif isinstance(expected, str): + assert isinstance(entry, str) + assert expected == entry + else: + assert isinstance(entry, int) + if isinstance(expected, dt.ValueConstraint): + assert expected.check(entry) + else: + assert isinstance(expected, int) + assert entry == expected + + def test_breathing(single_node: Cluster) -> None: """ Test: basic admin session usage. @@ -108,6 +157,83 @@ def test_breathing(single_node: Cluster) -> None: admin.stop() +@tweak.broker.app_config.stats.app_id_tag_domains([tc.DOMAIN_FANOUT]) +def test_queue_stats(single_node: Cluster) -> None: + """ + Test: queue metrics via admin command. + Preconditions: + - Establish admin session with the cluster. + + Stage 1: check stats after posting messages + - Open a producer + - Post messages to a fanout queue + - Verify stats acquired via admin command with the expected stats + + Stage 2: check stats after confirming messages + - Open a consumer for each appId + - Confirm a portion of messages for each consumer + - Verify stats acquired via admin command with the expected stats + + Stage 3: check too-often stats safeguard + - Send several 'stat show' requests + - Verify that the admin session complains about too often stat request + + Concerns: + - The broker is able to report queue metrics for fanout queue. + - Safeguarding mechanism prevents from getting stats too often. + """ + + # Preconditions + admin = AdminClient() + admin.connect(*single_node.admin_endpoint) + + # Stage 1: check stats after posting messages + cluster: Cluster = single_node + proxies = cluster.proxy_cycle() + proxy = next(proxies) + producer: Client = proxy.create_client("producer") + + task = PostRecord(tc.DOMAIN_FANOUT, "test_stats", num=32) + post_n_msgs(producer, task) + + stats = extract_stats(admin.send_admin("encoding json_pretty stat show")) + queue_stats = stats["domainQueues"]["domains"][tc.DOMAIN_FANOUT][task.uri] + + expect_same_structure(queue_stats, dt.TEST_QUEUE_STATS_AFTER_POST) + + # Stage 2: check stats after confirming messages + consumer_foo: Client = proxy.create_client("consumer_foo") + consumer_foo.open(f"{task.uri}?id=foo", flags=["read"], succeed=True) + consumer_foo.confirm(f"{task.uri}?id=foo", "*", succeed=True) + + consumer_bar: Client = proxy.create_client("consumer_bar") + consumer_bar.open(f"{task.uri}?id=bar", flags=["read"], succeed=True) + consumer_bar.confirm(f"{task.uri}?id=bar", "+22", succeed=True) + + consumer_baz: Client = proxy.create_client("consumer_baz") + consumer_baz.open(f"{task.uri}?id=baz", flags=["read"], succeed=True) + consumer_baz.confirm(f"{task.uri}?id=baz", "+11", succeed=True) + + stats = extract_stats(admin.send_admin("encoding json_pretty stat show")) + queue_stats = stats["domainQueues"]["domains"][tc.DOMAIN_FANOUT][task.uri] + + expect_same_structure(queue_stats, dt.TEST_QUEUE_STATS_AFTER_CONFIRM) + + consumer_foo.close(f"{task.uri}?id=foo") + consumer_bar.close(f"{task.uri}?id=bar") + consumer_baz.close(f"{task.uri}?id=baz") + + # Stage 3: check too-often stats safeguard + for i in range(5): + admin.send_admin("encoding json_pretty stat show") + res = admin.send_admin("encoding json_pretty stat show") + obj = json.loads(res) + + expect_same_structure(obj, dt.TEST_QUEUE_STATS_TOO_OFTEN_SNAPSHOTS) + + admin.stop() + + def test_admin_encoding(single_node: Cluster) -> None: """ Test: admin commands output format. diff --git a/src/integration-tests/test_appids.py b/src/integration-tests/test_appids.py index 0a002366b..d6296102e 100644 --- a/src/integration-tests/test_appids.py +++ b/src/integration-tests/test_appids.py @@ -508,7 +508,7 @@ def test_unauthorization(cluster: Cluster): def test_two_consumers_of_unauthorized_app(multi_node: Cluster): - """DRQS 167201621: First client open authorized and unauthorized apps; + """Ticket 167201621: First client open authorized and unauthorized apps; second client opens unauthorized app. Then, primary shuts down causing replica to issue wildcard close requests to primary. diff --git a/src/integration-tests/test_breathing.py b/src/integration-tests/test_breathing.py index a6ef99e06..d5dda9290 100644 --- a/src/integration-tests/test_breathing.py +++ b/src/integration-tests/test_breathing.py @@ -523,7 +523,7 @@ def test_verify_broadcast(cluster: Cluster): def test_verify_redelivery(cluster: Cluster): """Drop one consumer having unconfirmed message while there is another consumer unable to take the message (due to max_unconfirmed_messages - limit). Then start new consumer and make sure it does not crash (DRQS + limit). Then start new consumer and make sure it does not crash (Ticket 156808957) and receives that unconfirmed message. """ proxies = cluster.proxy_cycle() diff --git a/src/integration-tests/test_graceful_shutdown.py b/src/integration-tests/test_graceful_shutdown.py index ef5c4184f..9240d5c89 100644 --- a/src/integration-tests/test_graceful_shutdown.py +++ b/src/integration-tests/test_graceful_shutdown.py @@ -110,7 +110,7 @@ def num_broker_messages(): assert wait_until(lambda: num_broker_messages() == 3, 3) - # DRQS 168471730. Downstream should update its opened subIds upon + # Ticket 168471730. Downstream should update its opened subIds upon # closing and not attempt to deconfigure it upon StopRequest self.producer.close(tc.URI_FANOUT, block=True) @@ -339,7 +339,7 @@ def test_multiple_stop_requests(self, multi_cluster: Cluster): @tweak.cluster.queue_operations.shutdown_timeout_ms(999999) def test_active_node_down_stop_requests(self, multi_cluster: Cluster): """ - DRQS 169782591 + Ticket 169782591 We have: Consumer -> Proxy -> active_node -> upstream_node. Start shutting down active_node (one of cluster.virtual_nodes()) Because there are unconfirmed, Proxy lingers with StopResponse. diff --git a/src/integration-tests/test_queue_close.py b/src/integration-tests/test_queue_close.py index 80d258781..770ec15c9 100644 --- a/src/integration-tests/test_queue_close.py +++ b/src/integration-tests/test_queue_close.py @@ -52,7 +52,7 @@ def test_close_queue(single_node: Cluster): @start_cluster(False) def test_close_while_reopening(multi_node: Cluster): """ - DRQS 169125974. Closing queue while reopen response is pending should + Ticket 169125974. Closing queue while reopen response is pending should not result in a dangling handle. """ @@ -116,7 +116,7 @@ def test_close_while_reopening(multi_node: Cluster): def test_close_open(multi_node: Cluster): """ - DRQS 169326671. Close, followed by Open with a different subId. + Ticket 169326671. Close, followed by Open with a different subId. """ proxies = multi_node.proxy_cycle() # pick proxy in datacenter opposite to the primary's @@ -140,7 +140,7 @@ def test_close_open(multi_node: Cluster): @tweak.cluster.queue_operations.reopen_retry_interval_ms(1234) def test_close_while_retrying_reopen(multi_node: Cluster): """ - DRQS 170043950. Trigger reopen failure causing proxy to retry on + Ticket 170043950. Trigger reopen failure causing proxy to retry on timeout. While waiting, close the queue and make sure, the retry accounts for that close. """ diff --git a/src/integration-tests/test_queue_reopen.py b/src/integration-tests/test_queue_reopen.py index 8bafaf170..ea3602b36 100644 --- a/src/integration-tests/test_queue_reopen.py +++ b/src/integration-tests/test_queue_reopen.py @@ -66,7 +66,7 @@ def test_reopen_empty_queue(multi_node: Cluster): def test_reopen_substream(multi_node: Cluster): """ - DRQS 169527537. Make a primary's client reopen the same appId with a + Ticket 169527537. Make a primary's client reopen the same appId with a different subId. """ diff --git a/src/integration-tests/test_subscriptions.py b/src/integration-tests/test_subscriptions.py index f5af63e22..f2adf041e 100644 --- a/src/integration-tests/test_subscriptions.py +++ b/src/integration-tests/test_subscriptions.py @@ -1454,7 +1454,7 @@ def test_no_capacity_all_optimization(cluster: Cluster): Test: delivery optimization works during routing when all subscriptions have no capacity. - DRQS 171509204 + Ticket 171509204 - Create 1 producer / 3 consumers: C1, C2, C3. - Consumers: max_unconfirmed_messages = 1. @@ -1557,7 +1557,7 @@ def test_no_capacity_all_fanout(cluster: Cluster): Test: delivery optimization encountered with one app does not affect other apps. - DRQS 171509204 + Ticket 171509204 - Create 1 producer / 2 consumers: C_foo, C_bar. - C_foo: max_unconfirmed_messages = 128. diff --git a/src/python/blazingmq/dev/configurator/__init__.py b/src/python/blazingmq/dev/configurator/__init__.py index 0ea7b8e61..beb401e6f 100644 --- a/src/python/blazingmq/dev/configurator/__init__.py +++ b/src/python/blazingmq/dev/configurator/__init__.py @@ -535,6 +535,7 @@ class Proto: ), ), stats=mqbcfg.StatsConfig( + app_id_tag_domains=[], plugins=[], snapshot_interval=1, printer=mqbcfg.StatsPrinterConfig( diff --git a/src/python/blazingmq/dev/it/data/README.md b/src/python/blazingmq/dev/it/data/README.md new file mode 100644 index 000000000..bc6ce6be6 --- /dev/null +++ b/src/python/blazingmq/dev/it/data/README.md @@ -0,0 +1,5 @@ +# Integration Tests Additional Data + +This directory contains additional data used by integration tests. +What could be a data: +- Structs too large to define in ITs scripts, for readability concerns. \ No newline at end of file diff --git a/src/python/blazingmq/dev/it/data/__init__.py b/src/python/blazingmq/dev/it/data/__init__.py new file mode 100644 index 000000000..1dfd29ae6 --- /dev/null +++ b/src/python/blazingmq/dev/it/data/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2024 Bloomberg Finance L.P. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/blazingmq/dev/it/data/data_metrics.py b/src/python/blazingmq/dev/it/data/data_metrics.py new file mode 100644 index 000000000..e31479f38 --- /dev/null +++ b/src/python/blazingmq/dev/it/data/data_metrics.py @@ -0,0 +1,200 @@ +# Copyright 2024 Bloomberg Finance L.P. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Additional data for metrics-related integration tests. + +Note: currently these tests are placed in 'test_admin_client.py' tests block, but + might be moved to their separate source file if we add more of them. + +Note: when the metrics collection evolves, it might be necessary to update + the data structures in this source. To do so, it's possible to copy values + observed in the debugger for the specific test and paste them as a dictionary, + while replacing non-fixed parameters such as time intervals with a + condition placeholders (see 'GreaterThan'). +""" + + +from typing import Any + + +class ValueConstraint: + """The base class for non-fixed values in a data.""" + + def check(self, value: Any) -> bool: + """Return 'True' if the specified 'value' satisfy the checked constraint. + The caller is responsible for deciding what to do next, e.g. raising an assert. + """ + ... + + +class GreaterThan(ValueConstraint): + """This constraint checks if the provided value is greater than the set constraint.""" + + def __init__(self, constraint: Any) -> None: + self._constraint = constraint + + def check(self, value: Any) -> bool: + """Check if the specified 'value' is greater than the stored '_constraint'.""" + return self._constraint < value + + +TEST_QUEUE_STATS_AFTER_POST = { + "appIds": { + "bar": { + "values": { + "queue_confirm_time_max": 0, + "queue_confirm_time_avg": 0, + "queue_nack_msgs": 0, + "queue_ack_time_max": 0, + "queue_ack_msgs": 0, + "queue_confirm_msgs": 0, + "queue_push_bytes": 0, + "queue_consumers_count": 0, + "queue_producers_count": 0, + "queue_push_msgs": 0, + "queue_ack_time_avg": 0, + "queue_put_bytes": 0, + "queue_put_msgs": 0, + } + }, + "baz": { + "values": { + "queue_confirm_time_max": 0, + "queue_confirm_time_avg": 0, + "queue_nack_msgs": 0, + "queue_ack_time_max": 0, + "queue_ack_msgs": 0, + "queue_confirm_msgs": 0, + "queue_push_bytes": 0, + "queue_consumers_count": 0, + "queue_producers_count": 0, + "queue_push_msgs": 0, + "queue_ack_time_avg": 0, + "queue_put_bytes": 0, + "queue_put_msgs": 0, + } + }, + "foo": { + "values": { + "queue_confirm_time_max": 0, + "queue_confirm_time_avg": 0, + "queue_nack_msgs": 0, + "queue_ack_time_max": 0, + "queue_ack_msgs": 0, + "queue_confirm_msgs": 0, + "queue_push_bytes": 0, + "queue_consumers_count": 0, + "queue_producers_count": 0, + "queue_push_msgs": 0, + "queue_ack_time_avg": 0, + "queue_put_bytes": 0, + "queue_put_msgs": 0, + } + }, + }, + "values": { + "queue_confirm_time_max": 0, + "queue_confirm_time_avg": 0, + "queue_nack_msgs": 0, + "queue_ack_time_max": GreaterThan(0), + "queue_ack_msgs": 32, + "queue_confirm_msgs": 0, + "queue_push_bytes": 0, + "queue_consumers_count": 0, + "queue_producers_count": 0, + "queue_push_msgs": 0, + "queue_ack_time_avg": GreaterThan(0), + "queue_put_bytes": 96, + "queue_put_msgs": 32, + }, +} + +TEST_QUEUE_STATS_AFTER_CONFIRM = { + "appIds": { + "bar": { + "values": { + "queue_confirm_time_max": GreaterThan(0), + "queue_confirm_time_avg": GreaterThan(0), + "queue_nack_msgs": 0, + "queue_ack_time_max": 0, + "queue_ack_msgs": 0, + "queue_confirm_msgs": 0, + "queue_push_bytes": 0, + "queue_consumers_count": 0, + "queue_producers_count": 0, + "queue_push_msgs": 0, + "queue_ack_time_avg": 0, + "queue_put_bytes": 0, + "queue_put_msgs": 0, + } + }, + "baz": { + "values": { + "queue_confirm_time_max": GreaterThan(0), + "queue_confirm_time_avg": GreaterThan(0), + "queue_nack_msgs": 0, + "queue_ack_time_max": 0, + "queue_ack_msgs": 0, + "queue_confirm_msgs": 0, + "queue_push_bytes": 0, + "queue_consumers_count": 0, + "queue_producers_count": 0, + "queue_push_msgs": 0, + "queue_ack_time_avg": 0, + "queue_put_bytes": 0, + "queue_put_msgs": 0, + } + }, + "foo": { + "values": { + "queue_confirm_time_max": GreaterThan(0), + "queue_confirm_time_avg": GreaterThan(0), + "queue_nack_msgs": 0, + "queue_ack_time_max": 0, + "queue_ack_msgs": 0, + "queue_confirm_msgs": 0, + "queue_push_bytes": 0, + "queue_consumers_count": 0, + "queue_producers_count": 0, + "queue_push_msgs": 0, + "queue_ack_time_avg": 0, + "queue_put_bytes": 0, + "queue_put_msgs": 0, + } + }, + }, + "values": { + "queue_confirm_time_max": GreaterThan(0), + "queue_confirm_time_avg": GreaterThan(0), + "queue_nack_msgs": 0, + "queue_ack_time_max": GreaterThan(0), + "queue_ack_msgs": 32, + "queue_confirm_msgs": 65, + "queue_push_bytes": 288, + "queue_consumers_count": 3, + "queue_producers_count": 0, + "queue_push_msgs": 96, + "queue_ack_time_avg": GreaterThan(0), + "queue_put_bytes": 96, + "queue_put_msgs": 32, + }, +} + +TEST_QUEUE_STATS_TOO_OFTEN_SNAPSHOTS = { + "error": { + "message": "Cannot save the recent snapshot, trying to make snapshots too often" + } +} diff --git a/src/python/blazingmq/dev/it/process/client.py b/src/python/blazingmq/dev/it/process/client.py index 10182085d..6caab3ae7 100644 --- a/src/python/blazingmq/dev/it/process/client.py +++ b/src/python/blazingmq/dev/it/process/client.py @@ -157,7 +157,14 @@ def start_session(self, block=None, succeed=None, no_except=None, **kw): return res.error_code def open( - self, uri, flags, block=None, succeed=None, no_except=None, timeout=None, **kw + self, + uri, + flags: List[str], + block=None, + succeed=None, + no_except=None, + timeout=None, + **kw, ): """ Open the queue with the specified 'uri' and the specified 'flags'. If diff --git a/src/python/blazingmq/dev/it/tweaks/generated.py b/src/python/blazingmq/dev/it/tweaks/generated.py index 24f8933ae..a90fa6420 100644 --- a/src/python/blazingmq/dev/it/tweaks/generated.py +++ b/src/python/blazingmq/dev/it/tweaks/generated.py @@ -407,6 +407,12 @@ def __call__( dispatcher_config = DispatcherConfig() class Stats(metaclass=TweakMetaclass): + class AppIdTagDomains(metaclass=TweakMetaclass): + def __call__(self, value: None) -> Callable: + ... + + app_id_tag_domains = AppIdTagDomains() + class SnapshotInterval(metaclass=TweakMetaclass): def __call__(self, value: int) -> Callable: ... diff --git a/src/python/blazingmq/schemas/mqbcfg.py b/src/python/blazingmq/schemas/mqbcfg.py index 05fa6801f..e3dd040f7 100644 --- a/src/python/blazingmq/schemas/mqbcfg.py +++ b/src/python/blazingmq/schemas/mqbcfg.py @@ -1776,6 +1776,16 @@ class ClusterProxyDefinition: @dataclass class StatsConfig: + app_id_tag_domains: List[str] = field( + default_factory=list, + metadata={ + "name": "appIdTagDomains", + "type": "Element", + "namespace": "http://bloomberg.com/schemas/mqbcfg", + "min_occurs": 1, + "required": True, + }, + ) snapshot_interval: int = field( default=1, metadata={ From 0aecfd84f7d7a0b9b53bf55c7c9b8cd0b6a7d5a4 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Wed, 10 Jul 2024 22:02:53 +0300 Subject: [PATCH 4/5] ITs: fix run-tests script (#354) Signed-off-by: Evgeny Malygin --- src/integration-tests/run-tests | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/integration-tests/run-tests b/src/integration-tests/run-tests index 6bc10b86e..cd6546938 100755 --- a/src/integration-tests/run-tests +++ b/src/integration-tests/run-tests @@ -19,8 +19,9 @@ set -e PRESET="legacy_mode or fsm_mode" -if [ -n "$1" ]; then - PRESET="$1" +if [[ -n "$1" ]] && ! [[ "$1" == -* ]]; then + PRESET=$1 + shift echo "Use IT preset '$PRESET' from the script argument" elif [ -n "$BLAZINGMQ_IT_PRESET" ]; then PRESET=$BLAZINGMQ_IT_PRESET @@ -35,4 +36,4 @@ repo_dir=${repo_dir%/src/*} export PYTHONPATH=$repo_dir/src/python:$PYTHONPATH cd "$repo_dir/src/integration-tests" -python3 -m pytest -m "$PRESET" "${@: 2}" +python3 -m pytest -m "$PRESET" "$@" From a145cd1afbd518096a27ddd4a310e384e08ebf41 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Fri, 12 Jul 2024 23:37:23 +0300 Subject: [PATCH 5/5] CI: use the latest black formatter version (#355) * CI: use the latest black formatter version * CI: install python3.12 via action Signed-off-by: Evgeny Malygin --- .github/workflows/formatting-check.yaml | 5 +- .../blazingmq/dev/configurator/__init__.py | 20 +- .../blazingmq/dev/it/tweaks/generated.py | 644 +++++++----------- 3 files changed, 244 insertions(+), 425 deletions(-) diff --git a/.github/workflows/formatting-check.yaml b/.github/workflows/formatting-check.yaml index 9eeb28b9c..aff598752 100644 --- a/.github/workflows/formatting-check.yaml +++ b/.github/workflows/formatting-check.yaml @@ -35,9 +35,12 @@ jobs: - uses: actions/checkout@v4 with: fetch-depth: 0 + - uses: actions/setup-python@v5 + with: + python-version: '3.12' - name: black style check run: | - sudo apt-get install black + pip3 install git+https://github.com/psf/black black . git diff -q | tee format_diff.txt if [ -s format_diff.txt ]; then exit 1; fi diff --git a/src/python/blazingmq/dev/configurator/__init__.py b/src/python/blazingmq/dev/configurator/__init__.py index beb401e6f..b994cd3da 100644 --- a/src/python/blazingmq/dev/configurator/__init__.py +++ b/src/python/blazingmq/dev/configurator/__init__.py @@ -355,32 +355,28 @@ class Site(abc.ABC): configurator: "Configurator" @abc.abstractmethod - def __str__(self) -> str: - ... + def __str__(self) -> str: ... @abc.abstractmethod - def install(self, from_path: Union[str, Path], to_path: Union[str, Path]) -> None: - ... + def install( + self, from_path: Union[str, Path], to_path: Union[str, Path] + ) -> None: ... @abc.abstractmethod - def create_file(self, path: Union[str, Path], content: str, mode=None) -> None: - ... + def create_file(self, path: Union[str, Path], content: str, mode=None) -> None: ... @abc.abstractmethod - def mkdir(self, path: Union[str, Path]) -> None: - ... + def mkdir(self, path: Union[str, Path]) -> None: ... @abc.abstractmethod - def rmdir(self, path: Union[str, Path]) -> None: - ... + def rmdir(self, path: Union[str, Path]) -> None: ... @abc.abstractmethod def create_json_file( self, path: Union[str, Path], content, - ) -> None: - ... + ) -> None: ... def _cluster_definition_partial_prototype(partition_config: mqbcfg.PartitionConfig): diff --git a/src/python/blazingmq/dev/it/tweaks/generated.py b/src/python/blazingmq/dev/it/tweaks/generated.py index a90fa6420..1295347c7 100644 --- a/src/python/blazingmq/dev/it/tweaks/generated.py +++ b/src/python/blazingmq/dev/it/tweaks/generated.py @@ -45,100 +45,99 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.AllocatorType, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... allocator_type = AllocatorType() class AllocationLimit(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... allocation_limit = AllocationLimit() class LogController(metaclass=TweakMetaclass): class FileName(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[str, NoneType] + ) -> Callable: ... file_name = FileName() class FileMaxAgeDays(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[int, NoneType] + ) -> Callable: ... file_max_age_days = FileMaxAgeDays() class RotationBytes(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[int, NoneType] + ) -> Callable: ... rotation_bytes = RotationBytes() class LogfileFormat(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[str, NoneType] + ) -> Callable: ... logfile_format = LogfileFormat() class ConsoleFormat(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[str, NoneType] + ) -> Callable: ... console_format = ConsoleFormat() class LoggingVerbosity(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[str, NoneType] + ) -> Callable: ... logging_verbosity = LoggingVerbosity() class BslsLogSeverityThreshold(metaclass=TweakMetaclass): - def __call__(self, value: str) -> Callable: - ... + def __call__(self, value: str) -> Callable: ... bsls_log_severity_threshold = BslsLogSeverityThreshold() class ConsoleSeverityThreshold(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[str, NoneType] + ) -> Callable: ... console_severity_threshold = ConsoleSeverityThreshold() class Categories(metaclass=TweakMetaclass): - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... categories = Categories() class Syslog(metaclass=TweakMetaclass): class Enabled(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... enabled = Enabled() class AppName(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[str, NoneType] - ) -> Callable: - ... + ) -> Callable: ... app_name = AppName() class LogFormat(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[str, NoneType] - ) -> Callable: - ... + ) -> Callable: ... log_format = LogFormat() class Verbosity(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[str, NoneType] - ) -> Callable: - ... + ) -> Callable: ... verbosity = Verbosity() @@ -147,8 +146,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.SyslogConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... syslog = Syslog() @@ -157,76 +155,64 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.LogController, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... log_controller = LogController() def __call__( self, value: typing.Union[blazingmq.schemas.mqbcfg.TaskConfig, NoneType] - ) -> Callable: - ... + ) -> Callable: ... task_config = TaskConfig() class AppConfig(metaclass=TweakMetaclass): class BrokerInstanceName(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... broker_instance_name = BrokerInstanceName() class BrokerVersion(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... broker_version = BrokerVersion() class ConfigVersion(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... config_version = ConfigVersion() class EtcDir(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... etc_dir = EtcDir() class HostName(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... host_name = HostName() class HostTags(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... host_tags = HostTags() class HostDataCenter(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... host_data_center = HostDataCenter() class IsRunningOnDev(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[bool, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[bool, NoneType]) -> Callable: ... is_running_on_dev = IsRunningOnDev() class LogsObserverMaxSize(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... logs_observer_max_size = LogsObserverMaxSize() class LatencyMonitorDomain(metaclass=TweakMetaclass): - def __call__(self, value: str) -> Callable: - ... + def __call__(self, value: str) -> Callable: ... latency_monitor_domain = LatencyMonitorDomain() @@ -235,8 +221,7 @@ class Sessions(metaclass=TweakMetaclass): class NumProcessors(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... num_processors = NumProcessors() @@ -244,24 +229,21 @@ class ProcessorConfig(metaclass=TweakMetaclass): class QueueSize(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size = QueueSize() class QueueSizeLowWatermark(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size_low_watermark = QueueSizeLowWatermark() class QueueSizeHighWatermark(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size_high_watermark = QueueSizeHighWatermark() @@ -271,8 +253,7 @@ def __call__( blazingmq.schemas.mqbcfg.DispatcherProcessorParameters, NoneType, ], - ) -> Callable: - ... + ) -> Callable: ... processor_config = ProcessorConfig() @@ -281,8 +262,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.DispatcherProcessorConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... sessions = Sessions() @@ -290,8 +270,7 @@ class Queues(metaclass=TweakMetaclass): class NumProcessors(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... num_processors = NumProcessors() @@ -299,24 +278,21 @@ class ProcessorConfig(metaclass=TweakMetaclass): class QueueSize(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size = QueueSize() class QueueSizeLowWatermark(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size_low_watermark = QueueSizeLowWatermark() class QueueSizeHighWatermark(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size_high_watermark = QueueSizeHighWatermark() @@ -326,8 +302,7 @@ def __call__( blazingmq.schemas.mqbcfg.DispatcherProcessorParameters, NoneType, ], - ) -> Callable: - ... + ) -> Callable: ... processor_config = ProcessorConfig() @@ -336,8 +311,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.DispatcherProcessorConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... queues = Queues() @@ -345,8 +319,7 @@ class Clusters(metaclass=TweakMetaclass): class NumProcessors(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... num_processors = NumProcessors() @@ -354,24 +327,21 @@ class ProcessorConfig(metaclass=TweakMetaclass): class QueueSize(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size = QueueSize() class QueueSizeLowWatermark(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size_low_watermark = QueueSizeLowWatermark() class QueueSizeHighWatermark(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... queue_size_high_watermark = QueueSizeHighWatermark() @@ -381,8 +351,7 @@ def __call__( blazingmq.schemas.mqbcfg.DispatcherProcessorParameters, NoneType, ], - ) -> Callable: - ... + ) -> Callable: ... processor_config = ProcessorConfig() @@ -391,8 +360,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.DispatcherProcessorConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... clusters = Clusters() @@ -401,70 +369,59 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.DispatcherConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... dispatcher_config = DispatcherConfig() class Stats(metaclass=TweakMetaclass): class AppIdTagDomains(metaclass=TweakMetaclass): - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... app_id_tag_domains = AppIdTagDomains() class SnapshotInterval(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... snapshot_interval = SnapshotInterval() class Plugins(metaclass=TweakMetaclass): class Name(metaclass=TweakMetaclass): - def __call__(self, value: str) -> Callable: - ... + def __call__(self, value: str) -> Callable: ... name = Name() class QueueSize(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... queue_size = QueueSize() class QueueHighWatermark(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... queue_high_watermark = QueueHighWatermark() class QueueLowWatermark(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... queue_low_watermark = QueueLowWatermark() class PublishInterval(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... publish_interval = PublishInterval() class NamespacePrefix(metaclass=TweakMetaclass): - def __call__(self, value: str) -> Callable: - ... + def __call__(self, value: str) -> Callable: ... namespace_prefix = NamespacePrefix() class Hosts(metaclass=TweakMetaclass): - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... hosts = Hosts() class InstanceId(metaclass=TweakMetaclass): - def __call__(self, value: str) -> Callable: - ... + def __call__(self, value: str) -> Callable: ... instance_id = InstanceId() @@ -472,20 +429,17 @@ class PrometheusSpecific(metaclass=TweakMetaclass): class Mode(metaclass=TweakMetaclass): def __call__( self, value: blazingmq.schemas.mqbcfg.ExportMode - ) -> Callable: - ... + ) -> Callable: ... mode = Mode() class Host(metaclass=TweakMetaclass): - def __call__(self, value: str) -> Callable: - ... + def __call__(self, value: str) -> Callable: ... host = Host() class Port(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... port = Port() @@ -495,48 +449,41 @@ def __call__( blazingmq.schemas.mqbcfg.StatPluginConfigPrometheus, NoneType, ], - ) -> Callable: - ... + ) -> Callable: ... prometheus_specific = PrometheusSpecific() - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... plugins = Plugins() class Printer(metaclass=TweakMetaclass): class PrintInterval(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... print_interval = PrintInterval() class File(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[str, NoneType] - ) -> Callable: - ... + ) -> Callable: ... file = File() class MaxAgeDays(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... max_age_days = MaxAgeDays() class RotateBytes(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... rotate_bytes = RotateBytes() class RotateDays(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... rotate_days = RotateDays() @@ -545,42 +492,36 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.StatsPrinterConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... printer = Printer() def __call__( self, value: typing.Union[blazingmq.schemas.mqbcfg.StatsConfig, NoneType], - ) -> Callable: - ... + ) -> Callable: ... stats = Stats() class NetworkInterfaces(metaclass=TweakMetaclass): class Heartbeats(metaclass=TweakMetaclass): class Client(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... client = Client() class DownstreamBroker(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... downstream_broker = DownstreamBroker() class UpstreamBroker(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... upstream_broker = UpstreamBroker() class ClusterPeer(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... cluster_peer = ClusterPeer() @@ -589,8 +530,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.Heartbeat, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... heartbeats = Heartbeats() @@ -598,70 +538,60 @@ class TcpInterface(metaclass=TweakMetaclass): class Name(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[str, NoneType] - ) -> Callable: - ... + ) -> Callable: ... name = Name() class Port(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... port = Port() class IoThreads(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... io_threads = IoThreads() class MaxConnections(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_connections = MaxConnections() class LowWatermark(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... low_watermark = LowWatermark() class HighWatermark(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[int, NoneType] - ) -> Callable: - ... + ) -> Callable: ... high_watermark = HighWatermark() class NodeLowWatermark(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... node_low_watermark = NodeLowWatermark() class NodeHighWatermark(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... node_high_watermark = NodeHighWatermark() class HeartbeatIntervalMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... heartbeat_interval_ms = HeartbeatIntervalMs() class UseNtf(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... use_ntf = UseNtf() @@ -670,8 +600,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.TcpInterfaceConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... tcp_interface = TcpInterface() @@ -680,15 +609,15 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.NetworkInterfaces, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... network_interfaces = NetworkInterfaces() class BmqconfConfig(metaclass=TweakMetaclass): class CacheTtlseconds(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[int, NoneType] + ) -> Callable: ... cache_ttlseconds = CacheTtlseconds() @@ -697,48 +626,41 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.BmqconfConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... bmqconf_config = BmqconfConfig() class Plugins(metaclass=TweakMetaclass): class Libraries(metaclass=TweakMetaclass): - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... libraries = Libraries() class Enabled(metaclass=TweakMetaclass): - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... enabled = Enabled() def __call__( self, value: typing.Union[blazingmq.schemas.mqbcfg.Plugins, NoneType], - ) -> Callable: - ... + ) -> Callable: ... plugins = Plugins() class MessagePropertiesV2(metaclass=TweakMetaclass): class AdvertiseV2Support(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... advertise_v2_support = AdvertiseV2Support() class MinCppSdkVersion(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... min_cpp_sdk_version = MinCppSdkVersion() class MinJavaSdkVersion(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... min_java_sdk_version = MinJavaSdkVersion() @@ -747,36 +669,31 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.MessagePropertiesV2, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... message_properties_v2 = MessagePropertiesV2() class ConfigureStream(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... configure_stream = ConfigureStream() def __call__( self, value: typing.Union[blazingmq.schemas.mqbcfg.AppConfig, NoneType] - ) -> Callable: - ... + ) -> Callable: ... app_config = AppConfig() class Domain: class Name(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... name = Name() class Mode(metaclass=TweakMetaclass): class Fanout(metaclass=TweakMetaclass): class AppIds(metaclass=TweakMetaclass): - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... app_ids = AppIds() @@ -785,8 +702,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.QueueModeFanout, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... fanout = Fanout() @@ -796,8 +712,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.QueueModePriority, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... priority = Priority() @@ -807,82 +722,78 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.QueueModeBroadcast, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... broadcast = Broadcast() def __call__( self, value: typing.Union[blazingmq.schemas.mqbconf.QueueMode, NoneType] - ) -> Callable: - ... + ) -> Callable: ... mode = Mode() class Storage(metaclass=TweakMetaclass): class DomainLimits(metaclass=TweakMetaclass): class Messages(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[int, NoneType] + ) -> Callable: ... messages = Messages() class MessagesWatermarkRatio(metaclass=TweakMetaclass): - def __call__(self, value: decimal.Decimal) -> Callable: - ... + def __call__(self, value: decimal.Decimal) -> Callable: ... messages_watermark_ratio = MessagesWatermarkRatio() class Bytes(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[int, NoneType] + ) -> Callable: ... bytes = Bytes() class BytesWatermarkRatio(metaclass=TweakMetaclass): - def __call__(self, value: decimal.Decimal) -> Callable: - ... + def __call__(self, value: decimal.Decimal) -> Callable: ... bytes_watermark_ratio = BytesWatermarkRatio() def __call__( self, value: typing.Union[blazingmq.schemas.mqbconf.Limits, NoneType], - ) -> Callable: - ... + ) -> Callable: ... domain_limits = DomainLimits() class QueueLimits(metaclass=TweakMetaclass): class Messages(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[int, NoneType] + ) -> Callable: ... messages = Messages() class MessagesWatermarkRatio(metaclass=TweakMetaclass): - def __call__(self, value: decimal.Decimal) -> Callable: - ... + def __call__(self, value: decimal.Decimal) -> Callable: ... messages_watermark_ratio = MessagesWatermarkRatio() class Bytes(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[int, NoneType] + ) -> Callable: ... bytes = Bytes() class BytesWatermarkRatio(metaclass=TweakMetaclass): - def __call__(self, value: decimal.Decimal) -> Callable: - ... + def __call__(self, value: decimal.Decimal) -> Callable: ... bytes_watermark_ratio = BytesWatermarkRatio() def __call__( self, value: typing.Union[blazingmq.schemas.mqbconf.Limits, NoneType], - ) -> Callable: - ... + ) -> Callable: ... queue_limits = QueueLimits() @@ -893,8 +804,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.InMemoryStorage, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... in_memory = InMemory() @@ -904,16 +814,14 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.FileBackedStorage, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... file_backed = FileBacked() def __call__( self, value: typing.Union[blazingmq.schemas.mqbconf.Storage, NoneType], - ) -> Callable: - ... + ) -> Callable: ... config = Config() @@ -922,45 +830,38 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.StorageDefinition, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... storage = Storage() class MaxConsumers(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_consumers = MaxConsumers() class MaxProducers(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_producers = MaxProducers() class MaxQueues(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_queues = MaxQueues() class MsgGroupIdConfig(metaclass=TweakMetaclass): class Rebalance(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... rebalance = Rebalance() class MaxGroups(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_groups = MaxGroups() class TtlSeconds(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... ttl_seconds = TtlSeconds() @@ -969,32 +870,27 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.MsgGroupIdConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... msg_group_id_config = MsgGroupIdConfig() class MaxIdleTime(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_idle_time = MaxIdleTime() class MessageTtl(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... message_ttl = MessageTtl() class MaxDeliveryAttempts(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_delivery_attempts = MaxDeliveryAttempts() class DeduplicationTimeMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... deduplication_time_ms = DeduplicationTimeMs() @@ -1005,8 +901,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.QueueConsistencyEventual, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... eventual = Eventual() @@ -1016,23 +911,20 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbconf.QueueConsistencyStrong, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... strong = Strong() def __call__( self, value: typing.Union[blazingmq.schemas.mqbconf.Consistency, NoneType], - ) -> Callable: - ... + ) -> Callable: ... consistency = Consistency() class Subscriptions(metaclass=TweakMetaclass): class AppId(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... app_id = AppId() @@ -1040,53 +932,47 @@ class Expression(metaclass=TweakMetaclass): class Version(metaclass=TweakMetaclass): def __call__( self, value: blazingmq.schemas.mqbconf.ExpressionVersion - ) -> Callable: - ... + ) -> Callable: ... version = Version() class Text(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__( + self, value: typing.Union[str, NoneType] + ) -> Callable: ... text = Text() def __call__( self, value: typing.Union[blazingmq.schemas.mqbconf.Expression, NoneType], - ) -> Callable: - ... + ) -> Callable: ... expression = Expression() - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... subscriptions = Subscriptions() class Cluster: class Name(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... name = Name() class Nodes(metaclass=TweakMetaclass): class Id(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... id = Id() class Name(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... name = Name() class DataCenter(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... data_center = DataCenter() @@ -1095,8 +981,7 @@ class Tcp(metaclass=TweakMetaclass): class Endpoint(metaclass=TweakMetaclass): def __call__( self, value: typing.Union[str, NoneType] - ) -> Callable: - ... + ) -> Callable: ... endpoint = Endpoint() @@ -1105,8 +990,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.TcpClusterNodeConnection, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... tcp = Tcp() @@ -1115,129 +999,108 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.ClusterNodeConnection, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... transport = Transport() - def __call__(self, value: None) -> Callable: - ... + def __call__(self, value: None) -> Callable: ... nodes = Nodes() class PartitionConfig(metaclass=TweakMetaclass): class NumPartitions(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... num_partitions = NumPartitions() class Location(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... location = Location() class ArchiveLocation(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[str, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[str, NoneType]) -> Callable: ... archive_location = ArchiveLocation() class MaxDataFileSize(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... max_data_file_size = MaxDataFileSize() class MaxJournalFileSize(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... max_journal_file_size = MaxJournalFileSize() class MaxQlistFileSize(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... max_qlist_file_size = MaxQlistFileSize() class Preallocate(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... preallocate = Preallocate() class MaxArchivedFileSets(metaclass=TweakMetaclass): - def __call__(self, value: typing.Union[int, NoneType]) -> Callable: - ... + def __call__(self, value: typing.Union[int, NoneType]) -> Callable: ... max_archived_file_sets = MaxArchivedFileSets() class PrefaultPages(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... prefault_pages = PrefaultPages() class FlushAtShutdown(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... flush_at_shutdown = FlushAtShutdown() class SyncConfig(metaclass=TweakMetaclass): class StartupRecoveryMaxDurationMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... startup_recovery_max_duration_ms = StartupRecoveryMaxDurationMs() class MaxAttemptsStorageSync(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_attempts_storage_sync = MaxAttemptsStorageSync() class StorageSyncReqTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... storage_sync_req_timeout_ms = StorageSyncReqTimeoutMs() class MasterSyncMaxDurationMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... master_sync_max_duration_ms = MasterSyncMaxDurationMs() class PartitionSyncStateReqTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... partition_sync_state_req_timeout_ms = PartitionSyncStateReqTimeoutMs() class PartitionSyncDataReqTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... partition_sync_data_req_timeout_ms = PartitionSyncDataReqTimeoutMs() class StartupWaitDurationMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... startup_wait_duration_ms = StartupWaitDurationMs() class FileChunkSize(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... file_chunk_size = FileChunkSize() class PartitionSyncEventSize(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... partition_sync_event_size = PartitionSyncEventSize() @@ -1246,16 +1109,14 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.StorageSyncConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... sync_config = SyncConfig() def __call__( self, value: typing.Union[blazingmq.schemas.mqbcfg.PartitionConfig, NoneType], - ) -> Callable: - ... + ) -> Callable: ... partition_config = PartitionConfig() @@ -1265,144 +1126,121 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.MasterAssignmentAlgorithm, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... master_assignment = MasterAssignment() class Elector(metaclass=TweakMetaclass): class InitialWaitTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... initial_wait_timeout_ms = InitialWaitTimeoutMs() class MaxRandomWaitTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_random_wait_timeout_ms = MaxRandomWaitTimeoutMs() class ScoutingResultTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... scouting_result_timeout_ms = ScoutingResultTimeoutMs() class ElectionResultTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... election_result_timeout_ms = ElectionResultTimeoutMs() class HeartbeatBroadcastPeriodMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... heartbeat_broadcast_period_ms = HeartbeatBroadcastPeriodMs() class HeartbeatCheckPeriodMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... heartbeat_check_period_ms = HeartbeatCheckPeriodMs() class HeartbeatMissCount(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... heartbeat_miss_count = HeartbeatMissCount() class Quorum(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... quorum = Quorum() class LeaderSyncDelayMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... leader_sync_delay_ms = LeaderSyncDelayMs() def __call__( self, value: typing.Union[blazingmq.schemas.mqbcfg.ElectorConfig, NoneType], - ) -> Callable: - ... + ) -> Callable: ... elector = Elector() class QueueOperations(metaclass=TweakMetaclass): class OpenTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... open_timeout_ms = OpenTimeoutMs() class ConfigureTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... configure_timeout_ms = ConfigureTimeoutMs() class CloseTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... close_timeout_ms = CloseTimeoutMs() class ReopenTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... reopen_timeout_ms = ReopenTimeoutMs() class ReopenRetryIntervalMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... reopen_retry_interval_ms = ReopenRetryIntervalMs() class ReopenMaxAttempts(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... reopen_max_attempts = ReopenMaxAttempts() class AssignmentTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... assignment_timeout_ms = AssignmentTimeoutMs() class KeepaliveDurationMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... keepalive_duration_ms = KeepaliveDurationMs() class ConsumptionMonitorPeriodMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... consumption_monitor_period_ms = ConsumptionMonitorPeriodMs() class StopTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... stop_timeout_ms = StopTimeoutMs() class ShutdownTimeoutMs(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... shutdown_timeout_ms = ShutdownTimeoutMs() class AckWindowSize(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... ack_window_size = AckWindowSize() @@ -1411,21 +1249,18 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.QueueOperationsConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... queue_operations = QueueOperations() class ClusterAttributes(metaclass=TweakMetaclass): class IsCslmodeEnabled(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... is_cslmode_enabled = IsCslmodeEnabled() class IsFsmworkflow(metaclass=TweakMetaclass): - def __call__(self, value: bool) -> Callable: - ... + def __call__(self, value: bool) -> Callable: ... is_fsmworkflow = IsFsmworkflow() @@ -1434,57 +1269,48 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.ClusterAttributes, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... cluster_attributes = ClusterAttributes() class ClusterMonitorConfig(metaclass=TweakMetaclass): class MaxTimeLeader(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_time_leader = MaxTimeLeader() class MaxTimeMaster(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_time_master = MaxTimeMaster() class MaxTimeNode(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_time_node = MaxTimeNode() class MaxTimeFailover(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... max_time_failover = MaxTimeFailover() class ThresholdLeader(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... threshold_leader = ThresholdLeader() class ThresholdMaster(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... threshold_master = ThresholdMaster() class ThresholdNode(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... threshold_node = ThresholdNode() class ThresholdFailover(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... threshold_failover = ThresholdFailover() @@ -1493,33 +1319,28 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.ClusterMonitorConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... cluster_monitor_config = ClusterMonitorConfig() class MessageThrottleConfig(metaclass=TweakMetaclass): class LowThreshold(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... low_threshold = LowThreshold() class HighThreshold(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... high_threshold = HighThreshold() class LowInterval(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... low_interval = LowInterval() class HighInterval(metaclass=TweakMetaclass): - def __call__(self, value: int) -> Callable: - ... + def __call__(self, value: int) -> Callable: ... high_interval = HighInterval() @@ -1528,8 +1349,7 @@ def __call__( value: typing.Union[ blazingmq.schemas.mqbcfg.MessageThrottleConfig, NoneType ], - ) -> Callable: - ... + ) -> Callable: ... message_throttle_config = MessageThrottleConfig()