diff --git a/src/fdb5/CMakeLists.txt b/src/fdb5/CMakeLists.txt index 6b494bb29..41f61c69b 100644 --- a/src/fdb5/CMakeLists.txt +++ b/src/fdb5/CMakeLists.txt @@ -48,7 +48,7 @@ list( APPEND fdb5_srcs api/helpers/PurgeIterator.h api/helpers/StatsIterator.cc api/helpers/StatsIterator.h - api/helpers/ArchiveCallback.h + api/helpers/Callback.h api/local/QueryVisitor.h api/local/QueueStringLogTarget.h diff --git a/src/fdb5/LibFdb5.cc b/src/fdb5/LibFdb5.cc index ec688c31e..dd5cdea57 100644 --- a/src/fdb5/LibFdb5.cc +++ b/src/fdb5/LibFdb5.cc @@ -48,6 +48,14 @@ const Config& LibFdb5::defaultConfig(const eckit::Configuration& userConfig) { return *config_; } +ConstructorCallback LibFdb5::constructorCallback() { + return constructorCallback_; +} + +void LibFdb5::registerConstructorCallback(ConstructorCallback cb) { + constructorCallback_ = cb; +} + bool LibFdb5::dontDeregisterFactories() const { #if eckit_VERSION_MAJOR > 1 || (eckit_VERSION_MAJOR == 1 && (eckit_VERSION_MINOR > 17 || (eckit_VERSION_MINOR == 17 && eckit_VERSION_PATCH >0))) return eckit::LibEcKit::instance().dontDeregisterFactories(); @@ -74,6 +82,10 @@ RemoteProtocolVersion LibFdb5::remoteProtocolVersion() const { } +const std::set& LibFdb5::auxiliaryRegistry() { + static std::set auxiliaryRegistry(eckit::Resource>("$FDB_AUX_EXTENSIONS;fdbAuxExtensions", {"gribjump"})); + return auxiliaryRegistry; +} //---------------------------------------------------------------------------------------------------------------------- static unsigned getUserEnvRemoteProtocol() { diff --git a/src/fdb5/LibFdb5.h b/src/fdb5/LibFdb5.h index 524c93b9c..c25dda30c 100644 --- a/src/fdb5/LibFdb5.h +++ b/src/fdb5/LibFdb5.h @@ -21,6 +21,7 @@ #include "fdb5/database/DB.h" #include "fdb5/types/TypesRegistry.h" +#include "fdb5/api/helpers/Callback.h" namespace fdb5 { @@ -75,6 +76,12 @@ class LibFdb5 : public eckit::system::Library { bool dontDeregisterFactories() const; + void registerConstructorCallback(ConstructorCallback cb); + + ConstructorCallback constructorCallback(); + + static const std::set& auxiliaryRegistry(); + protected: virtual std::string version() const; @@ -82,6 +89,7 @@ class LibFdb5 : public eckit::system::Library { private: std::unique_ptr config_; + ConstructorCallback constructorCallback_ = CALLBACK_CONSTRUCTOR_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc index 0a6fe7cc8..f759b57d0 100644 --- a/src/fdb5/api/FDB.cc +++ b/src/fdb5/api/FDB.cc @@ -21,6 +21,9 @@ #include "eckit/message/Reader.h" #include "eckit/utils/StringTools.h" +#include "eckit/system/Plugin.h" +#include "eckit/system/LibraryManager.h" + #include "metkit/hypercube/HyperCubePayloaded.h" #include "fdb5/LibFdb5.h" @@ -39,7 +42,9 @@ namespace fdb5 { FDB::FDB(const Config &config) : internal_(FDBFactory::instance().build(config)), dirty_(false), - reportStats_(config.getBool("statistics", false)) {} + reportStats_(config.getBool("statistics", false)) { + LibFdb5::instance().constructorCallback()(*this); +} FDB::~FDB() { @@ -284,11 +289,11 @@ void FDB::print(std::ostream& s) const { void FDB::flush() { if (dirty_) { - eckit::Timer timer; timer.start(); internal_->flush(); + flushCallback_(); dirty_ = false; timer.stop(); @@ -322,8 +327,12 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const { return internal_->enabled(controlIdentifier); } -void FDB::registerCallback(ArchiveCallback callback) { - internal_->registerCallback(callback); +void FDB::registerArchiveCallback(ArchiveCallback callback) { // todo rename + internal_->registerArchiveCallback(callback); +} + +void FDB::registerFlushCallback(FlushCallback callback) { // todo rename + flushCallback_ = callback; } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDB.h b/src/fdb5/api/FDB.h index 69794859d..6be852565 100644 --- a/src/fdb5/api/FDB.h +++ b/src/fdb5/api/FDB.h @@ -34,7 +34,7 @@ #include "fdb5/api/helpers/WipeIterator.h" #include "fdb5/api/helpers/MoveIterator.h" #include "fdb5/config/Config.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { namespace message { @@ -119,7 +119,8 @@ class FDB { bool dirty() const; - void registerCallback(ArchiveCallback callback); + void registerArchiveCallback(ArchiveCallback callback); + void registerFlushCallback(FlushCallback callback); // -------------- API management ---------------------------- @@ -155,6 +156,7 @@ class FDB { FDBStats stats_; + FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDBFactory.h b/src/fdb5/api/FDBFactory.h index ed526421b..6cd3c1c37 100644 --- a/src/fdb5/api/FDBFactory.h +++ b/src/fdb5/api/FDBFactory.h @@ -37,7 +37,7 @@ #include "fdb5/api/helpers/PurgeIterator.h" #include "fdb5/api/helpers/StatsIterator.h" #include "fdb5/api/helpers/StatusIterator.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { namespace message { @@ -91,7 +91,7 @@ class FDBBase : private eckit::NonCopyable { virtual AxesIterator axes(const FDBToolRequest& request, int axes) { NOTIMP; } - void registerCallback(ArchiveCallback callback) {callback_ = callback;} + void registerArchiveCallback(ArchiveCallback callback) {callback_ = callback;} // -------------- API management ---------------------------- diff --git a/src/fdb5/api/helpers/ArchiveCallback.h b/src/fdb5/api/helpers/Callback.h similarity index 54% rename from src/fdb5/api/helpers/ArchiveCallback.h rename to src/fdb5/api/helpers/Callback.h index a98dc894a..8b9f04409 100644 --- a/src/fdb5/api/helpers/ArchiveCallback.h +++ b/src/fdb5/api/helpers/Callback.h @@ -14,14 +14,20 @@ */ #pragma once - +#include #include "fdb5/database/Key.h" #include "fdb5/database/FieldLocation.h" namespace fdb5 { -using ArchiveCallback = std::function; +class FDB; + +using ArchiveCallback = std::function>)>; +using FlushCallback = std::function; +using ConstructorCallback = std::function; -static const ArchiveCallback CALLBACK_NOOP = [](const Key&, const FieldLocation&) {}; +static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, std::future>) {}; +static const FlushCallback CALLBACK_FLUSH_NOOP = []() {}; +static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](FDB&) {}; } // namespace fdb5 diff --git a/src/fdb5/api/local/PurgeVisitor.cc b/src/fdb5/api/local/PurgeVisitor.cc index d0c9d9f84..5bd2a2cf8 100644 --- a/src/fdb5/api/local/PurgeVisitor.cc +++ b/src/fdb5/api/local/PurgeVisitor.cc @@ -85,6 +85,8 @@ void PurgeVisitor::visitDatum(const Field& field, const std::string& keyFingerpr void PurgeVisitor::catalogueComplete(const Catalogue& catalogue) { internalVisitor_->catalogueComplete(catalogue); + internalVisitor_->gatherAuxiliaryURIs(); + if (!porcelain_) { internalVisitor_->report(out_); } diff --git a/src/fdb5/daos/DaosCatalogueWriter.cc b/src/fdb5/daos/DaosCatalogueWriter.cc index 764716825..3db6be247 100644 --- a/src/fdb5/daos/DaosCatalogueWriter.cc +++ b/src/fdb5/daos/DaosCatalogueWriter.cc @@ -228,7 +228,7 @@ const Index& DaosCatalogueWriter::currentIndex() { /// @todo: other writers may be simultaneously updating the axes KeyValues in DAOS. Should these /// new updates be retrieved and put into in-memory axes from time to time, e.g. every /// time a value is put in an axis KeyValue? -void DaosCatalogueWriter::archive(const Key& key, std::unique_ptr fieldLocation) { +void DaosCatalogueWriter::archive(const Key& key, std::shared_ptr fieldLocation) { if (current_.null()) { ASSERT(!currentIndexKey_.empty()); diff --git a/src/fdb5/daos/DaosCatalogueWriter.h b/src/fdb5/daos/DaosCatalogueWriter.h index a1acbac39..454cbc408 100644 --- a/src/fdb5/daos/DaosCatalogueWriter.h +++ b/src/fdb5/daos/DaosCatalogueWriter.h @@ -56,7 +56,7 @@ class DaosCatalogueWriter : public DaosCatalogue, public CatalogueWriter { void clean() override; void close() override; - void archive(const Key& key, std::unique_ptr fieldLocation) override; + void archive(const Key& key, std::shared_ptr fieldLocation) override; virtual void print( std::ostream &out ) const override { NOTIMP; } diff --git a/src/fdb5/daos/DaosStore.h b/src/fdb5/daos/DaosStore.h index 6c9af3b2e..34dc5e0e9 100644 --- a/src/fdb5/daos/DaosStore.h +++ b/src/fdb5/daos/DaosStore.h @@ -42,6 +42,10 @@ class DaosStore : public Store, public DaosCommon { void checkUID() const override { /* nothing to do */ } + // DAOS store does not currently support auxiliary objects + std::vector getAuxiliaryURIs(const eckit::URI&) const override { return {}; } + bool auxiliaryURIExists(const eckit::URI&) const override { return false; } + protected: // methods std::string type() const override { return "daos"; } diff --git a/src/fdb5/database/Catalogue.h b/src/fdb5/database/Catalogue.h index cf29eaf67..2046b6cee 100644 --- a/src/fdb5/database/Catalogue.h +++ b/src/fdb5/database/Catalogue.h @@ -121,7 +121,7 @@ class CatalogueReader { class CatalogueWriter { public: virtual const Index& currentIndex() = 0; - virtual void archive(const Key& key, std::unique_ptr fieldLocation) = 0; + virtual void archive(const Key& key, std::shared_ptr fieldLocation) = 0; virtual void overlayDB(const Catalogue& otherCatalogue, const std::set& variableKeys, bool unmount) = 0; virtual void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) = 0; virtual void reconsolidate() = 0; diff --git a/src/fdb5/database/DB.cc b/src/fdb5/database/DB.cc index 54f57b768..db3662b9c 100644 --- a/src/fdb5/database/DB.cc +++ b/src/fdb5/database/DB.cc @@ -14,7 +14,6 @@ #include "fdb5/database/DB.h" #include "fdb5/database/Field.h" #include "fdb5/toc/TocEngine.h" -#include "fdb5/api/helpers/ArchiveCallback.h" using eckit::Log; @@ -110,9 +109,14 @@ void DB::archive(const Key& key, const void* data, eckit::Length length, const K ASSERT(cat); const Index& idx = cat->currentIndex(); - std::unique_ptr location(store().archive(idx.key(), data, length)); - callback(field, *location); + std::shared_ptr location(store().archive(idx.key(), data, length)); + + // In anticipaton of store().archive() working asynchronously in later FDB versions. + std::promise> promise; + promise.set_value(location); + + callback(field, data, length, promise.get_future()); cat->archive(key, std::move(location)); } diff --git a/src/fdb5/database/DB.h b/src/fdb5/database/DB.h index 1a34bdfac..f6e37825f 100644 --- a/src/fdb5/database/DB.h +++ b/src/fdb5/database/DB.h @@ -23,7 +23,7 @@ #include "fdb5/database/EntryVisitMechanism.h" #include "fdb5/database/Key.h" #include "fdb5/database/Store.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { class DataHandle; diff --git a/src/fdb5/database/Field.cc b/src/fdb5/database/Field.cc index 53d3a7be9..a29dd3b60 100644 --- a/src/fdb5/database/Field.cc +++ b/src/fdb5/database/Field.cc @@ -16,7 +16,7 @@ namespace fdb5 { Field::Field() {} -Field::Field(std::unique_ptr location, time_t timestamp, const FieldDetails& details): +Field::Field(std::shared_ptr location, time_t timestamp, const FieldDetails& details): location_(std::move(location)), timestamp_(timestamp), details_(details) { diff --git a/src/fdb5/database/Field.h b/src/fdb5/database/Field.h index b49ef3753..4ce77a3e3 100644 --- a/src/fdb5/database/Field.h +++ b/src/fdb5/database/Field.h @@ -43,7 +43,7 @@ class Field { Field(); - Field(std::unique_ptr location, time_t timestamp, const FieldDetails& details = FieldDetails()); + Field(std::shared_ptr location, time_t timestamp, const FieldDetails& details = FieldDetails()); Field(const FieldLocation&& location, time_t timestamp, const FieldDetails& details = FieldDetails()); eckit::DataHandle* dataHandle() const { return location_->dataHandle(); } diff --git a/src/fdb5/database/PurgeVisitor.h b/src/fdb5/database/PurgeVisitor.h index b0c072a98..008a61d8e 100644 --- a/src/fdb5/database/PurgeVisitor.h +++ b/src/fdb5/database/PurgeVisitor.h @@ -29,6 +29,8 @@ class PurgeVisitor : public virtual StatsReportVisitor { virtual void report(std::ostream& out) const = 0; virtual void purge(std::ostream& out, bool porcelain, bool doit) const = 0; + + virtual void gatherAuxiliaryURIs() {} // NOOP by default }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/database/Store.h b/src/fdb5/database/Store.h index 55c66e87b..b98431705 100644 --- a/src/fdb5/database/Store.h +++ b/src/fdb5/database/Store.h @@ -62,6 +62,9 @@ class Store { virtual std::vector collocatedDataURIs() const = 0; virtual std::set asCollocatedDataURIs(const std::vector&) const = 0; + virtual std::vector getAuxiliaryURIs(const eckit::URI&) const { NOTIMP; } + virtual bool auxiliaryURIExists(const eckit::URI&) const { NOTIMP; } + protected: // members const Schema& schema_; //<< schema is owned by catalogue which always outlives the store }; diff --git a/src/fdb5/toc/TocCatalogueWriter.cc b/src/fdb5/toc/TocCatalogueWriter.cc index c34714bba..376403619 100644 --- a/src/fdb5/toc/TocCatalogueWriter.cc +++ b/src/fdb5/toc/TocCatalogueWriter.cc @@ -294,7 +294,7 @@ bool TocCatalogueWriter::enabled(const ControlIdentifier& controlIdentifier) con return TocCatalogue::enabled(controlIdentifier); } -void TocCatalogueWriter::archive(const Key& key, std::unique_ptr fieldLocation) { +void TocCatalogueWriter::archive(const Key& key, std::shared_ptr fieldLocation) { dirty_ = true; if (current_.null()) { diff --git a/src/fdb5/toc/TocCatalogueWriter.h b/src/fdb5/toc/TocCatalogueWriter.h index 28e2a7422..61696a106 100644 --- a/src/fdb5/toc/TocCatalogueWriter.h +++ b/src/fdb5/toc/TocCatalogueWriter.h @@ -70,7 +70,7 @@ class TocCatalogueWriter : public TocCatalogue, public CatalogueWriter { void clean() override; void close() override; - void archive(const Key& key, std::unique_ptr fieldLocation) override; + void archive(const Key& key, std::shared_ptr fieldLocation) override; void reconsolidateIndexesAndTocs(); virtual void print( std::ostream &out ) const override; diff --git a/src/fdb5/toc/TocPurgeVisitor.cc b/src/fdb5/toc/TocPurgeVisitor.cc index 1d4104156..f837a6bd2 100644 --- a/src/fdb5/toc/TocPurgeVisitor.cc +++ b/src/fdb5/toc/TocPurgeVisitor.cc @@ -51,6 +51,32 @@ bool TocPurgeVisitor::visitDatabase(const Catalogue& catalogue, const Store& sto return true; } +void TocPurgeVisitor::gatherAuxiliaryURIs() { + for (const auto& it : dataUsage_) { // + + // Check if .data file is deletable + bool deletable = false; + if (it.second == 0) { + eckit::PathName path(it.first); + if (store_.uriBelongs(eckit::URI(store_.type(), path))) { + deletable = true; + } + } + + // Add auxiliary files to the corresponding set + eckit::URI uri(store_.type(), eckit::PathName(it.first)); + for (const auto& auxURI : store_.getAuxiliaryURIs(uri)) { + if (!store_.auxiliaryURIExists(auxURI)) continue; + // Todo: in future can we just use URIs, not paths? + eckit::PathName auxPath = auxURI.path(); + if (deletable) { + deleteAuxFiles_.insert(auxPath); + } else { + keepAuxFiles_.insert(auxPath); + } + } + } +} void TocPurgeVisitor::report(std::ostream& out) const { @@ -112,8 +138,27 @@ void TocPurgeVisitor::report(std::ostream& out) const { if (!cnt2) { out << " - NONE -" << std::endl; } + out << std::endl; + // Auxiliary files + out << "Auxiliary files to be deleted:" << std::endl; + for (const auto& it : deleteAuxFiles_) { + out << " " << it << std::endl; + } + if (deleteAuxFiles_.empty()) { + out << " - NONE -" << std::endl; + } out << std::endl; + + out << "Auxiliary files to be kept:" << std::endl; + for (const auto& it : keepAuxFiles_) { + out << " " << it << std::endl; + } + if (keepAuxFiles_.empty()) { + out << " - NONE -" << std::endl; + } + out << std::endl; + size_t cnt3 = 0; out << "Index files to be deleted:" << std::endl; for (const auto& it : indexUsage_) { // @@ -163,6 +208,12 @@ void TocPurgeVisitor::purge(std::ostream& out, bool porcelain, bool doit) const } } + for (const auto& path : deleteAuxFiles_) { + if (path.dirName().sameAs(directory) && keepAuxFiles_.find(path) == keepAuxFiles_.end()) { + store_.remove(eckit::URI(store_.type(), path), logAlways, logVerbose, doit); + } + } + for (const auto& it : indexUsage_) { // if (it.second == 0) { eckit::PathName path(it.first); diff --git a/src/fdb5/toc/TocPurgeVisitor.h b/src/fdb5/toc/TocPurgeVisitor.h index cda7b20b3..417cdd665 100644 --- a/src/fdb5/toc/TocPurgeVisitor.h +++ b/src/fdb5/toc/TocPurgeVisitor.h @@ -36,9 +36,13 @@ class TocPurgeVisitor : public PurgeVisitor, public TocStatsReportVisitor { void report(std::ostream& out) const override; void purge(std::ostream& out, bool porcelain, bool doit) const override; + void gatherAuxiliaryURIs() override; + private: // members const Store& store_; + std::set deleteAuxFiles_; + std::set keepAuxFiles_; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/toc/TocStore.cc b/src/fdb5/toc/TocStore.cc index fc0ddf407..13b9b29de 100644 --- a/src/fdb5/toc/TocStore.cc +++ b/src/fdb5/toc/TocStore.cc @@ -35,7 +35,9 @@ namespace fdb5 { //---------------------------------------------------------------------------------------------------------------------- TocStore::TocStore(const Schema& schema, const Key& key, const Config& config) : - Store(schema), TocCommon(StoreRootManager(config).directory(key).directory_) {} + Store(schema), + TocCommon(StoreRootManager(config).directory(key).directory_), + auxFileExtensions_{auxFileExtensions()} {} eckit::URI TocStore::uri() const { @@ -64,6 +66,13 @@ bool TocStore::uriExists(const eckit::URI& uri) const { } +bool TocStore::auxiliaryURIExists(const eckit::URI& uri) const { + ASSERT(uri.scheme() == type()); + eckit::PathName p(uri.path()); + ASSERT(p.dirName().sameAs(directory_)); + return p.exists(); +} + std::vector TocStore::collocatedDataURIs() const { std::vector files; @@ -284,8 +293,33 @@ bool TocStore::canMoveTo(const Key& key, const Config& config, const eckit::URI& // src.copyTo(dest); // } +eckit::URI TocStore::getAuxiliaryURI(const eckit::URI& uri, const std::string& ext) const { + // Filebackend: ext is a suffix to append to the file name + ASSERT(uri.scheme() == type()); + eckit::PathName path = uri.path() + "." + ext; + return eckit::URI(type(), path); +} + +std::vector TocStore::getAuxiliaryURIs(const eckit::URI& uri) const { + ASSERT(uri.scheme() == type()); + std::vector uris; + for (const auto& e : LibFdb5::instance().auxiliaryRegistry()) { + uris.push_back(getAuxiliaryURI(uri, e)); + } + return uris; +} + +std::set TocStore::auxFileExtensions() const { + std::set extensions; + for (const auto& e : LibFdb5::instance().auxiliaryRegistry()) { + extensions.insert("." + e); + } + return extensions; +} + void TocStore::moveTo(const Key& key, const Config& config, const eckit::URI& dest, eckit::Queue& queue) const { eckit::PathName destPath = dest.path(); + for (const eckit::PathName& root: StoreRootManager(config).canMoveToRoots(key)) { if (root.sameAs(destPath)) { eckit::PathName src_db = directory_; @@ -298,9 +332,11 @@ void TocStore::moveTo(const Key& key, const Config& config, const eckit::URI& de while ((dp = ::readdir(dirp)) != NULL) { if (strstr( dp->d_name, ".data")) { eckit::PathName file(src_db / dp->d_name); - struct stat fileStat; - ::stat(file.asString().c_str(), &fileStat); - files.emplace(fileStat.st_size, new FileCopy(src_db.path(), dest_db, dp->d_name)); + if ((file.extension() == ".data") || auxFileExtensions_.find(file.extension()) != auxFileExtensions_.end()) { + struct stat fileStat; + SYSCALL(::stat(file.asString().c_str(), &fileStat)); + files.emplace(fileStat.st_size, new FileCopy(src_db.path(), dest_db, dp->d_name)); + } } } closedir(dirp); diff --git a/src/fdb5/toc/TocStore.h b/src/fdb5/toc/TocStore.h index 9527c82ec..13f079920 100644 --- a/src/fdb5/toc/TocStore.h +++ b/src/fdb5/toc/TocStore.h @@ -53,6 +53,10 @@ class TocStore : public Store, public TocCommon { void moveTo(const Key& key, const Config& config, const eckit::URI& dest, eckit::Queue& queue) const override; void remove(const Key& key) const override; + std::vector getAuxiliaryURIs(const eckit::URI&) const override; + bool auxiliaryURIExists(const eckit::URI&) const override; + std::set auxFileExtensions() const; + protected: // methods std::string type() const override { return "file"; } @@ -76,6 +80,9 @@ class TocStore : public Store, public TocCommon { void print( std::ostream &out ) const override; +private: // methods + eckit::URI getAuxiliaryURI(const eckit::URI&, const std::string& ext) const; + private: // types typedef std::map< std::string, eckit::DataHandle * > HandleStore; @@ -87,6 +94,8 @@ class TocStore : public Store, public TocCommon { mutable PathStore dataPaths_; + std::set auxFileExtensions_; + }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/toc/TocWipeVisitor.cc b/src/fdb5/toc/TocWipeVisitor.cc index d9f952eca..c7a036bde 100644 --- a/src/fdb5/toc/TocWipeVisitor.cc +++ b/src/fdb5/toc/TocWipeVisitor.cc @@ -115,6 +115,7 @@ bool TocWipeVisitor::visitDatabase(const Catalogue& catalogue, const Store& stor ASSERT(lockfilePaths_.empty()); ASSERT(indexPaths_.empty()); ASSERT(dataPaths_.empty()); + ASSERT(auxiliaryDataPaths_.empty()); ASSERT(safePaths_.empty()); ASSERT(indexesToMask_.empty()); @@ -163,6 +164,7 @@ bool TocWipeVisitor::visitIndex(const Index& index) { std::vector indexDataPaths(index.dataURIs()); for (const eckit::URI& uri : store_.asCollocatedDataURIs(indexDataPaths)) { + auto auxPaths = getAuxiliaryPaths(uri); if (include) { if (!store_.uriBelongs(uri)) { Log::error() << "Index to be deleted has pointers to fields that don't belong to the configured store." << std::endl; @@ -172,14 +174,25 @@ bool TocWipeVisitor::visitIndex(const Index& index) { NOTIMP; } dataPaths_.insert(eckit::PathName(uri.path())); + auxiliaryDataPaths_.insert(auxPaths.begin(), auxPaths.end()); } else { safePaths_.insert(eckit::PathName(uri.path())); + safePaths_.insert(auxPaths.begin(), auxPaths.end()); } } return true; // Explore contained entries } +std::vector TocWipeVisitor::getAuxiliaryPaths(const eckit::URI& dataURI) { + // todo: in future, we should be using URIs, not paths. + std::vector paths; + for (const auto& auxURI : store_.getAuxiliaryURIs(dataURI)) { + if (store_.auxiliaryURIExists(auxURI)) paths.push_back(auxURI.path()); + } + return paths; +} + void TocWipeVisitor::addMaskedPaths() { //ASSERT(indexRequest_.empty()); @@ -198,7 +211,11 @@ void TocWipeVisitor::addMaskedPaths() { } } for (const auto& uri : data) { - if (store_.uriBelongs(uri)) dataPaths_.insert(eckit::PathName(uri.path())); + if (store_.uriBelongs(uri)) { + dataPaths_.insert(uri.path()); + auto auxPaths = getAuxiliaryPaths(uri); + auxiliaryDataPaths_.insert(auxPaths.begin(), auxPaths.end()); + } } } @@ -228,7 +245,7 @@ void TocWipeVisitor::ensureSafePaths() { if (safePaths_.find(schemaPath_) != safePaths_.end()) schemaPath_ = ""; for (const auto& p : safePaths_) { - for (std::set* s : {&subtocPaths_, &lockfilePaths_, &indexPaths_, &dataPaths_}) { + for (std::set* s : {&subtocPaths_, &lockfilePaths_, &indexPaths_, &dataPaths_, &auxiliaryDataPaths_}) { s->erase(p); } } @@ -237,6 +254,7 @@ void TocWipeVisitor::calculateResidualPaths() { // Remove paths to non-existant files. This is reasonable as we may be recovering from a // previous failed, partial wipe. As such, referenced files may not exist any more. + // NB: Not needed for auxiliaryDataPaths_ as their existence is checked in getAuxiliaryPaths() for (std::set* fileset : {&subtocPaths_, &lockfilePaths_, &indexPaths_}) { for (std::set::iterator it = fileset->begin(); it != fileset->end(); ) { @@ -279,6 +297,7 @@ void TocWipeVisitor::calculateResidualPaths() { if (schemaPath_.asString().size()) deletePaths.insert(schemaPath_); + deletePaths.insert(auxiliaryDataPaths_.begin(), auxiliaryDataPaths_.end()); std::vector allPathsVector; StdDir(catalogue_.basePath()).children(allPathsVector); std::set allPaths(allPathsVector.begin(), allPathsVector.end()); @@ -389,6 +408,13 @@ void TocWipeVisitor::report(bool wipeAll) { } out_ << std::endl; + out_ << "Auxiliary files to delete: " << std::endl; + if (auxiliaryDataPaths_.empty()) out_ << " - NONE -" << std::endl; + for (const auto& f : auxiliaryDataPaths_) { + out_ << " " << f << std::endl; + } + out_ << std::endl; + if (store_.type() != "file") { out_ << "Store URI to delete:" << std::endl; if (wipeAll) { @@ -479,6 +505,13 @@ void TocWipeVisitor::wipe(bool wipeAll) { } } + for (const PathName& path : auxiliaryDataPaths_) { + eckit::URI uri(store_.type(), path); + if (store_.auxiliaryURIExists(uri)) { + store_.remove(uri, logAlways, logVerbose, doit_); + } + } + if (wipeAll && store_.type() != "file") /// @todo: if the store is holding catalogue information (e.g. daos KVs) it /// should not be removed diff --git a/src/fdb5/toc/TocWipeVisitor.h b/src/fdb5/toc/TocWipeVisitor.h index 6b1f88c18..8ccdbef4f 100644 --- a/src/fdb5/toc/TocWipeVisitor.h +++ b/src/fdb5/toc/TocWipeVisitor.h @@ -45,6 +45,7 @@ class TocWipeVisitor : public WipeVisitor { void addMetadataPaths(); void ensureSafePaths(); void calculateResidualPaths(); + std::vector getAuxiliaryPaths(const eckit::URI& uri); bool anythingToWipe() const; @@ -68,6 +69,7 @@ class TocWipeVisitor : public WipeVisitor { std::set lockfilePaths_; std::set indexPaths_; std::set dataPaths_; + std::set auxiliaryDataPaths_; std::set safePaths_; std::set residualPaths_; diff --git a/tests/fdb/api/CMakeLists.txt b/tests/fdb/api/CMakeLists.txt index cf631cdbd..5e5e58847 100644 --- a/tests/fdb/api/CMakeLists.txt +++ b/tests/fdb/api/CMakeLists.txt @@ -27,7 +27,8 @@ list( APPEND api_tests select dist fdb_c - archive_callback + callback + auxiliary ) foreach( _test ${api_tests} ) diff --git a/tests/fdb/api/test_auxiliary.cc b/tests/fdb/api/test_auxiliary.cc new file mode 100644 index 000000000..fe83b307c --- /dev/null +++ b/tests/fdb/api/test_auxiliary.cc @@ -0,0 +1,147 @@ +#include "eckit/testing/Test.h" +#include "metkit/mars/MarsRequest.h" +#include "fdb5/api/FDB.h" +#include "fdb5/api/helpers/FDBToolRequest.h" + +namespace fdb5::test { + +//---------------------------------------------------------------------------------------------------------------------- + +std::set extensions = {"foo", "bar"}; + +eckit::PathName writeAuxiliaryData(const eckit::PathName datapath, const std::string ext) { + eckit::PathName auxpath(datapath + "." + ext); + std::string data_str = "Some extra data"; + const void* data = static_cast(data_str.c_str()); + size_t length = data_str.size(); + eckit::FileHandle file(auxpath); + file.openForWrite(0); + file.write(data, length); + file.close(); + return auxpath; +} + +std::set setup(FDB& fdb) { + // Setup: Write data, generating auxiliary files using the archive callback + std::set auxPaths; + fdb.registerArchiveCallback([&auxPaths] (const Key& key, const void* data, size_t length, std::future> future) { + std::shared_ptr location = future.get(); + for (const auto& ext : extensions) { + auxPaths.insert(writeAuxiliaryData(location->uri().path(), ext)); + } + }); + + std::string data_str = "Raining cats and dogs"; + const void* data = static_cast(data_str.c_str()); + size_t length = data_str.size(); + + Key key; + key.set("class","od"); + key.set("expver","xxxx"); + key.set("type","fc"); + key.set("stream","oper"); + key.set("date","20101010"); + key.set("time","0000"); + key.set("domain","g"); + key.set("levtype","sfc"); + key.set("param","130"); + + key.set("step","1"); + fdb.archive(key, data, length); + + key.set("date","20111213"); + fdb.archive(key, data, length); + + key.set("type","pf"); + fdb.archive(key, data, length); + + fdb.flush(); + + return auxPaths; +} + +//---------------------------------------------------------------------------------------------------------------------- + +CASE("Wipe with extensions") { + + ::setenv("FDB_AUX_EXTENSIONS", "foo,bar", 1); + + FDB fdb; + std::set auxPaths = setup(fdb); + EXPECT(auxPaths.size() == 6); + for (const auto& auxPath : auxPaths) { + EXPECT(auxPath.exists()); + } + + // call wipe + FDBToolRequest request = FDBToolRequest::requestsFromString("class=od,expver=xxxx")[0]; + bool doit = true; + auto listObject = fdb.wipe(request, doit); + + WipeElement elem; + while (listObject.next(elem)) { + eckit::Log::info() << elem << std::endl; + } + + // Check that the auxiliary files have been removed + for (const auto& auxPath : auxPaths) { + EXPECT(!auxPath.exists()); + } +} + +CASE("Purge with extensions") { + + ::setenv("FDB_AUX_EXTENSIONS", "foo,bar", 1); + + std::set auxPathsDelete; + + // Archive the same data three times + for (int i = 0; i < 2; i++) { + FDB fdb; + auto aux = setup(fdb); + auxPathsDelete.insert(aux.begin(), aux.end()); + } + FDB fdb; + auto auxPathsKeep = setup(fdb); + + EXPECT(auxPathsDelete.size() == 12); + EXPECT(auxPathsKeep.size() == 6); + + for (const auto& auxPath : auxPathsDelete) { + EXPECT(auxPath.exists()); + } + for (const auto& auxPath : auxPathsKeep) { + EXPECT(auxPath.exists()); + } + + // call purge + FDBToolRequest request = FDBToolRequest::requestsFromString("class=od,expver=xxxx")[0]; + bool doit = true; + auto listObject = fdb.purge(request, doit, false); + + PurgeElement elem; + while (listObject.next(elem)) { + eckit::Log::info () << elem << std::endl; + } + + // Check that the masked auxiliary files have been removed + for (const auto& auxPath : auxPathsDelete) { + EXPECT(!auxPath.exists()); + } + + // Check that the unmasked auxiliary files have not been removed + for (const auto& auxPath : auxPathsKeep) { + EXPECT(auxPath.exists()); + } +} + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace fdb5::test + +int main(int argc, char** argv) { + + eckit::Log::info() << ::getenv("FDB_HOME") << std::endl; + + return ::eckit::testing::run_tests(argc, argv); +} diff --git a/tests/fdb/api/test_archive_callback.cc b/tests/fdb/api/test_callback.cc similarity index 80% rename from tests/fdb/api/test_archive_callback.cc rename to tests/fdb/api/test_callback.cc index ced87fa5b..60449c883 100644 --- a/tests/fdb/api/test_archive_callback.cc +++ b/tests/fdb/api/test_callback.cc @@ -4,7 +4,7 @@ namespace fdb5::test { //---------------------------------------------------------------------------------------------------------------------- -CASE("Archive callback") { +CASE("Archive and flush callback") { FDB fdb; std::string data_str = "Raining cats and dogs"; @@ -24,9 +24,15 @@ CASE("Archive callback") { std::map map; std::vector keys; + bool flushCalled = false; - fdb.registerCallback([&map] (const fdb5::Key& key, const fdb5::FieldLocation& location) { - map[key] = location.fullUri(); + fdb.registerArchiveCallback([&map] (const Key& key, const void* data, size_t length, std::future> future) { + std::shared_ptr location = future.get(); + map[key] = location->fullUri(); + }); + + fdb.registerFlushCallback([&flushCalled] () { + flushCalled = true; }); key.set("step","1"); @@ -43,6 +49,8 @@ CASE("Archive callback") { fdb.flush(); + EXPECT(flushCalled); + EXPECT(map.size() == 3); // for (const auto& [key, uri] : map) { diff --git a/tests/fdb/tools/CMakeLists.txt b/tests/fdb/tools/CMakeLists.txt index 939db703a..6ff2df23a 100644 --- a/tests/fdb/tools/CMakeLists.txt +++ b/tests/fdb/tools/CMakeLists.txt @@ -12,3 +12,5 @@ foreach( _t ${fdb_tools_tests} ) COMMAND ${_t}.sh) endforeach() + +add_subdirectory( auxiliary ) diff --git a/tests/fdb/tools/auxiliary/CMakeLists.txt b/tests/fdb/tools/auxiliary/CMakeLists.txt new file mode 100644 index 000000000..aa8fcd9cf --- /dev/null +++ b/tests/fdb/tools/auxiliary/CMakeLists.txt @@ -0,0 +1,9 @@ +if (HAVE_FDB_BUILD_TOOLS) + ecbuild_configure_file( move_auxiliary.sh.in fdb_move_auxiliary.sh @ONLY ) + + ecbuild_add_test( + TYPE SCRIPT + CONDITION HAVE_FDB_BUILD_TOOLS + COMMAND fdb_move_auxiliary.sh) +endif() + diff --git a/tests/fdb/tools/auxiliary/move_auxiliary.sh.in b/tests/fdb/tools/auxiliary/move_auxiliary.sh.in new file mode 100755 index 000000000..9e1868d13 --- /dev/null +++ b/tests/fdb/tools/auxiliary/move_auxiliary.sh.in @@ -0,0 +1,59 @@ +#!/usr/bin/env bash + +set -eux + +fdbread="$" +fdbwrite="$" +fdbmove="$" + +srcdir=@CMAKE_CURRENT_SOURCE_DIR@ +bindir=@CMAKE_CURRENT_BINARY_DIR@ + +export FDB5_HOME=$bindir +export FDB5_CONFIG_FILE=${bindir}/aux_config.yaml +export FDB_AUX_EXTENSIONS="foo,bar" + +cd $bindir + +### cleanup and prepare test +root1=${bindir}/aux_root1 +root2=${bindir}/aux_root2 +rm -rf ${root1} ${root2} +mkdir -p ${root1} ${root2} + +cp ${srcdir}/schema $bindir + +cat < ${bindir}/aux_config.yaml +--- +type: local +engine: toc +schema: ./schema +spaces: +- handler: Default + roots: + - path: ${root1} + - path: ${root2} +EOF + +# Write some test data +$fdbwrite ${srcdir}/x.grib + +# Write aux data manually +dbname="mc:0001:oper:20211015:0000:g" +for f in $(find ${root1}/${dbname} -name "*.data"); do + echo "aux data" > ${f}.foo + echo "aux data" > ${f}.bar + +done + +# Call fdb move +req="class=mc,expver=0001,stream=oper,date=20211015,time=0000,domain=g" +$fdbmove --dest=${bindir}/aux_root2 --keep=1 $req + +# Check that the aux files have been moved +for f in $(find ${root2}/${dbname} -name "*.data"); do + [ -f ${f}.foo ] + [ -f ${f}.bar ] +done + +echo "Passed" diff --git a/tests/fdb/tools/auxiliary/schema b/tests/fdb/tools/auxiliary/schema new file mode 120000 index 000000000..3e8dcecfe --- /dev/null +++ b/tests/fdb/tools/auxiliary/schema @@ -0,0 +1 @@ +../../../regressions/FDB-282/schema \ No newline at end of file diff --git a/tests/fdb/tools/auxiliary/x.grib b/tests/fdb/tools/auxiliary/x.grib new file mode 120000 index 000000000..3bced6b50 --- /dev/null +++ b/tests/fdb/tools/auxiliary/x.grib @@ -0,0 +1 @@ +../../../regressions/FDB-282/in.grib \ No newline at end of file