Skip to content

Commit

Permalink
Add filters support
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Ivanov <[email protected]>
  • Loading branch information
alexander-e1off committed Dec 12, 2023
1 parent d9224b5 commit a98f3b0
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// m_bmqstoragetool_commandprocessor.h -*-C++-*-
#ifndef INCLUDED_M_BMQSTORAGETOOL_COMMANDPROCESSOR
#define INCLUDED_M_BMQSTORAGETOOL_COMMANDPROCESSOR

// bmqstoragetool
#include <m_bmqstoragetool_parameters.h>

Expand Down Expand Up @@ -48,3 +52,5 @@ inline CommandProcessor::CommandProcessor(

} // close package namespace
} // close enterprise namespace

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// m_bmqstoragetool_commandprocessorfactory.h -*-C++-*-
#ifndef INCLUDED_M_BMQSTORAGETOOL_COMMANDPROCESSORFACTORY
#define INCLUDED_M_BMQSTORAGETOOL_COMMANDPROCESSORFACTORY

// bmqstoragetool
#include <m_bmqstoragetool_parameters.h>
#include <m_bmqstoragetool_searchprocessor.h>
Expand All @@ -36,3 +40,5 @@ class CommandProcessorFactory {

} // close package namespace
} // close enterprise namespace

#endif
56 changes: 56 additions & 0 deletions src/applications/bmqstoragetool/m_bmqstoragetool_filters.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// m_bmqstoragetool_filters.cpp -*-C++-*-

#include <m_bmqstoragetool_filters.h>

namespace BloombergLP {
namespace m_bmqstoragetool {

// =====================
// class Filters
// =====================

Filters::Filters(const bsl::vector<bsl::string>& queueHexKeys,
const bsl::vector<bsl::string>& queueURIS,
bslma::Allocator* allocator)
: d_queueKeys(allocator)
{
if (!queueHexKeys.empty()) {
for (auto& key : queueHexKeys) {
d_queueKeys.push_back(
mqbu::StorageKey(mqbu::StorageKey::HexRepresentation(),
key.c_str()));
}
}
}

bool Filters::apply(const mqbs::MessageRecord& record)
{
if (!d_queueKeys.empty())
// Match by queueKey
if (auto it = bsl::find(d_queueKeys.begin(),
d_queueKeys.end(),
record.queueKey());
it == d_queueKeys.end()) {
// Not matched
return false; // RETURN
}
return true;
}

} // close package namespace
} // close enterprise namespace
51 changes: 51 additions & 0 deletions src/applications/bmqstoragetool/m_bmqstoragetool_filters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// m_bmqstoragetool_filters.h -*-C++-*-
#ifndef INCLUDED_M_BMQSTORAGETOOL_FILTERS
#define INCLUDED_M_BMQSTORAGETOOL_FILTERS

#include <m_bmqstoragetool_parameters.h>

// MQB
#include <mqbu_storagekey.h>

namespace BloombergLP {
namespace m_bmqstoragetool {

// =====================
// class Filters
// =====================

class Filters {
private:
// DATA
bsl::vector<mqbu::StorageKey> d_queueKeys;

public:
// CREATORS
explicit Filters(const bsl::vector<bsl::string>& queueHexKeys,
const bsl::vector<bsl::string>& queueURIS,
bslma::Allocator* allocator);

// MANIPULATORS
bool apply(const mqbs::MessageRecord& record);
// Apply filters and return true if filter matched, false otherwise.
};

} // close package namespace
} // close enterprise namespace

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// m_bmqstoragetool_searchprocessor.cpp -*-C++-*-

// bmqstoragetool
#include <m_bmqstoragetool_searchprocessor.h>
#include <m_bmqstoragetool_searchresult.h>
Expand All @@ -37,58 +39,6 @@
namespace BloombergLP {
namespace m_bmqstoragetool {

namespace {

// TODO: remove
template <typename ITER>
bool resetIterator(mqbs::MappedFileDescriptor* mfd,
ITER* iter,
const char* filename,
bsl::ostream& errorDescription)
{
if (!bdls::FilesystemUtil::isRegularFile(filename)) {
errorDescription << "File [" << filename << "] is not a regular file.";
return false; // RETURN
}

// 1) Open
mwcu::MemOutStream errorDesc;
int rc = mqbs::FileSystemUtil::open(
mfd,
filename,
bdls::FilesystemUtil::getFileSize(filename),
true, // read only
errorDesc);
if (0 != rc) {
errorDescription << "Failed to open file [" << filename
<< "] rc: " << rc << ", error: " << errorDesc.str();
return false; // RETURN
}

// 2) Basic sanity check
rc = mqbs::FileStoreProtocolUtil::hasBmqHeader(*mfd);
if (0 != rc) {
errorDescription << "Missing BlazingMQ header from file [" << filename
<< "] rc: " << rc;
mqbs::FileSystemUtil::close(mfd);
return false; // RETURN
}

// 3) Load iterator and check
rc = iter->reset(mfd, mqbs::FileStoreProtocolUtil::bmqHeader(*mfd));
if (0 != rc) {
errorDescription << "Failed to create iterator for file [" << filename
<< "] rc: " << rc;
mqbs::FileSystemUtil::close(mfd);
return false; // RETURN
}

BSLS_ASSERT_OPT(iter->isValid());
return true; // RETURN
}

} // close unnamed namespace

// =====================
// class SearchProcessor
// =====================
Expand All @@ -105,6 +55,10 @@ SearchProcessor::SearchProcessor(const bsl::shared_ptr<Parameters>& params,

void SearchProcessor::process(bsl::ostream& ostream)
{
Filters filters(d_parameters->queueKey(),
d_parameters->queueName(),
d_allocator_p);

// TODO: why unique_ptr doesn't support deleter in reset()
// bsl::unique_ptr<SearchResult> searchResult_p;
bsl::shared_ptr<SearchResult> searchResult_p;
Expand All @@ -113,20 +67,23 @@ void SearchProcessor::process(bsl::ostream& ostream)
SearchGuidResult(ostream,
d_parameters->details(),
d_parameters->guid(),
filters,
d_allocator_p),
d_allocator_p);
}
else if (d_parameters->outstanding()) {
searchResult_p.reset(new (*d_allocator_p) SearchOutstandingResult(
ostream,
d_parameters->details(),
filters,
d_allocator_p),
d_allocator_p);
}
else if (d_parameters->confirmed()) {
searchResult_p.reset(new (*d_allocator_p)
SearchConfirmedResult(ostream,
d_parameters->details(),
filters,
d_allocator_p),
d_allocator_p);
}
Expand All @@ -135,13 +92,15 @@ void SearchProcessor::process(bsl::ostream& ostream)
new (*d_allocator_p)
SearchPartiallyConfirmedResult(ostream,
d_parameters->details(),
filters,
d_allocator_p),
d_allocator_p);
}
else {
searchResult_p.reset(new (*d_allocator_p)
SearchAllResult(ostream,
d_parameters->details(),
filters,
d_allocator_p),
d_allocator_p);
}
Expand All @@ -164,8 +123,8 @@ void SearchProcessor::process(bsl::ostream& ostream)
}
// MessageRecord
else if (iter->recordType() == mqbs::RecordType::e_MESSAGE) {
const mqbs::MessageRecord& message = iter->asMessageRecord();
stopSearch = searchResult_p->processMessageRecord(message);
const mqbs::MessageRecord& record = iter->asMessageRecord();
stopSearch = searchResult_p->processMessageRecord(record);
}
// ConfirmRecord
else if (iter->recordType() == mqbs::RecordType::e_CONFIRM) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// m_bmqstoragetool_searchprocessor.h -*-C++-*-
#ifndef INCLUDED_M_BMQSTORAGETOOL_SEARCHPROCESSOR
#define INCLUDED_M_BMQSTORAGETOOL_SEARCHPROCESSOR

// bmqstoragetool
#include <m_bmqstoragetool_commandprocessor.h>

Expand All @@ -21,7 +25,6 @@
#include <mqbs_filestoreprotocol.h>
#include <mqbs_journalfileiterator.h>
#include <mqbs_mappedfiledescriptor.h>
// #include <mqbu_storagekey.h>

// BDE
#include <bsls_keyword.h>
Expand Down Expand Up @@ -51,3 +54,5 @@ class SearchProcessor : public CommandProcessor {

} // close package namespace
} // close enterprise namespace

#endif
Loading

0 comments on commit a98f3b0

Please sign in to comment.