diff --git a/src/fdb5/CMakeLists.txt b/src/fdb5/CMakeLists.txt index 6b494bb29..41f61c69b 100644 --- a/src/fdb5/CMakeLists.txt +++ b/src/fdb5/CMakeLists.txt @@ -48,7 +48,7 @@ list( APPEND fdb5_srcs api/helpers/PurgeIterator.h api/helpers/StatsIterator.cc api/helpers/StatsIterator.h - api/helpers/ArchiveCallback.h + api/helpers/Callback.h api/local/QueryVisitor.h api/local/QueueStringLogTarget.h diff --git a/src/fdb5/LibFdb5.cc b/src/fdb5/LibFdb5.cc index ec688c31e..161f3c407 100644 --- a/src/fdb5/LibFdb5.cc +++ b/src/fdb5/LibFdb5.cc @@ -48,6 +48,14 @@ const Config& LibFdb5::defaultConfig(const eckit::Configuration& userConfig) { return *config_; } +ConstructorCallback LibFdb5::constructorCallback() { + return constructorCallback_; +} + +void LibFdb5::registerConstructorCallback(ConstructorCallback cb) { + constructorCallback_ = cb; +} + bool LibFdb5::dontDeregisterFactories() const { #if eckit_VERSION_MAJOR > 1 || (eckit_VERSION_MAJOR == 1 && (eckit_VERSION_MINOR > 17 || (eckit_VERSION_MINOR == 17 && eckit_VERSION_PATCH >0))) return eckit::LibEcKit::instance().dontDeregisterFactories(); diff --git a/src/fdb5/LibFdb5.h b/src/fdb5/LibFdb5.h index 524c93b9c..2e1020a5d 100644 --- a/src/fdb5/LibFdb5.h +++ b/src/fdb5/LibFdb5.h @@ -21,6 +21,7 @@ #include "fdb5/database/DB.h" #include "fdb5/types/TypesRegistry.h" +#include "fdb5/api/helpers/Callback.h" namespace fdb5 { @@ -75,6 +76,10 @@ class LibFdb5 : public eckit::system::Library { bool dontDeregisterFactories() const; + void registerConstructorCallback(ConstructorCallback cb); + + ConstructorCallback constructorCallback(); + protected: virtual std::string version() const; @@ -82,6 +87,7 @@ class LibFdb5 : public eckit::system::Library { private: std::unique_ptr config_; + ConstructorCallback constructorCallback_ = CALLBACK_CONSTRUCTOR_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc index 84a302225..4ae6edc0f 100644 --- a/src/fdb5/api/FDB.cc +++ b/src/fdb5/api/FDB.cc @@ -20,6 +20,9 @@ #include "eckit/message/Message.h" #include "eckit/message/Reader.h" +#include "eckit/system/Plugin.h" +#include "eckit/system/LibraryManager.h" + #include "metkit/hypercube/HyperCubePayloaded.h" #include "fdb5/LibFdb5.h" @@ -38,7 +41,10 @@ namespace fdb5 { FDB::FDB(const Config &config) : internal_(FDBFactory::instance().build(config)), dirty_(false), - reportStats_(config.getBool("statistics", false)) {} + reportStats_(config.getBool("statistics", false)) { + eckit::system::LibraryManager::autoLoadPlugins({}); + LibFdb5::instance().constructorCallback()(*this); +} FDB::~FDB() { @@ -283,11 +289,11 @@ void FDB::print(std::ostream& s) const { void FDB::flush() { if (dirty_) { - eckit::Timer timer; timer.start(); internal_->flush(); + flushCallback_(); dirty_ = false; timer.stop(); @@ -321,8 +327,12 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const { return internal_->enabled(controlIdentifier); } -void FDB::registerCallback(ArchiveCallback callback) { - internal_->registerCallback(callback); +void FDB::registerArchiveCallback(ArchiveCallback callback) { // todo rename + internal_->registerArchiveCallback(callback); +} + +void FDB::registerFlushCallback(FlushCallback callback) { // todo rename + flushCallback_ = callback; } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDB.h b/src/fdb5/api/FDB.h index 69794859d..6be852565 100644 --- a/src/fdb5/api/FDB.h +++ b/src/fdb5/api/FDB.h @@ -34,7 +34,7 @@ #include "fdb5/api/helpers/WipeIterator.h" #include "fdb5/api/helpers/MoveIterator.h" #include "fdb5/config/Config.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { namespace message { @@ -119,7 +119,8 @@ class FDB { bool dirty() const; - void registerCallback(ArchiveCallback callback); + void registerArchiveCallback(ArchiveCallback callback); + void registerFlushCallback(FlushCallback callback); // -------------- API management ---------------------------- @@ -155,6 +156,7 @@ class FDB { FDBStats stats_; + FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDBFactory.h b/src/fdb5/api/FDBFactory.h index ed526421b..6cd3c1c37 100644 --- a/src/fdb5/api/FDBFactory.h +++ b/src/fdb5/api/FDBFactory.h @@ -37,7 +37,7 @@ #include "fdb5/api/helpers/PurgeIterator.h" #include "fdb5/api/helpers/StatsIterator.h" #include "fdb5/api/helpers/StatusIterator.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { namespace message { @@ -91,7 +91,7 @@ class FDBBase : private eckit::NonCopyable { virtual AxesIterator axes(const FDBToolRequest& request, int axes) { NOTIMP; } - void registerCallback(ArchiveCallback callback) {callback_ = callback;} + void registerArchiveCallback(ArchiveCallback callback) {callback_ = callback;} // -------------- API management ---------------------------- diff --git a/src/fdb5/api/helpers/ArchiveCallback.h b/src/fdb5/api/helpers/Callback.h similarity index 54% rename from src/fdb5/api/helpers/ArchiveCallback.h rename to src/fdb5/api/helpers/Callback.h index a98dc894a..8b9f04409 100644 --- a/src/fdb5/api/helpers/ArchiveCallback.h +++ b/src/fdb5/api/helpers/Callback.h @@ -14,14 +14,20 @@ */ #pragma once - +#include #include "fdb5/database/Key.h" #include "fdb5/database/FieldLocation.h" namespace fdb5 { -using ArchiveCallback = std::function; +class FDB; + +using ArchiveCallback = std::function>)>; +using FlushCallback = std::function; +using ConstructorCallback = std::function; -static const ArchiveCallback CALLBACK_NOOP = [](const Key&, const FieldLocation&) {}; +static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, std::future>) {}; +static const FlushCallback CALLBACK_FLUSH_NOOP = []() {}; +static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](FDB&) {}; } // namespace fdb5 diff --git a/src/fdb5/daos/DaosCatalogueWriter.cc b/src/fdb5/daos/DaosCatalogueWriter.cc index 3a9f0b661..fc3927f9c 100644 --- a/src/fdb5/daos/DaosCatalogueWriter.cc +++ b/src/fdb5/daos/DaosCatalogueWriter.cc @@ -226,7 +226,7 @@ const Index& DaosCatalogueWriter::currentIndex() { /// @todo: other writers may be simultaneously updating the axes KeyValues in DAOS. Should these /// new updates be retrieved and put into in-memory axes from time to time, e.g. every /// time a value is put in an axis KeyValue? -void DaosCatalogueWriter::archive(const Key& key, std::unique_ptr fieldLocation) { +void DaosCatalogueWriter::archive(const Key& key, std::shared_ptr fieldLocation) { if (current_.null()) { ASSERT(!currentIndexKey_.empty()); diff --git a/src/fdb5/daos/DaosCatalogueWriter.h b/src/fdb5/daos/DaosCatalogueWriter.h index b738a6420..44fcd7384 100644 --- a/src/fdb5/daos/DaosCatalogueWriter.h +++ b/src/fdb5/daos/DaosCatalogueWriter.h @@ -56,7 +56,7 @@ class DaosCatalogueWriter : public DaosCatalogue, public CatalogueWriter { void clean() override; void close() override; - void archive(const Key& key, std::unique_ptr fieldLocation) override; + void archive(const Key& key, std::shared_ptr fieldLocation) override; virtual void print( std::ostream &out ) const override { NOTIMP; } diff --git a/src/fdb5/database/Catalogue.h b/src/fdb5/database/Catalogue.h index 1d7be79fc..c2b344701 100644 --- a/src/fdb5/database/Catalogue.h +++ b/src/fdb5/database/Catalogue.h @@ -121,7 +121,7 @@ class CatalogueReader { class CatalogueWriter { public: virtual const Index& currentIndex() = 0; - virtual void archive(const Key& key, std::unique_ptr fieldLocation) = 0; + virtual void archive(const Key& key, std::shared_ptr fieldLocation) = 0; virtual void overlayDB(const Catalogue& otherCatalogue, const std::set& variableKeys, bool unmount) = 0; virtual void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) = 0; virtual void reconsolidate() = 0; diff --git a/src/fdb5/database/DB.cc b/src/fdb5/database/DB.cc index 56d5c2a3c..b916612f1 100644 --- a/src/fdb5/database/DB.cc +++ b/src/fdb5/database/DB.cc @@ -14,7 +14,6 @@ #include "fdb5/database/DB.h" #include "fdb5/database/Field.h" #include "fdb5/toc/TocEngine.h" -#include "fdb5/api/helpers/ArchiveCallback.h" using eckit::Log; @@ -110,9 +109,14 @@ void DB::archive(const Key& key, const void* data, eckit::Length length, const K ASSERT(cat); const Index& idx = cat->currentIndex(); - std::unique_ptr location(store().archive(idx.key(), data, length)); - callback(field, *location); + std::shared_ptr location(store().archive(idx.key(), data, length)); + + // In anticipaton of store().archive() working asynchronously in later FDB versions. + std::promise> promise; + promise.set_value(location); + + callback(field, data, length, promise.get_future()); cat->archive(key, std::move(location)); } diff --git a/src/fdb5/database/DB.h b/src/fdb5/database/DB.h index fcb6735c1..ee80a826f 100644 --- a/src/fdb5/database/DB.h +++ b/src/fdb5/database/DB.h @@ -23,7 +23,7 @@ #include "fdb5/database/EntryVisitMechanism.h" #include "fdb5/database/Key.h" #include "fdb5/database/Store.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { class DataHandle; diff --git a/src/fdb5/database/Field.cc b/src/fdb5/database/Field.cc index 53d3a7be9..a29dd3b60 100644 --- a/src/fdb5/database/Field.cc +++ b/src/fdb5/database/Field.cc @@ -16,7 +16,7 @@ namespace fdb5 { Field::Field() {} -Field::Field(std::unique_ptr location, time_t timestamp, const FieldDetails& details): +Field::Field(std::shared_ptr location, time_t timestamp, const FieldDetails& details): location_(std::move(location)), timestamp_(timestamp), details_(details) { diff --git a/src/fdb5/database/Field.h b/src/fdb5/database/Field.h index b49ef3753..4ce77a3e3 100644 --- a/src/fdb5/database/Field.h +++ b/src/fdb5/database/Field.h @@ -43,7 +43,7 @@ class Field { Field(); - Field(std::unique_ptr location, time_t timestamp, const FieldDetails& details = FieldDetails()); + Field(std::shared_ptr location, time_t timestamp, const FieldDetails& details = FieldDetails()); Field(const FieldLocation&& location, time_t timestamp, const FieldDetails& details = FieldDetails()); eckit::DataHandle* dataHandle() const { return location_->dataHandle(); } diff --git a/src/fdb5/toc/TocCatalogueWriter.cc b/src/fdb5/toc/TocCatalogueWriter.cc index 8bf0f8078..feea5c9aa 100644 --- a/src/fdb5/toc/TocCatalogueWriter.cc +++ b/src/fdb5/toc/TocCatalogueWriter.cc @@ -294,7 +294,7 @@ bool TocCatalogueWriter::enabled(const ControlIdentifier& controlIdentifier) con return TocCatalogue::enabled(controlIdentifier); } -void TocCatalogueWriter::archive(const Key& key, std::unique_ptr fieldLocation) { +void TocCatalogueWriter::archive(const Key& key, std::shared_ptr fieldLocation) { dirty_ = true; if (current_.null()) { diff --git a/src/fdb5/toc/TocCatalogueWriter.h b/src/fdb5/toc/TocCatalogueWriter.h index 2d2caa88f..040799e43 100644 --- a/src/fdb5/toc/TocCatalogueWriter.h +++ b/src/fdb5/toc/TocCatalogueWriter.h @@ -70,7 +70,7 @@ class TocCatalogueWriter : public TocCatalogue, public CatalogueWriter { void clean() override; void close() override; - void archive(const Key& key, std::unique_ptr fieldLocation) override; + void archive(const Key& key, std::shared_ptr fieldLocation) override; void reconsolidateIndexesAndTocs(); virtual void print( std::ostream &out ) const override; diff --git a/tests/fdb/api/CMakeLists.txt b/tests/fdb/api/CMakeLists.txt index cf631cdbd..a61a69aa0 100644 --- a/tests/fdb/api/CMakeLists.txt +++ b/tests/fdb/api/CMakeLists.txt @@ -27,7 +27,7 @@ list( APPEND api_tests select dist fdb_c - archive_callback + callback ) foreach( _test ${api_tests} ) diff --git a/tests/fdb/api/test_archive_callback.cc b/tests/fdb/api/test_callback.cc similarity index 80% rename from tests/fdb/api/test_archive_callback.cc rename to tests/fdb/api/test_callback.cc index ced87fa5b..60449c883 100644 --- a/tests/fdb/api/test_archive_callback.cc +++ b/tests/fdb/api/test_callback.cc @@ -4,7 +4,7 @@ namespace fdb5::test { //---------------------------------------------------------------------------------------------------------------------- -CASE("Archive callback") { +CASE("Archive and flush callback") { FDB fdb; std::string data_str = "Raining cats and dogs"; @@ -24,9 +24,15 @@ CASE("Archive callback") { std::map map; std::vector keys; + bool flushCalled = false; - fdb.registerCallback([&map] (const fdb5::Key& key, const fdb5::FieldLocation& location) { - map[key] = location.fullUri(); + fdb.registerArchiveCallback([&map] (const Key& key, const void* data, size_t length, std::future> future) { + std::shared_ptr location = future.get(); + map[key] = location->fullUri(); + }); + + fdb.registerFlushCallback([&flushCalled] () { + flushCalled = true; }); key.set("step","1"); @@ -43,6 +49,8 @@ CASE("Archive callback") { fdb.flush(); + EXPECT(flushCalled); + EXPECT(map.size() == 3); // for (const auto& [key, uri] : map) {