diff --git a/CMakeLists.txt b/CMakeLists.txt index dd5cfcd4b..752441358 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,7 +20,7 @@ endif() set( PERSISTENT_NAMESPACE "eckit" CACHE INTERNAL "" ) # needed for generating .b files for persistent support -ecbuild_find_package( NAME eckit VERSION 1.24.4 REQUIRED ) +ecbuild_find_package( NAME eckit VERSION 1.28.3 REQUIRED ) ### GRIB support @@ -31,7 +31,7 @@ ecbuild_add_option( FEATURE GRIB CONDITION eccodes_FOUND DESCRIPTION "Support for GRIB via eccodes") -ecbuild_find_package( NAME metkit VERSION 1.5 REQUIRED ) +ecbuild_find_package( NAME metkit VERSION 1.11.22 REQUIRED ) ### FDB backend in CEPH object store (using Rados) diff --git a/VERSION b/VERSION index bae891de5..4f62c2ace 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.13.104 +5.13.105 \ No newline at end of file diff --git a/src/fdb5/api/local/AxesVisitor.h b/src/fdb5/api/local/AxesVisitor.h index 0ee6dbc46..c4eaf5392 100644 --- a/src/fdb5/api/local/AxesVisitor.h +++ b/src/fdb5/api/local/AxesVisitor.h @@ -40,7 +40,7 @@ class AxesVisitor : public QueryVisitor { // bool preVisitDatabase(const eckit::URI& uri) override; bool visitDatabase(const Catalogue& catalogue) override; bool visitIndex(const Index&) override; - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } private: // members diff --git a/src/fdb5/api/local/ControlVisitor.h b/src/fdb5/api/local/ControlVisitor.h index 72b28795f..2d5bd44df 100644 --- a/src/fdb5/api/local/ControlVisitor.h +++ b/src/fdb5/api/local/ControlVisitor.h @@ -40,7 +40,7 @@ class ControlVisitor : public QueryVisitor { bool visitDatabase(const Catalogue& catalogue) override; // bool visitDatabase(const Catalogue& catalogue, const Store& store) override; bool visitIndex(const Index&) override { NOTIMP; } - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } private: // members diff --git a/src/fdb5/api/local/DumpVisitor.h b/src/fdb5/api/local/DumpVisitor.h index 9fe951049..417520ff3 100644 --- a/src/fdb5/api/local/DumpVisitor.h +++ b/src/fdb5/api/local/DumpVisitor.h @@ -50,7 +50,7 @@ class DumpVisitor : public QueryVisitor { return true; } bool visitIndex(const Index&) override { NOTIMP; } - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } void visitDatum(const Field& field, const std::string& keyFingerprint) override { EntryVisitor::visitDatum(field, keyFingerprint); diff --git a/src/fdb5/api/local/ListVisitor.h b/src/fdb5/api/local/ListVisitor.h index 0f98c2b45..ec0a0e790 100644 --- a/src/fdb5/api/local/ListVisitor.h +++ b/src/fdb5/api/local/ListVisitor.h @@ -22,9 +22,12 @@ #include "fdb5/database/Catalogue.h" #include "fdb5/database/Index.h" #include "fdb5/database/Key.h" +#include "fdb5/rules/Rule.h" #include "fdb5/api/local/QueryVisitor.h" #include "fdb5/api/helpers/ListIterator.h" +#include "metkit/mars/MarsRequest.h" + namespace fdb5 { namespace api { namespace local { @@ -69,25 +72,31 @@ struct ListVisitor : public QueryVisitor { bool visitIndex(const Index& index) override { QueryVisitor::visitIndex(index); - // Subselect the parts of the request - datumRequest_ = indexRequest_; - for (const auto& kv : index.key()) { - datumRequest_.unsetValues(kv.first); - } if (index.partialMatch(request_)) { + + // Subselect the parts of the request + datumRequest_ = indexRequest_; + for (const auto& kv : index.key()) { + datumRequest_.unsetValues(kv.first); + } + + // Take into account any rule-specific behaviour in the request + datumRequest_ = rule_->registry().canonicalise(datumRequest_); + return true; // Explore contained entries } + return false; // Skip contained entries } /// Test if entry matches the current request. If so, add to the output queue. - void visitDatum(const Field& field, const TypedKey& datumKey) override { + void visitDatum(const Field& field, const Key& datumKey) override { ASSERT(currentCatalogue_); ASSERT(currentIndex_); if (datumKey.match(datumRequest_)) { - queue_.emplace(ListElement({currentCatalogue_->key(), currentIndex_->key(), datumKey.canonical()}, field.stableLocation(), field.timestamp())); + queue_.emplace(ListElement({currentCatalogue_->key(), currentIndex_->key(), datumKey}, field.stableLocation(), field.timestamp())); } } diff --git a/src/fdb5/api/local/MoveVisitor.h b/src/fdb5/api/local/MoveVisitor.h index dbd78bd85..75ea7f854 100644 --- a/src/fdb5/api/local/MoveVisitor.h +++ b/src/fdb5/api/local/MoveVisitor.h @@ -45,7 +45,7 @@ class MoveVisitor : public QueryVisitor { // bool visitDatabase(const Catalogue& catalogue, const Store& store) override; bool visitDatabase(const Catalogue& catalogue) override; bool visitIndex(const Index&) override { NOTIMP; } - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } void visitDatum(const Field& field, const std::string& keyFingerprint) override { NOTIMP; } private: // members diff --git a/src/fdb5/api/local/PurgeVisitor.h b/src/fdb5/api/local/PurgeVisitor.h index 6f72a2251..0acdde7ff 100644 --- a/src/fdb5/api/local/PurgeVisitor.h +++ b/src/fdb5/api/local/PurgeVisitor.h @@ -48,7 +48,7 @@ class PurgeVisitor : public QueryVisitor { bool visitIndex(const Index& index) override; void catalogueComplete(const Catalogue& catalogue) override; void visitDatum(const Field& field, const std::string& keyFingerprint) override; - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } private: // members diff --git a/src/fdb5/api/local/StatsVisitor.h b/src/fdb5/api/local/StatsVisitor.h index 6beae2b89..6971238bb 100644 --- a/src/fdb5/api/local/StatsVisitor.h +++ b/src/fdb5/api/local/StatsVisitor.h @@ -43,7 +43,7 @@ class StatsVisitor : public QueryVisitor { bool visitIndex(const Index& index) override; void catalogueComplete(const Catalogue& catalogue) override; void visitDatum(const Field& field, const std::string& keyFingerprint) override; - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } private: // members diff --git a/src/fdb5/api/local/StatusVisitor.h b/src/fdb5/api/local/StatusVisitor.h index 92f775434..646f840b6 100644 --- a/src/fdb5/api/local/StatusVisitor.h +++ b/src/fdb5/api/local/StatusVisitor.h @@ -39,7 +39,7 @@ class StatusVisitor : public QueryVisitor { bool visitDatabase(const Catalogue& catalogue) override { queue_.emplace(catalogue); return true; } // bool visitDatabase(const Catalogue& catalogue, const Store& store) override { queue_.emplace(catalogue); return true; } bool visitIndex(const Index&) override { NOTIMP; } - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } void visitDatum(const Field& field, const std::string& keyFingerprint) override { EntryVisitor::visitDatum(field, keyFingerprint); diff --git a/src/fdb5/api/local/WipeVisitor.h b/src/fdb5/api/local/WipeVisitor.h index 4448af1f5..6ac93dc06 100644 --- a/src/fdb5/api/local/WipeVisitor.h +++ b/src/fdb5/api/local/WipeVisitor.h @@ -51,7 +51,7 @@ class WipeVisitor : public QueryVisitor { bool visitDatabase(const Catalogue& catalogue) override; bool visitIndex(const Index& index) override; void catalogueComplete(const Catalogue& catalogue) override; - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } void visitDatum(const Field& field, const std::string& keyFingerprint) override { NOTIMP; } void onDatabaseNotFound(const fdb5::DatabaseNotFoundException& e) override { throw e; } diff --git a/src/fdb5/daos/DaosCatalogue.cc b/src/fdb5/daos/DaosCatalogue.cc index 866a1cbc5..cd5597196 100644 --- a/src/fdb5/daos/DaosCatalogue.cc +++ b/src/fdb5/daos/DaosCatalogue.cc @@ -167,7 +167,7 @@ std::vector DaosCatalogue::indexes(bool) const { std::string DaosCatalogue::type() const { - return DaosCatalogue::catalogueTypeName(); + // return DaosCatalogue::catalogueTypeName(); } diff --git a/src/fdb5/daos/DaosCatalogue.h b/src/fdb5/daos/DaosCatalogue.h index 46f4f25a1..b882fa350 100644 --- a/src/fdb5/daos/DaosCatalogue.h +++ b/src/fdb5/daos/DaosCatalogue.h @@ -32,7 +32,7 @@ class DaosCatalogue : public CatalogueImpl, public DaosCommon { DaosCatalogue(const Key& key, const fdb5::Config& config); DaosCatalogue(const eckit::URI& uri, const ControlIdentifiers& controlIdentifiers, const fdb5::Config& config); - static const char* catalogueTypeName() { return fdb5::DaosEngine::typeName(); } + // static const char* catalogueTypeName() { return fdb5::DaosEngine::typeName(); } eckit::URI uri() const override; const Key& indexKey() const override { return currentIndexKey_; } diff --git a/src/fdb5/daos/DaosIndex.h b/src/fdb5/daos/DaosIndex.h index 072898b5d..df83ed079 100644 --- a/src/fdb5/daos/DaosIndex.h +++ b/src/fdb5/daos/DaosIndex.h @@ -41,8 +41,8 @@ class DaosIndex : public IndexBase { bool dirty() const override { NOTIMP; } - void open() override { NOTIMP; }; - void close() override { NOTIMP; } + void open() override { NOTIMP; } + void close() override {} void reopen() override { NOTIMP; } void visit(IndexLocationVisitor& visitor) const override { NOTIMP; } diff --git a/src/fdb5/database/Catalogue.cc b/src/fdb5/database/Catalogue.cc index e5e713e58..e51a2a6fb 100644 --- a/src/fdb5/database/Catalogue.cc +++ b/src/fdb5/database/Catalogue.cc @@ -13,6 +13,7 @@ #include "eckit/config/Resource.h" #include "eckit/exception/Exceptions.h" +#include "eckit/io/AutoCloser.h" #include "eckit/thread/AutoLock.h" #include "eckit/thread/Mutex.h" #include "eckit/utils/StringTools.h" @@ -37,11 +38,18 @@ void Catalogue::visitEntries(EntryVisitor& visitor /*, const Store& store*/, boo std::vector all = indexes(sorted); + // It is likely that many indexes in the same database share resources/files/etc. + // To prevent repeated opening/closing (especially where a PooledFile would facilitate things) + // pre-open the indexes, and keep them open + std::vector> closers; + closers.reserve(all.size()); + // Allow the visitor to selectively reject this DB. if (visitor.visitDatabase(*this)) { if (visitor.visitIndexes()) { - for (const Index& idx : all) { + for (Index& idx : all) { if (visitor.visitEntries()) { + closers.emplace_back(idx); idx.entries(visitor); // contains visitIndex } else { visitor.visitIndex(idx); diff --git a/src/fdb5/database/EntryVisitMechanism.cc b/src/fdb5/database/EntryVisitMechanism.cc index ccecc6c1c..2ebbad5c3 100644 --- a/src/fdb5/database/EntryVisitMechanism.cc +++ b/src/fdb5/database/EntryVisitMechanism.cc @@ -52,6 +52,8 @@ Store& EntryVisitor::store() const { bool EntryVisitor::visitDatabase(const Catalogue& catalogue) { currentCatalogue_ = &catalogue; currentStore_ = nullptr; + currentIndex_ = nullptr; + rule_ = nullptr; return true; } @@ -63,19 +65,20 @@ void EntryVisitor::catalogueComplete(const Catalogue& catalogue) { delete currentStore_; currentStore_ = nullptr; currentIndex_ = nullptr; + rule_ = nullptr; } bool EntryVisitor::visitIndex(const Index& index) { currentIndex_ = &index; + rule_ = currentCatalogue_->schema().ruleFor(currentCatalogue_->key(), currentIndex_->key()); return true; } void EntryVisitor::visitDatum(const Field& field, const std::string& keyFingerprint) { ASSERT(currentCatalogue_); ASSERT(currentIndex_); - const Rule* rule = currentCatalogue_->schema().ruleFor(currentCatalogue_->key(), currentIndex_->key()); - ASSERT(rule); - TypedKey key(keyFingerprint, *rule); + ASSERT(rule_); + Key key(keyFingerprint, *rule_); visitDatum(field, key); } diff --git a/src/fdb5/database/EntryVisitMechanism.h b/src/fdb5/database/EntryVisitMechanism.h index 21aff6dc9..1ff68bdd7 100644 --- a/src/fdb5/database/EntryVisitMechanism.h +++ b/src/fdb5/database/EntryVisitMechanism.h @@ -56,7 +56,7 @@ class EntryVisitor : public eckit::NonCopyable { private: // methods - virtual void visitDatum(const Field& field, const TypedKey& datumKey) = 0; + virtual void visitDatum(const Field& field, const Key& datumKey) = 0; protected: // members @@ -64,7 +64,7 @@ class EntryVisitor : public eckit::NonCopyable { const Catalogue* currentCatalogue_ = nullptr; mutable Store* currentStore_ = nullptr; const Index* currentIndex_ = nullptr; - + const Rule* rule_ = nullptr; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/database/Key.cc b/src/fdb5/database/Key.cc index 74908f97c..0d6d548b2 100644 --- a/src/fdb5/database/Key.cc +++ b/src/fdb5/database/Key.cc @@ -28,6 +28,14 @@ namespace fdb5 { //---------------------------------------------------------------------------------------------------------------------- +BaseKey::BaseKey(const std::string& fingerprint, const Rule& rule) { + eckit::Tokenizer parse(":", true); + eckit::StringList values; + parse(fingerprint, values); + + rule.fill(*this, values); +} + void BaseKey::decode(eckit::Stream& s) { keys_.clear(); @@ -45,6 +53,7 @@ void BaseKey::decode(eckit::Stream& s) { } s >> n; + names_.reserve(n); for (size_t i = 0; i < n; ++i) { s >> k; s >> v; // this is the type (ignoring FTM) @@ -148,7 +157,7 @@ bool BaseKey::match(const metkit::mars::MarsRequest& request) const { } bool found=false; - auto values = request.values(k); + const auto& values = request.values(k); std::string can = canonicalise(k, j->second); for (auto it = values.cbegin(); !found && it != values.cend(); it++) { found = can == canonicalise(k, *it); @@ -307,6 +316,9 @@ Key::Key(eckit::Stream& s) { Key::Key(std::initializer_list> l) : BaseKey(l) {} +Key::Key(const std::string& fingerprint, const Rule& rule) : + BaseKey(fingerprint, rule) {} + Key Key::parseString(const std::string& s) { eckit::Tokenizer parse1(","); @@ -342,13 +354,8 @@ TypedKey::TypedKey(const TypesRegistry& reg) : registry_(std::cref(reg)) {} TypedKey::TypedKey(const std::string &s, const Rule& rule) : + BaseKey(s, rule), registry_(std::cref(rule.registry())) { - - eckit::Tokenizer parse(":", true); - eckit::StringList values; - parse(s, values); - - rule.fill(*this, values); } TypedKey::TypedKey(const eckit::StringDict &keys, const TypesRegistry& reg) : diff --git a/src/fdb5/database/Key.h b/src/fdb5/database/Key.h index 612a91049..058e7c0b6 100644 --- a/src/fdb5/database/Key.h +++ b/src/fdb5/database/Key.h @@ -61,6 +61,7 @@ class BaseKey { names_.emplace_back(k.first); } } + BaseKey(const std::string& fingerprint, const Rule& rule); virtual ~BaseKey() = default; @@ -167,6 +168,7 @@ class Key : public BaseKey { explicit Key() = default; explicit Key(eckit::Stream &); explicit Key(const eckit::StringDict &keys); + explicit Key(const std::string& fingerprint, const Rule& rule); Key(std::initializer_list>); static Key parseString(const std::string& s); diff --git a/src/fdb5/database/MoveVisitor.h b/src/fdb5/database/MoveVisitor.h index a27e3f7b8..e6efd693d 100644 --- a/src/fdb5/database/MoveVisitor.h +++ b/src/fdb5/database/MoveVisitor.h @@ -38,7 +38,7 @@ class MoveVisitor : public EntryVisitor { bool visitEntries() override { return false; } bool visitIndex(const Index&) override { NOTIMP; } - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } void visitDatum(const Field& /*field*/, const std::string& /*keyFingerprint*/) override { NOTIMP; } protected: // members diff --git a/src/fdb5/database/WipeVisitor.h b/src/fdb5/database/WipeVisitor.h index 4df6e7c22..272e15931 100644 --- a/src/fdb5/database/WipeVisitor.h +++ b/src/fdb5/database/WipeVisitor.h @@ -37,7 +37,7 @@ class WipeVisitor : public EntryVisitor { ~WipeVisitor() override; bool visitEntries() override { return false; } - void visitDatum(const Field&, const TypedKey&) override { NOTIMP; } + void visitDatum(const Field&, const Key&) override { NOTIMP; } void visitDatum(const Field& /*field*/, const std::string& /*keyFingerprint*/) override { NOTIMP; } protected: // members diff --git a/src/fdb5/rules/MatchOptional.cc b/src/fdb5/rules/MatchOptional.cc index 53e73580b..8f90170b2 100644 --- a/src/fdb5/rules/MatchOptional.cc +++ b/src/fdb5/rules/MatchOptional.cc @@ -65,7 +65,7 @@ bool MatchOptional::optional() const { return true; } -void MatchOptional::fill(TypedKey& key, const std::string &keyword, const std::string& value) const { +void MatchOptional::fill(BaseKey& key, const std::string &keyword, const std::string& value) const { if (!value.empty()) { key.push(keyword, value); } diff --git a/src/fdb5/rules/MatchOptional.h b/src/fdb5/rules/MatchOptional.h index e96eaa9f8..1d7a6a884 100644 --- a/src/fdb5/rules/MatchOptional.h +++ b/src/fdb5/rules/MatchOptional.h @@ -50,7 +50,7 @@ class MatchOptional : public Matcher{ const std::vector& values(const metkit::mars::MarsRequest& rq, const std::string& keyword) const override; void print( std::ostream& out ) const override; const std::string& defaultValue() const override; - void fill(TypedKey& key, const std::string& keyword, const std::string& value) const override; + void fill(BaseKey& key, const std::string& keyword, const std::string& value) const override; private: // members diff --git a/src/fdb5/rules/Matcher.cc b/src/fdb5/rules/Matcher.cc index b3720255e..7ed3d492a 100644 --- a/src/fdb5/rules/Matcher.cc +++ b/src/fdb5/rules/Matcher.cc @@ -48,7 +48,7 @@ const std::vector &Matcher::values(const metkit::mars::MarsRequest& return rq.values(keyword); } -void Matcher::fill(TypedKey& key, const std::string &keyword, const std::string& value) const { +void Matcher::fill(BaseKey& key, const std::string &keyword, const std::string& value) const { key.push(keyword, value); } diff --git a/src/fdb5/rules/Matcher.h b/src/fdb5/rules/Matcher.h index deee98e65..3628df682 100644 --- a/src/fdb5/rules/Matcher.h +++ b/src/fdb5/rules/Matcher.h @@ -31,8 +31,7 @@ namespace mars { namespace fdb5 { -class Key; -class TypedKey; +class BaseKey; class Key; class TypesRegistry; @@ -54,7 +53,7 @@ class Matcher : public eckit::Streamable { virtual const std::string &defaultValue() const; virtual bool match(const std::string &keyword, const Key& key) const = 0; - virtual void fill(TypedKey& key, const std::string &keyword, const std::string& value) const; + virtual void fill(BaseKey& key, const std::string &keyword, const std::string& value) const; virtual void dump(std::ostream &s, const std::string &keyword, const TypesRegistry ®istry) const = 0; diff --git a/src/fdb5/rules/Predicate.cc b/src/fdb5/rules/Predicate.cc index 1dacc40af..6d7b08ad6 100644 --- a/src/fdb5/rules/Predicate.cc +++ b/src/fdb5/rules/Predicate.cc @@ -68,7 +68,7 @@ const std::vector& Predicate::values(const metkit::mars::MarsReques return matcher_->values(rq, keyword_); } -void Predicate::fill(TypedKey& key, const std::string& value) const { +void Predicate::fill(BaseKey& key, const std::string& value) const { matcher_->fill(key, keyword_, value); } diff --git a/src/fdb5/rules/Predicate.h b/src/fdb5/rules/Predicate.h index d9acc7768..84ae390e1 100644 --- a/src/fdb5/rules/Predicate.h +++ b/src/fdb5/rules/Predicate.h @@ -28,7 +28,7 @@ namespace metkit { class MarsRequest; } namespace fdb5 { class Key; -class TypedKey; +class BaseKey; class Matcher; class TypesRegistry; @@ -46,7 +46,7 @@ class Predicate : public eckit::Streamable { bool match(const Key& key) const; void dump( std::ostream &s, const TypesRegistry ®istry ) const; - void fill(TypedKey& key, const std::string& value) const; + void fill(BaseKey& key, const std::string& value) const; const std::string &value(const Key& key) const; const std::vector& values(const metkit::mars::MarsRequest& rq) const; diff --git a/src/fdb5/rules/Rule.cc b/src/fdb5/rules/Rule.cc index f872c38b0..9fb2cd9ec 100644 --- a/src/fdb5/rules/Rule.cc +++ b/src/fdb5/rules/Rule.cc @@ -436,7 +436,7 @@ const Rule* Rule::ruleFor(const std::vector &keys, size_t depth) cons return 0; } -void Rule::fill(TypedKey& key, const eckit::StringList& values) const { +void Rule::fill(BaseKey& key, const eckit::StringList& values) const { // See FDB-103. This is a hack to work around the indexing abstraction // being leaky. diff --git a/src/fdb5/rules/Rule.h b/src/fdb5/rules/Rule.h index 969cea223..300f9bd47 100644 --- a/src/fdb5/rules/Rule.h +++ b/src/fdb5/rules/Rule.h @@ -37,7 +37,7 @@ class Schema; class Predicate; class ReadVisitor; class WriteVisitor; -class Key; +class BaseKey; class Key; class TypedKey; @@ -78,7 +78,7 @@ class Rule : public eckit::Streamable { TypedKey& fullComputedKey) const; const Rule* ruleFor(const std::vector &keys, size_t depth) const; - void fill(TypedKey& key, const eckit::StringList& values) const; + void fill(BaseKey& key, const eckit::StringList& values) const; size_t depth() const; diff --git a/src/fdb5/toc/TocCatalogue.cc b/src/fdb5/toc/TocCatalogue.cc index 7b09ca4ab..ea824eb2c 100644 --- a/src/fdb5/toc/TocCatalogue.cc +++ b/src/fdb5/toc/TocCatalogue.cc @@ -71,10 +71,6 @@ const Schema& TocCatalogue::schema() const { return *schema_; } -const eckit::PathName& TocCatalogue::basePath() const { - return directory_; -} - std::vector TocCatalogue::metadataPaths() const { std::vector paths(subTocPaths()); diff --git a/src/fdb5/toc/TocCatalogue.h b/src/fdb5/toc/TocCatalogue.h index 4bbcb3398..ca89323c3 100644 --- a/src/fdb5/toc/TocCatalogue.h +++ b/src/fdb5/toc/TocCatalogue.h @@ -38,7 +38,6 @@ class TocCatalogue : public CatalogueImpl, public TocHandler { ~TocCatalogue() override {} - const eckit::PathName& basePath() const override; eckit::URI uri() const override; const Key& indexKey() const override { return currentIndexKey_; } diff --git a/src/fdb5/toc/TocCatalogueWriter.cc b/src/fdb5/toc/TocCatalogueWriter.cc index 87d84d9e2..306a6bc3c 100644 --- a/src/fdb5/toc/TocCatalogueWriter.cc +++ b/src/fdb5/toc/TocCatalogueWriter.cc @@ -155,11 +155,11 @@ void TocCatalogueWriter::reconsolidateIndexesAndTocs() { writer_(writer) {} ~ConsolidateIndexVisitor() override {} private: - void visitDatum(const Field& field, const TypedKey& datumKey) override { + void visitDatum(const Field& field, const Key& datumKey) override { // TODO: Do a sneaky schema.expand() here, prepopulated with the current DB/index/Rule, // to extract the full key, including optional values. const TocFieldLocation& location(static_cast(field.location())); - writer_.index(datumKey.canonical(), location.uri(), location.offset(), location.length()); + writer_.index(datumKey, location.uri(), location.offset(), location.length()); } void visitDatum(const Field& field, const std::string& keyFingerprint) override { diff --git a/src/fdb5/toc/TocCommon.cc b/src/fdb5/toc/TocCommon.cc index 25f230c5a..0bcdd373c 100644 --- a/src/fdb5/toc/TocCommon.cc +++ b/src/fdb5/toc/TocCommon.cc @@ -15,6 +15,7 @@ #include "eckit/config/Resource.h" #include "eckit/filesystem/URIManager.h" +#include "eckit/filesystem/LocalPathName.h" #include "eckit/log/Timer.h" #include "fdb5/LibFdb5.h" @@ -23,7 +24,7 @@ namespace fdb5 { -eckit::PathName TocCommon::findRealPath(const eckit::PathName& path) { +eckit::LocalPathName TocCommon::findRealPath(const eckit::LocalPathName& path) { // realpath only works on existing paths, so work back up the path until // we find one that does, get the realpath on that, then reconstruct. @@ -33,7 +34,8 @@ eckit::PathName TocCommon::findRealPath(const eckit::PathName& path) { } TocCommon::TocCommon(const eckit::PathName& directory) : - directory_(findRealPath(directory)), + directory_(findRealPath(eckit::LocalPathName{directory})), + schemaPath_(directory_ / "schema"), dbUID_(static_cast(-1)), userUID_(::getuid()) {} @@ -60,8 +62,12 @@ void TocCommon::checkUID() const { } uid_t TocCommon::dbUID() const { - if (dbUID_ == static_cast(-1)) - dbUID_ = directory_.owner(); + if (dbUID_ == static_cast(-1)) { + // TODO: Do properly in eckit + struct stat s; + SYSCALL(::stat(directory_.localPath(), &s)); + dbUID_ = s.st_uid; + } return dbUID_; } diff --git a/src/fdb5/toc/TocCommon.h b/src/fdb5/toc/TocCommon.h index 6d967207a..aa9db187d 100644 --- a/src/fdb5/toc/TocCommon.h +++ b/src/fdb5/toc/TocCommon.h @@ -15,6 +15,7 @@ #pragma once #include "eckit/filesystem/PathName.h" +#include "eckit/filesystem/LocalPathName.h" #include "eckit/io/FileHandle.h" #include "eckit/thread/ThreadPool.h" #include "eckit/serialisation/Streamable.h" @@ -31,12 +32,12 @@ class TocCommon { TocCommon(const eckit::PathName& path); virtual ~TocCommon() {} - static eckit::PathName findRealPath(const eckit::PathName& path); + static eckit::LocalPathName findRealPath(const eckit::LocalPathName& path); static std::string userName(uid_t uid); virtual void checkUID() const; - virtual const eckit::PathName& basePath() const { return directory_; } + virtual const eckit::LocalPathName& basePath() const { return directory_; } std::string owner() const { return userName(dbUID()); } @@ -46,7 +47,8 @@ class TocCommon { protected: // members - const eckit::PathName directory_; + const eckit::LocalPathName directory_; + const eckit::LocalPathName schemaPath_; mutable uid_t dbUID_; uid_t userUID_; diff --git a/src/fdb5/toc/TocEngine.cc b/src/fdb5/toc/TocEngine.cc index 9f686ebbe..1562baae3 100644 --- a/src/fdb5/toc/TocEngine.cc +++ b/src/fdb5/toc/TocEngine.cc @@ -42,28 +42,7 @@ namespace fdb5 { void TocEngine::scan_dbs(const std::string& path, std::list& dbs) const { - if ((eckit::PathName(path) / "toc").exists()) { - dbs.push_back(path); - return; - } - eckit::StdDir d(path.c_str()); - if (d == nullptr) { - // If fdb-wipe is running in parallel, it is perfectly legit for a (non-matching) - // path to have disappeared - if (errno == ENOENT) { - return; - } - - // It should not be an error if we don't have permission to read a path/DB in the - // tree. This is a multi-user system. - if (errno == EACCES) { - return; - } - - Log::error() << "opendir(" << path << ")" << Log::syserr << std::endl; - throw FailedSystemCall("opendir"); - } // Once readdir_r finally gets deprecated and removed, we may need to // protecting readdir() as not yet guarranteed thread-safe by POSIX @@ -92,7 +71,7 @@ void TocEngine::scan_dbs(const std::string& path, std::list& dbs) c #if defined(eckit_HAVE_DIRENT_D_TYPE) do_stat = false; if (e->d_type == DT_DIR) { - scan_dbs(full.c_str(), dbs); + dbs.push_back(full); } else if (e->d_type == DT_UNKNOWN) { do_stat = true; } @@ -102,7 +81,7 @@ void TocEngine::scan_dbs(const std::string& path, std::list& dbs) c if(eckit::Stat::stat(full.c_str(), &info) == 0) { if(S_ISDIR(info.st_mode)) { - scan_dbs(full.c_str(), dbs); + dbs.push_back(full); } } else Log::error() << "Cannot stat " << full << Log::syserr << std::endl; diff --git a/src/fdb5/toc/TocHandler.cc b/src/fdb5/toc/TocHandler.cc index 9f785d938..34ab8dfe3 100644 --- a/src/fdb5/toc/TocHandler.cc +++ b/src/fdb5/toc/TocHandler.cc @@ -33,6 +33,10 @@ #include "fdb5/api/helpers/ControlIterator.h" #include "fdb5/io/LustreSettings.h" +#if eckit_HAVE_AIO +#include +#endif + using namespace eckit; namespace fdb5 { @@ -78,8 +82,12 @@ class CachedFDProxy { ASSERT((fd != -1) != (!!cached)); } - long read(void* buf, long len) { + long read(void* buf, long len, const char** pdata=nullptr) { + if (pdata && !cached_) throw SeriousBug("Can only return a pointer to data in memory if cached", Here()); if (cached_) { + if (pdata) { + *pdata = reinterpret_cast(cached_->data()) + cached_->position(); + } return cached_->read(buf, len); } else { long ret; @@ -120,7 +128,6 @@ class CachedFDProxy { TocHandler::TocHandler(const eckit::PathName& directory, const Config& config) : TocCommon(directory), - schemaPath_(directory_ / "schema"), tocPath_(directory_ / "toc"), dbConfig_(config), serialisationVersion_(TocSerialisationVersion(config)), @@ -129,8 +136,10 @@ TocHandler::TocHandler(const eckit::PathName& directory, const Config& config) : preloadBTree_(config.userConfig().getBool("preloadTocBTree", true)), fd_(-1), cachedToc_(nullptr), + subTocRead_(nullptr), count_(0), enumeratedMaskedEntries_(false), + numSubtocsRaw_(0), writeMode_(false), dirty_(false) { @@ -142,22 +151,28 @@ TocHandler::TocHandler(const eckit::PathName& directory, const Config& config) : } } -TocHandler::TocHandler(const eckit::PathName& path, const Key& parentKey) : +TocHandler::TocHandler(const eckit::PathName& path, const Key& parentKey, MemoryHandle* cachedToc) : TocCommon(path.dirName()), parentKey_(parentKey), - schemaPath_(TocCommon::findRealPath(path) / "schema"), - tocPath_(TocCommon::findRealPath(path)), + tocPath_(TocCommon::findRealPath(eckit::LocalPathName{path})), serialisationVersion_(TocSerialisationVersion(dbConfig_)), useSubToc_(false), isSubToc_(true), preloadBTree_(false), fd_(-1), - cachedToc_(nullptr), + cachedToc_(cachedToc), + subTocRead_(nullptr), count_(0), enumeratedMaskedEntries_(false), + numSubtocsRaw_(0), writeMode_(false), dirty_(false) { + + if (cachedToc_) { + cachedToc_->openForRead(); + } + /// Are we remapping a mounted DB? if (exists()) { Key key(databaseKey()); @@ -278,6 +293,7 @@ void TocHandler::openForRead() const { // The masked subtocs and indexes could be updated each time, so reset this. enumeratedMaskedEntries_ = false; + numSubtocsRaw_ = 0; maskedEntries_.clear(); if(fdbCacheTocsOnRead) { @@ -371,35 +387,36 @@ size_t TocHandler::roundRecord(TocRecord &r, size_t payloadSize) { // readNext wraps readNextInternal. // readNext reads the next TOC entry from this toc, or from an appropriate subtoc if necessary. -bool TocHandler::readNext( TocRecord &r, bool walkSubTocs, bool hideSubTocEntries, bool hideClearEntries, bool readMasked) const { +bool TocHandler::readNext( TocRecord &r, bool walkSubTocs, bool hideSubTocEntries, bool hideClearEntries, bool readMasked, const TocRecord** data, size_t* length) const { int len; + // For some tools (mainly diagnostic) it makes sense to be able to switch the + // walking behaviour here. + + if (!walkSubTocs) + return readNextInternal(r, data, length); + // Ensure we are able to skip masked entries as appropriate if (!enumeratedMaskedEntries_) { populateMaskedEntriesList(); + preloadSubTocs(readMasked); } - // For some tools (mainly diagnostic) it makes sense to be able to switch the - // walking behaviour here. - - if (!walkSubTocs) - return readNextInternal(r); - while (true) { if (subTocRead_) { - len = subTocRead_->readNext(r, walkSubTocs, hideSubTocEntries, hideClearEntries, readMasked); + len = subTocRead_->readNext(r, walkSubTocs, hideSubTocEntries, hideClearEntries, readMasked, data, length); if (len == 0) { - subTocRead_.reset(); + subTocRead_ = nullptr; } else { ASSERT(r.header_.tag_ != TocRecord::TOC_SUB_TOC); return true; } } else { - if (!readNextInternal(r)) { + if (!readNextInternal(r, data, length)) { return false; @@ -411,46 +428,14 @@ bool TocHandler::readNext( TocRecord &r, bool walkSubTocs, bool hideSubTocEntrie } else if (r.header_.tag_ == TocRecord::TOC_SUB_TOC) { - eckit::MemoryStream s(&r.payload_[0], r.maxPayloadSize); - eckit::PathName path; - s >> path; - // Handle both path and absPath for compatibility as we move from storing - // absolute paths to relative paths. Either may exist in either the TOC_SUB_TOC - // or TOC_CLEAR entries. - ASSERT(path.path().size() > 0); - eckit::PathName absPath; - if (path.path()[0] == '/') { - absPath = findRealPath(path); - if (!absPath.exists()) { - absPath = currentDirectory() / path.baseName(); - } - } else { - absPath = currentDirectory() / path; - } + LocalPathName absPath = parseSubTocRecord(r, readMasked); + if (absPath == "") continue; - // If this subtoc has a masking entry, then skip it, and go on to the next entry. - // Unless readMasked is true, in which case walk it if it exists. - std::pair key(absPath.baseName(), 0); - if (maskedEntries_.find(key) != maskedEntries_.end()) { - if (!readMasked){ - LOG_DEBUG_LIB(LibFdb5) << "SubToc ignored by mask: " << path << std::endl; - continue; - } - // This is a masked subtoc, so it is valid for it to not exist. - if (!absPath.exists()) { - LOG_DEBUG_LIB(LibFdb5) << "SubToc does not exist: " << path << std::endl; - continue; - } - } - - LOG_DEBUG_LIB(LibFdb5) << "Opening SUB_TOC: " << absPath << " " << parentKey_ << std::endl; - - subTocRead_.reset(new TocHandler(absPath, parentKey_)); - subTocRead_->openForRead(); + selectSubTocRead(absPath); if (hideSubTocEntries) { // The first entry in a subtoc must be the init record. Check that - subTocRead_->readNext(r, walkSubTocs, hideSubTocEntries, hideClearEntries, readMasked); + subTocRead_->readNext(r, walkSubTocs, hideSubTocEntries, hideClearEntries, readMasked, data, length); ASSERT(r.header_.tag_ == TocRecord::TOC_INIT); } else { return true; // if not hiding the subtoc entries, return them as normal entries! @@ -459,14 +444,14 @@ bool TocHandler::readNext( TocRecord &r, bool walkSubTocs, bool hideSubTocEntrie } else if (r.header_.tag_ == TocRecord::TOC_INDEX) { eckit::MemoryStream s(&r.payload_[0], r.maxPayloadSize); - eckit::PathName path; + eckit::LocalPathName path; off_t offset; s >> path; s >> offset; - PathName absPath = currentDirectory() / path; + LocalPathName absPath = currentDirectory() / path; - std::pair key(absPath.baseName(), offset); + std::pair key(absPath.baseName(), offset); if (maskedEntries_.find(key) != maskedEntries_.end()) { if(!readMasked){ LOG_DEBUG_LIB(LibFdb5) << "Index ignored by mask: " << path << ":" << offset << std::endl; @@ -493,12 +478,12 @@ bool TocHandler::readNext( TocRecord &r, bool walkSubTocs, bool hideSubTocEntrie // readNext wraps readNextInternal. // readNextInternal reads the next TOC entry from this toc. -bool TocHandler::readNextInternal(TocRecord& r) const { +bool TocHandler::readNextInternal(TocRecord& r, const TocRecord** data, size_t* length) const { CachedFDProxy proxy(tocPath_, fd_, cachedToc_); try { - long len = proxy.read(&r, sizeof(TocRecord::Header)); + long len = proxy.read(&r, sizeof(TocRecord::Header), reinterpret_cast(data)); if (len == 0) { return false; } @@ -511,6 +496,8 @@ bool TocHandler::readNextInternal(TocRecord& r) const { try { long len = proxy.read(&r.payload_, r.header_.size_ - sizeof(TocRecord::Header)); ASSERT(size_t(len) == r.header_.size_ - sizeof(TocRecord::Header)); + + if (length) (*length) = len + sizeof(TocRecord::Header); } catch(...) { dumpTocCache(); throw; @@ -574,7 +561,7 @@ void TocHandler::close() const { if (subTocRead_) { subTocRead_->close(); - subTocRead_.reset(); + subTocRead_ = 0; } if (subTocWrite_) { @@ -596,7 +583,7 @@ void TocHandler::close() const { } void TocHandler::allMaskableEntries(Offset startOffset, Offset endOffset, - std::set>& maskedEntries) const { + std::set>& maskedEntries) const { CachedFDProxy proxy(tocPath_, fd_, cachedToc_); @@ -612,16 +599,13 @@ void TocHandler::allMaskableEntries(Offset startOffset, Offset endOffset, ASSERT(readNextInternal(*r)); eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize); - std::string path; + LocalPathName path; off_t offset; switch (r->header_.tag_) { case TocRecord::TOC_SUB_TOC: { s >> path; - eckit::PathName pathName = path; - // ASSERT(path.size() > 0); - // eckit::PathName absPath = (path[0] == '/') ? findRealPath(path) : (currentDirectory() / path); - maskedEntries.emplace(std::pair(pathName.baseName(), 0)); + maskedEntries.emplace(std::pair(path.baseName(), 0)); break; } @@ -647,6 +631,237 @@ void TocHandler::allMaskableEntries(Offset startOffset, Offset endOffset, } } +eckit::LocalPathName TocHandler::parseSubTocRecord(const TocRecord& r, bool readMasked) const { + + eckit::MemoryStream s(&r.payload_[0], r.maxPayloadSize); + eckit::LocalPathName path; + s >> path; + // Handle both path and absPath for compatibility as we move from storing + // absolute paths to relative paths. Either may exist in either the TOC_SUB_TOC + // or TOC_CLEAR entries. + ASSERT(path.path().size() > 0); + LocalPathName absPath; + if (path.path()[0] == '/') { + absPath = findRealPath(path); + if (!absPath.exists()) { + absPath = currentDirectory() / path.baseName(); + } + } else { + absPath = currentDirectory() / path; + } + + // If this subtoc has a masking entry, then skip it, and go on to the next entry. + // Unless readMasked is true, in which case walk it if it exists. + std::pair key(absPath.baseName(), 0); + if (maskedEntries_.find(key) != maskedEntries_.end()) { + if (!readMasked){ + LOG_DEBUG_LIB(LibFdb5) << "SubToc ignored by mask: " << path << std::endl; + return ""; + } + // This is a masked subtoc, so it is valid for it to not exist. + if (!absPath.exists()) { + LOG_DEBUG_LIB(LibFdb5) << "SubToc does not exist: " << path << std::endl; + return ""; + } + } + + LOG_DEBUG_LIB(LibFdb5) << "Opening SUB_TOC: " << absPath << " " << parentKey_ << std::endl; + + return absPath; +} + +class SubtocPreloader { + + struct AutoFDCloser { + int fd_; + AutoFDCloser(int fd) : fd_(fd) {} + AutoFDCloser(AutoFDCloser&& rhs) : fd_(rhs.fd_) { rhs.fd_ = -1; } + AutoFDCloser(const AutoFDCloser&) = delete; + AutoFDCloser& operator=(AutoFDCloser&& rhs) { + fd_ = rhs.fd_; + rhs.fd_ = -1; + return *this; + } + AutoFDCloser& operator=(const AutoFDCloser&) = delete; + ~AutoFDCloser() { + if (fd_ > 0) ::close(fd_); // n.b. ignore return value + } + }; + + const Key& parentKey_; + + mutable std::map> subTocReadCache_; + std::vector paths_; + +public: + + explicit SubtocPreloader(const Key& parentKey) : parentKey_(parentKey) {} + + decltype(subTocReadCache_)&& cache() { + +#if eckit_HAVE_AIO + int iomode = O_RDONLY; // | O_DIRECT; +#ifdef O_NOATIME + // this introduces issues of permissions + static bool fdbNoATime = eckit::Resource("fdbNoATime;$FDB_OPEN_NOATIME", false); + if(fdbNoATime) { + iomode |= O_NOATIME; + } +#endif + + std::vector aiocbs(paths_.size()); + std::vector buffers(paths_.size()); + std::vector closers; + std::vector done(paths_.size()); + ::memset(done.data(), 0, done.size() * sizeof(char)); + ::memset(aiocbs.data(), 0, sizeof(aiocb) * aiocbs.size()); + + { + eckit::Timer sstime("subtocs.statsubmit", Log::debug()); + for (int i = 0; i < aiocbs.size(); ++i) { + + const eckit::LocalPathName& path = paths_[i]; + + int fd; + SYSCALL2((fd = ::open(path.localPath(), iomode)), path); + closers.emplace_back(AutoFDCloser{fd}); + eckit::Length tocSize = 2*1024*1024; + + aiocb& aio(aiocbs[i]); + zero(aio); + + aio.aio_fildes = fd; + aio.aio_offset = 0; + aio.aio_nbytes = tocSize; + aio.aio_sigevent.sigev_notify = SIGEV_NONE; + + buffers[i].resize(tocSize); + aio.aio_buf = buffers[i].data(); + + SYSCALL(::aio_read(&aio)); + } + } + + std::vector aiocbPtrs(aiocbs.size()); + for (int i = 0; i < aiocbs.size(); ++i) { + aiocbPtrs[i] = &aiocbs[i]; + } + + int doneCount = 0; + + { + eckit::Timer sstime("subtocs.collect", Log::debug()); + + while (doneCount < aiocbs.size()) { + + // Now wait until data is ready from at least one read + + errno = 0; + while (::aio_suspend(aiocbPtrs.data(), aiocbs.size(), nullptr) < 0) { + if (errno != EINTR) { + throw FailedSystemCall("aio_suspend", Here(), errno); + } + } + + // Find which one(s) are ready + + for (int n = 0; n < aiocbs.size(); ++n) { + + if (done[n]) continue; + + int e = ::aio_error(&aiocbs[n]); + if (e == EINPROGRESS) { + continue; + } + + if (e == 0) { + + ssize_t len = ::aio_return(&aiocbs[n]); + if (len != buffers[n].size()) { + aiocbs[n].aio_nbytes = len; + } + + bool grow = true; + auto cachedToc = std::make_unique(buffers[n].size(), grow); + + { + cachedToc->openForWrite(aiocbs[n].aio_nbytes); + AutoClose closer(*cachedToc); + ASSERT(cachedToc->write(buffers[n].data(), aiocbs[n].aio_nbytes) == aiocbs[n].aio_nbytes); + } + ASSERT(subTocReadCache_.find(paths_[n]) == subTocReadCache_.end()); + subTocReadCache_.emplace(paths_[n], std::make_unique(paths_[n], parentKey_, + cachedToc.release())); + + done[n] = true; + doneCount++; + } else { + throw FailedSystemCall("aio_error", Here(), e); + } + } + } + } +#else + NOTIMP +#endif // eckit_HAVE_AIO + + return std::move(subTocReadCache_); + } + + void addPath(const eckit::LocalPathName& path) { + paths_.push_back(path); + } +}; + +void TocHandler::preloadSubTocs(bool readMasked) const { + ASSERT(enumeratedMaskedEntries_); + if (numSubtocsRaw_ == 0) return; + + CachedFDProxy proxy(tocPath_, fd_, cachedToc_); + Offset startPosition = proxy.position(); // remember the current position of the file descriptor + + subTocReadCache_.clear(); + + eckit::Timer preloadTimer("subtocs.preload", Log::debug()); + { + std::unique_ptr r( + new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779) + + // n.b. we call databaseKey() directly, as this preload will normally be called before we have walked + // the toc at all --> TOC_INIT not yet read --> parentKey_ not yet set. + SubtocPreloader preloader(parentKey_); + + while (readNextInternal(*r)) { + + switch (r->header_.tag_) { + case TocRecord::TOC_SUB_TOC: { + LocalPathName absPath = parseSubTocRecord(*r, readMasked); + if (absPath != "") preloader.addPath(absPath); + } + break; + case TocRecord::TOC_INIT: + break; + case TocRecord::TOC_INDEX: + break; + case TocRecord::TOC_CLEAR: + break; + default: { + // This is only a warning, as it is legal for later versions of software to add stuff + // that is just meaningless in a backwards-compatible sense. + Log::warning() << "Unknown TOC entry " << (*r) << " @ " << Here() << std::endl; + break; + } + } + } + + Offset ret = proxy.seek(startPosition); + ASSERT(ret == startPosition); + + subTocReadCache_ = std::move(preloader.cache()); + } + preloadTimer.stop(); +} + void TocHandler::populateMaskedEntriesList() const { ASSERT(fd_ != -1 || cachedToc_); @@ -658,6 +873,8 @@ void TocHandler::populateMaskedEntriesList() const { std::unique_ptr r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779) + size_t countSubTocs = 0; + while ( readNextInternal(*r) ) { eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize); @@ -685,6 +902,7 @@ void TocHandler::populateMaskedEntriesList() const { } case TocRecord::TOC_SUB_TOC: + countSubTocs++; break; case TocRecord::TOC_INIT: break; @@ -705,6 +923,7 @@ void TocHandler::populateMaskedEntriesList() const { Offset ret = proxy.seek(startPosition); ASSERT(ret == startPosition); + numSubtocsRaw_ = countSubTocs; enumeratedMaskedEntries_ = true; } @@ -747,7 +966,7 @@ void TocHandler::writeInitRecord(const Key& key) { << schemaPath_ << std::endl; - eckit::PathName tmp = eckit::PathName::unique(schemaPath_); + eckit::LocalPathName tmp{eckit::PathName::unique(schemaPath_)}; eckit::FileHandle in(dbConfig_.schemaPath()); @@ -763,7 +982,7 @@ void TocHandler::writeInitRecord(const Key& key) { eckit::FileHandle out(tmp); in.copyTo(out); - eckit::PathName::rename(tmp, schemaPath_); + eckit::LocalPathName::rename(tmp, schemaPath_); } std::unique_ptr r2(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_INIT)); // allocate TocRecord on heap (MARS-779) @@ -955,7 +1174,8 @@ Key TocHandler::databaseKey() { // Allocate (large) TocRecord on heap not stack (MARS-779) std::unique_ptr r(new TocRecord(serialisationVersion_.used())); - while ( readNext(*r) ) { + bool walkSubTocs = false; + while ( readNext(*r, walkSubTocs) ) { if (r->header_.tag_ == TocRecord::TOC_INIT) { eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize); dbUID_ = r->header_.uid_; @@ -986,7 +1206,7 @@ size_t TocHandler::numberOfRecords() const { return count_; } -const eckit::PathName& TocHandler::directory() const +const eckit::LocalPathName& TocHandler::directory() const { return directory_; } @@ -1002,6 +1222,14 @@ std::vector TocHandler::loadIndexes(const Catalogue& catalogue, bool sort return indexes; } + // If we haven't yet read the TOC_INIT record to extract the parentKey, it may be needed for + // subtoc handling... + // We've got a bit mangled with our constness here... + if (parentKey_.empty() && remapKeys && !isSubToc_) { + const auto& k = const_cast(*this).databaseKey(); + parentKey_ = k; + } + openForRead(); TocHandlerCloser close(*this); @@ -1009,11 +1237,23 @@ std::vector TocHandler::loadIndexes(const Catalogue& catalogue, bool sort std::unique_ptr r(new TocRecord(serialisationVersion_.used())); count_ = 0; + // A record of all the index entries found (to process later) + struct IndexEntry { + size_t seqNo; + const TocRecord* datap; + size_t dataLen; + LocalPathName tocDirectoryName; // May differ if using the overlay + }; + std::vector indexEntries; + bool debug = LibFdb5::instance().debug(); bool walkSubTocs = true; bool hideSubTocEntries = true; bool hideClearEntries = true; - while ( readNext(*r, walkSubTocs, hideSubTocEntries, hideClearEntries) ) { + bool readMasked = false; + const TocRecord* pdata; + size_t dataLength; + while ( readNext(*r, walkSubTocs, hideSubTocEntries, hideClearEntries, readMasked, &pdata, &dataLength) ) { eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize); std::string path; @@ -1033,14 +1273,9 @@ std::vector TocHandler::loadIndexes(const Catalogue& catalogue, bool sort break; case TocRecord::TOC_INDEX: - s >> path; - s >> offset; - s >> type; - LOG_DEBUG(debug, LibFdb5) << "TocRecord TOC_INDEX " << path << " - " << offset << std::endl; - indexes.push_back( new TocIndex(s, catalogue, r->header_.serialisationVersion_, currentDirectory(), - currentDirectory() / path, offset, preloadBTree_)); + indexEntries.emplace_back(IndexEntry{indexEntries.size(), pdata, dataLength, currentDirectory()}); - if (subTocs != 0 && subTocRead_) { + if (subTocs && subTocRead_) { subTocs->insert(subTocRead_->tocPath()); } if (indexInSubtoc) { @@ -1069,6 +1304,47 @@ std::vector TocHandler::loadIndexes(const Catalogue& catalogue, bool sort } + // Now construct the index objects (we can parallelise this...) + // n.b. would be nicer to use std::for_each with a policy ... but that doesn't work for now. + + static const int nthreads = eckit::Resource("fdbLoadIndexThreads;$FDB_LOAD_INDEX_THREADS", 1); + + { + std::vector> threads; + const int nthreads_shadow = nthreads; // due to lambda capture rules disallowing static... + + std::vector tocindexes; + tocindexes.resize(indexEntries.size()); + + for (int i = 0; i < nthreads; ++i) { + threads.emplace_back(std::async(std::launch::async, [i, &indexEntries, &tocindexes, &nthreads_shadow, debug, &catalogue, this] { + for (int idx = i; idx < indexEntries.size(); idx+=nthreads) { + + const IndexEntry& entry = indexEntries[idx]; + eckit::MemoryStream s(entry.datap->payload_, entry.dataLen - sizeof(TocRecord::Header)); + LocalPathName path; + off_t offset; + std::string type; + s >> path; + s >> offset; + s >> type; + LOG_DEBUG(debug, LibFdb5) << "TocRecord TOC_INDEX " << path << " - " << offset << std::endl; + tocindexes[entry.seqNo] = new TocIndex(s, catalogue, entry.datap->header_.serialisationVersion_, + entry.tocDirectoryName, + entry.tocDirectoryName / path, + offset, preloadBTree_); + } + })); + } + + for (auto& thread : threads) thread.get(); + + indexes.reserve(indexEntries.size()); + for (TocIndex* ti : tocindexes) { + indexes.emplace_back(ti); + } + } + // For some purposes, it is useful to have the indexes sorted by their location, as this is is faster for // iterating through the data. @@ -1095,14 +1371,26 @@ std::vector TocHandler::loadIndexes(const Catalogue& catalogue, bool sort } -const eckit::PathName &TocHandler::tocPath() const { +const eckit::LocalPathName& TocHandler::tocPath() const { return tocPath_; } -const eckit::PathName &TocHandler::schemaPath() const { +const eckit::LocalPathName& TocHandler::schemaPath() const { return schemaPath_; } +void TocHandler::selectSubTocRead(const eckit::LocalPathName& path) const { + + auto it = subTocReadCache_.find(path); + if (it == subTocReadCache_.end()) { + auto r = subTocReadCache_.insert(std::make_pair(path, new TocHandler(path, parentKey_))); + ASSERT(r.second); + it = r.first; + } + + subTocRead_ = it->second.get(); + subTocRead_->openForRead(); +} void TocHandler::dump(std::ostream& out, bool simple, bool walkSubTocs) const { @@ -1117,7 +1405,7 @@ void TocHandler::dump(std::ostream& out, bool simple, bool walkSubTocs) const { while ( readNext(*r, walkSubTocs, hideSubTocEntries, hideClearEntries) ) { eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize); - std::string path; + LocalPathName path; std::string type; bool isSubToc; @@ -1145,7 +1433,8 @@ void TocHandler::dump(std::ostream& out, bool simple, bool walkSubTocs) const { s >> type; out << " Path: " << path << ", offset: " << offset << ", type: " << type; if(!simple) { out << std::endl; } - Index index(new TocIndex(s, *(dynamic_cast(this)), r->header_.serialisationVersion_, currentDirectory(), currentDirectory() / path, offset)); + Index index(new TocIndex(s, *(dynamic_cast(this)), r->header_.serialisationVersion_, + currentDirectory(), currentDirectory() / path, offset)); index.dump(out, " ", simple); break; } @@ -1188,7 +1477,7 @@ void TocHandler::dumpIndexFile(std::ostream& out, const eckit::PathName& indexFi while ( readNext(*r, walkSubTocs, hideSubTocEntries, hideClearEntries, readMasked) ) { eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize); - std::string path; + LocalPathName path; std::string type; off_t offset; @@ -1199,10 +1488,11 @@ void TocHandler::dumpIndexFile(std::ostream& out, const eckit::PathName& indexFi s >> offset; s >> type; - if ((currentDirectory() / path).sameAs(indexFile)) { + if ((currentDirectory() / path).sameAs(eckit::LocalPathName{indexFile})) { r->dump(out, true); out << std::endl << " Path: " << path << ", offset: " << offset << ", type: " << type; - Index index(new TocIndex(s, *(dynamic_cast(this)), r->header_.serialisationVersion_, currentDirectory(), currentDirectory() / path, offset)); + Index index(new TocIndex(s, *(dynamic_cast(this)), r->header_.serialisationVersion_, + currentDirectory(), currentDirectory() / path, offset)); index.dump(out, " ", false, true); } break; @@ -1253,7 +1543,7 @@ void TocHandler::enumerateMasked(const Catalogue& catalogue, std::set 0); - eckit::PathName absPath; + eckit::LocalPathName absPath; if (entry.first.path()[0] == '/') { absPath = entry.first; if (!absPath.exists()) { @@ -1298,7 +1588,7 @@ void TocHandler::enumerateMasked(const Catalogue& catalogue, std::setpayload_[0], r->maxPayloadSize); - std::string path; + LocalPathName path; std::string type; off_t offset; s >> path; @@ -1306,9 +1596,9 @@ void TocHandler::enumerateMasked(const Catalogue& catalogue, std::set> type; // n.b. readNextInternal --> directory_ not currentDirectory() - PathName absPath = directory_ / path; + LocalPathName absPath = directory_ / path; - std::pair key(absPath.baseName(), offset); + std::pair key(absPath.baseName(), offset); if (maskedEntries_.find(key) != maskedEntries_.end()) { if (absPath.exists()) { Index index(new TocIndex(s, *(dynamic_cast(this)), r->header_.serialisationVersion_, directory_, absPath, offset)); @@ -1355,7 +1645,7 @@ const Key& TocHandler::currentRemapKey() const { } } -const PathName &TocHandler::currentDirectory() const { +const LocalPathName& TocHandler::currentDirectory() const { if (subTocRead_) { return subTocRead_->currentDirectory(); } else { @@ -1363,7 +1653,7 @@ const PathName &TocHandler::currentDirectory() const { } } -const PathName& TocHandler::currentTocPath() const { +const LocalPathName& TocHandler::currentTocPath() const { if (subTocRead_) { return subTocRead_->currentTocPath(); } else { @@ -1501,7 +1791,7 @@ std::vector TocHandler::lockfilePaths() const { return paths; } -PathName TocHandler::fullControlFilePath(const std::string& name) const { +LocalPathName TocHandler::fullControlFilePath(const std::string& name) const { return directory_ / name; } diff --git a/src/fdb5/toc/TocHandler.h b/src/fdb5/toc/TocHandler.h index 65e36ecc6..9a8128b6e 100644 --- a/src/fdb5/toc/TocHandler.h +++ b/src/fdb5/toc/TocHandler.h @@ -18,6 +18,7 @@ #include #include "eckit/filesystem/PathName.h" +#include "eckit/filesystem/LocalPathName.h" #include "eckit/filesystem/URI.h" #include "eckit/io/Length.h" #include "eckit/io/MemoryHandle.h" @@ -34,6 +35,7 @@ namespace eckit { class Configuration; +class MemoryHandle; } namespace fdb5 { @@ -98,7 +100,7 @@ class TocHandler : public TocCommon, private eckit::NonCopyable { public: // typedefs typedef std::vector TocVec; - typedef std::vector< eckit::PathName > TocPaths; + typedef std::vector< eckit::LocalPathName > TocPaths; public: // methods @@ -107,7 +109,7 @@ class TocHandler : public TocCommon, private eckit::NonCopyable { TocHandler( const eckit::PathName &dir, const Config& config); /// For initialising sub tocs or diagnostic interrogation. - TocHandler(const eckit::PathName& path, const Key& parentKey); + TocHandler(const eckit::PathName& path, const Key& parentKey, eckit::MemoryHandle* cachedToc=nullptr); ~TocHandler() override; @@ -137,9 +139,9 @@ class TocHandler : public TocCommon, private eckit::NonCopyable { Key databaseKey(); size_t numberOfRecords() const; - const eckit::PathName& directory() const; - const eckit::PathName& tocPath() const; - const eckit::PathName& schemaPath() const; + const eckit::LocalPathName& directory() const; + const eckit::LocalPathName& tocPath() const; + const eckit::LocalPathName& schemaPath() const; void dump(std::ostream& out, bool simple = false, bool walkSubTocs = true) const; void dumpIndexFile(std::ostream& out, const eckit::PathName& indexFile) const; @@ -168,22 +170,21 @@ class TocHandler : public TocCommon, private eckit::NonCopyable { private: // methods - eckit::PathName fullControlFilePath(const std::string& name) const; + eckit::LocalPathName fullControlFilePath(const std::string& name) const; void createControlFile(const std::string& name) const; void removeControlFile(const std::string& name) const; protected: // members mutable Key parentKey_; // Contains the key of the first TOC explored in subtoc chain - const eckit::PathName schemaPath_; uid_t dbUID() const override; protected: // methods // Handle location and remapping information if using a mounted TocCatalogue - const eckit::PathName& currentDirectory() const; - const eckit::PathName& currentTocPath() const; + const eckit::LocalPathName& currentDirectory() const; + const eckit::LocalPathName& currentTocPath() const; const Key& currentRemapKey() const; // Build the record, and return the payload size @@ -215,8 +216,10 @@ class TocHandler : public TocCommon, private eckit::NonCopyable { /// file (opened for read). It resets back to the same place when done. This is /// to allow searching only from the first subtoc. void allMaskableEntries(eckit::Offset startOffset, eckit::Offset endOffset, - std::set>& maskedEntries) const; + std::set>& maskedEntries) const; + eckit::LocalPathName parseSubTocRecord(const TocRecord& r, bool readMasked) const; void populateMaskedEntriesList() const; + void preloadSubTocs(bool readMasked) const; void append(TocRecord &r, size_t payloadSize); @@ -225,9 +228,12 @@ class TocHandler : public TocCommon, private eckit::NonCopyable { // readMasked=true will walk subtocs and read indexes even if they are masked. This is // useful for dumping indexes which are cleared, or only referred to in cleared subtocs. bool readNext(TocRecord &r, bool walkSubTocs = true, bool hideSubTocEntries = true, - bool hideClearEntries = true, bool readMasked = false) const; + bool hideClearEntries = true, bool readMasked = false, + const TocRecord** data=nullptr, size_t* length=nullptr) const; - bool readNextInternal(TocRecord &r) const; + void selectSubTocRead(const eckit::LocalPathName& path) const; + + bool readNextInternal(TocRecord &r, const TocRecord** data=nullptr, size_t* length=nullptr) const; std::string userName(long) const; @@ -237,7 +243,7 @@ class TocHandler : public TocCommon, private eckit::NonCopyable { private: // members - eckit::PathName tocPath_; + eckit::LocalPathName tocPath_; Config dbConfig_; TocSerialisationVersion serialisationVersion_; @@ -256,13 +262,15 @@ class TocHandler : public TocCommon, private eckit::NonCopyable { mutable std::unique_ptr cachedToc_; ///< this is only for read path /// The sub toc is initialised in the read or write pathways for maintaining state. - mutable std::unique_ptr subTocRead_; + mutable std::map> subTocReadCache_; + mutable TocHandler* subTocRead_; // n.b. non-owning mutable std::unique_ptr subTocWrite_; mutable size_t count_; - mutable std::set> maskedEntries_; + mutable std::set> maskedEntries_; mutable bool enumeratedMaskedEntries_; + mutable int numSubtocsRaw_; mutable bool writeMode_; mutable bool dirty_; diff --git a/src/fdb5/toc/TocMoveVisitor.cc b/src/fdb5/toc/TocMoveVisitor.cc index 6fa9a82f0..b7660b876 100644 --- a/src/fdb5/toc/TocMoveVisitor.cc +++ b/src/fdb5/toc/TocMoveVisitor.cc @@ -60,11 +60,11 @@ bool TocMoveVisitor::visitDatabase(const Catalogue& catalogue) { MoveVisitor::visitDatabase(catalogue_); // TOC specific checks: index files not locked - DIR* dirp = ::opendir(catalogue_.basePath().asString().c_str()); + DIR* dirp = ::opendir(catalogue_.basePath().c_str()); struct dirent* dp; while ((dp = readdir(dirp)) != NULL) { if (strstr( dp->d_name, ".index")) { - eckit::PathName src_ = catalogue_.basePath() / dp->d_name; + eckit::PathName src_ = PathName(catalogue_.basePath()) / dp->d_name; int fd = ::open(src_.asString().c_str(), O_RDWR); if(::flock(fd, LOCK_EX)) { std::stringstream ss; @@ -126,7 +126,7 @@ void TocMoveVisitor::move() { dest_db.mkdir(); } - DIR* dirp = ::opendir(catalogue_.basePath().asString().c_str()); + DIR* dirp = ::opendir(catalogue_.basePath().c_str()); struct dirent* dp; while ((dp = readdir(dirp)) != NULL) { if (strstr( dp->d_name, ".index") || @@ -142,7 +142,7 @@ void TocMoveVisitor::move() { FileCopy toc(catalogue_.basePath(), dest_db, "toc", true); queue_.emplace(toc); - dirp = ::opendir(catalogue_.basePath().asString().c_str()); + dirp = ::opendir(catalogue_.basePath().c_str()); while ((dp = readdir(dirp)) != NULL) { if (strstr( dp->d_name, ".lock") || strstr( dp->d_name, "duplicates.allow")) { diff --git a/src/fdb5/toc/TocStats.h b/src/fdb5/toc/TocStats.h index 8670a68b3..55809f059 100644 --- a/src/fdb5/toc/TocStats.h +++ b/src/fdb5/toc/TocStats.h @@ -162,7 +162,7 @@ class TocStatsReportVisitor : public virtual StatsReportVisitor { // bool visitDatabase(const Catalogue& catalogue, const Store& store) override; bool visitDatabase(const Catalogue& catalogue) override; void visitDatum(const Field& field, const std::string& keyFingerprint) override; - void visitDatum(const Field& field, const TypedKey& datumKey) override { NOTIMP; } + void visitDatum(const Field& field, const Key& datumKey) override { NOTIMP; } // This visitor is only legit for one DB - so don't reset database void catalogueComplete(const Catalogue& catalogue) override; diff --git a/src/fdb5/toc/TocStore.cc b/src/fdb5/toc/TocStore.cc index 44ff218b0..f834b65bf 100644 --- a/src/fdb5/toc/TocStore.cc +++ b/src/fdb5/toc/TocStore.cc @@ -77,7 +77,7 @@ std::vector TocStore::collocatedDataURIs() const { std::vector files; std::vector dirs; - (directory_).children(files, dirs); + PathName(directory_).children(files, dirs); std::vector res; for (const auto& f : files) { diff --git a/src/fdb5/toc/TocWipeVisitor.cc b/src/fdb5/toc/TocWipeVisitor.cc index 6d330e13f..b4a08821d 100644 --- a/src/fdb5/toc/TocWipeVisitor.cc +++ b/src/fdb5/toc/TocWipeVisitor.cc @@ -230,8 +230,8 @@ void TocWipeVisitor::addMetadataPaths() { // toc, schema - schemaPath_ = catalogue_.schemaPath(); - tocPath_ = catalogue_.tocPath(); + schemaPath_ = catalogue_.schemaPath().path(); + tocPath_ = catalogue_.tocPath().path(); // subtocs diff --git a/src/fdb5/types/TypesRegistry.cc b/src/fdb5/types/TypesRegistry.cc index 5c65df01f..30670bbba 100644 --- a/src/fdb5/types/TypesRegistry.cc +++ b/src/fdb5/types/TypesRegistry.cc @@ -17,6 +17,9 @@ #include "fdb5/types/TypesFactory.h" #include "fdb5/types/TypesRegistry.h" +#include "metkit/mars/MarsRequest.h" +#include "metkit/mars/Parameter.h" + namespace fdb5 { //---------------------------------------------------------------------------------------------------------------------- @@ -87,6 +90,22 @@ const Type &TypesRegistry::lookupType(const std::string &keyword) const { } } +metkit::mars::MarsRequest TypesRegistry::canonicalise(const metkit::mars::MarsRequest& request) const { + metkit::mars::MarsRequest result(request.verb()); + + for (const auto& param : request.parameters()) { + const std::vector& srcVals = param.values(); + std::vector vals; + vals.reserve(srcVals.size()); + for (const auto& v : srcVals) { + vals.push_back(lookupType(param.name()).toKey(v)); + } + result.values(param.name(), vals); + } + + return result; +} + std::ostream &operator<<(std::ostream &s, const TypesRegistry &x) { x.print(s); return s; diff --git a/src/fdb5/types/TypesRegistry.h b/src/fdb5/types/TypesRegistry.h index 2ff87156b..b14e95101 100644 --- a/src/fdb5/types/TypesRegistry.h +++ b/src/fdb5/types/TypesRegistry.h @@ -24,6 +24,10 @@ #include "eckit/serialisation/Streamable.h" +namespace metkit::mars { +class MarsRequest; +} + namespace fdb5 { class Type; @@ -46,6 +50,8 @@ class TypesRegistry : public eckit::Streamable { void dump( std::ostream &out ) const; void dump( std::ostream &out, const std::string &keyword ) const; + metkit::mars::MarsRequest canonicalise(const metkit::mars::MarsRequest& request) const; + const eckit::ReanimatorBase& reanimator() const override { return reanimator_; } static const eckit::ClassSpec& classSpec() { return classSpec_; } void encode(eckit::Stream& s) const override;