Skip to content

Commit

Permalink
Revert "Revert "Feat[MQB]: Admin API Routing"" (#419)
Browse files Browse the repository at this point in the history
* Revert "Revert "Admin API Routing (#352)" (#407)"

This reverts commit d526712.

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqba, mqbblp: Fix Solaris build

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

---------

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored Sep 17, 2024
1 parent 5aaf596 commit 3f35614
Show file tree
Hide file tree
Showing 36 changed files with 13,781 additions and 8,362 deletions.
5 changes: 3 additions & 2 deletions src/groups/mqb/mqba/mqba_adminsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ void AdminSession::enqueueAdminCommand(
&AdminSession::onProcessedAdminCommand,
d_self.acquireWeak()),
adminCommandCtrlMsg,
bdlf::PlaceHolders::_1, // rc
bdlf::PlaceHolders::_2)); // commandExecResults
bdlf::PlaceHolders::_1, // rc
bdlf::PlaceHolders::_2), // commandExecResults
false); // fromReroute
}

// CREATORS
Expand Down
312 changes: 233 additions & 79 deletions src/groups/mqb/mqba/mqba_application.cpp

Large diffs are not rendered by default.

59 changes: 47 additions & 12 deletions src/groups/mqb/mqba/mqba_application.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2014-2023 Bloomberg Finance L.P.
// Copyright 2014-2024 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -27,8 +27,10 @@
// used by the BlazingMQ broker.

// MQB

#include <mqba_commandrouter.h>
#include <mqbcmd_messages.h>
#include <mqbconfm_messages.h>
#include <mqbi_cluster.h>

// MWC
#include <mwcma_countingallocatorstore.h>
Expand Down Expand Up @@ -121,6 +123,15 @@ class Application {
// Thread pool for admin commands
// execution.

bdlmt::ThreadPool d_adminRerouteExecutionPool;
// Thread pool for routed admin commands execution.
// Ensuring rerouted commands always execute on their
// own dedicated thread prevents a case where two nodes
// are simultaneously waiting for each other to process
// a routed command, but cannot make process because
// the calling thread is blocked ("deadlock").
// Note that rerouted commands never route again.

bdlbb::PooledBlobBufferFactory d_bufferFactory;

BlobSpPool d_blobSpPool;
Expand Down Expand Up @@ -194,27 +205,51 @@ class Application {
/// Stop the application.
void stop();

/// Process the command in the specified `cmd` coming from the specified
/// `source`, and write the result of the command in the specified `os`.
/// Process the command `cmd` coming from the specified `source`, and write
/// the result of the command in the given output stream, `os`.
/// Mark `fromReroute` as true if executing the command from a reroute to
/// ensure proper routing logic. Returns 0 on success, -1 on early exit,
/// -2 on error, and some non-zero error code on parse failure.
int processCommand(const bslstl::StringRef& source,
const bsl::string& cmd,
bsl::ostream& os);

/// Process the command in the specified `cmd` coming from the specified
/// `source`, and send the result of the command in the specified
/// `onProcessedCb`.
bsl::ostream& os,
bool fromReroute = false);

/// Process the command `cmd` coming from the specified `source` node, and
/// send the result of the command in the given `onProcessedCb`. Mark
/// `fromReroute` as true if executing command from a reroute to ensure
/// proper routing logic. Returns the error code of calling
/// `processCommand` with the given `cmd`, `source`, and `fromReroute`.
int processCommandCb(
const bslstl::StringRef& source,
const bsl::string& cmd,
const bsl::function<void(int, const bsl::string&)>& onProcessedCb);
const bsl::function<void(int, const bsl::string&)>& onProcessedCb,
bool fromReroute = false);

/// Enqueue for execution the command in the specified `cmd` coming from
/// the specified `source`. The specified `onProcessedCb` callback is
/// used to send result of the command after execution.
/// used to send result of the command after execution. Mark `fromReroute`
/// as true if executing command from a reroute to ensure proper routing
/// logic.
int enqueueCommand(
const bslstl::StringRef& source,
const bsl::string& cmd,
const bsl::function<void(int, const bsl::string&)>& onProcessedCb);
const bsl::function<void(int, const bsl::string&)>& onProcessedCb,
bool fromReroute = false);

private:
/// Returns a pointer to the cluster instance that the given `command`
/// needs to execute for. Fails when the given command does not have a
/// cluster associated with it or the cluster cannot be found. On failure,
/// this function returns a nullptr and populates `errorDescription` with
/// a reason.
mqbi::Cluster* getRelevantCluster(bsl::ostream& errorDescription,
const mqbcmd::Command& command) const;

/// Executes the logic of the given `command` and outputs the result in
/// `cmdResult`. Returns 0 on success and -1 on early exit
int executeCommand(const mqbcmd::Command& command,
mqbcmd::InternalResult* cmdResult);
};

} // close package namespace
Expand Down
Loading

0 comments on commit 3f35614

Please sign in to comment.