From fcfb82634dd0730db3c8e373fd0c7c89939df431 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Mon, 8 Jul 2024 17:22:15 +0100 Subject: [PATCH 1/5] Flush and Post-archive callback, and init gribjump plugin --- src/fdb5/api/FDB.cc | 37 +++++++++++++++++++++++++- src/fdb5/api/FDB.h | 6 +++++ src/fdb5/api/helpers/ArchiveCallback.h | 5 ++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc index 84a302225..66c203a30 100644 --- a/src/fdb5/api/FDB.cc +++ b/src/fdb5/api/FDB.cc @@ -13,6 +13,8 @@ * (Project ID: 671951) www.nextgenio.eu */ +#include + #include "eckit/config/Resource.h" #include "eckit/io/DataHandle.h" #include "eckit/io/MemoryHandle.h" @@ -20,6 +22,9 @@ #include "eckit/message/Message.h" #include "eckit/message/Reader.h" +#include "eckit/system/Plugin.h" +#include "eckit/system/LibraryManager.h" + #include "metkit/hypercube/HyperCubePayloaded.h" #include "fdb5/LibFdb5.h" @@ -38,7 +43,10 @@ namespace fdb5 { FDB::FDB(const Config &config) : internal_(FDBFactory::instance().build(config)), dirty_(false), - reportStats_(config.getBool("statistics", false)) {} + reportStats_(config.getBool("statistics", false)) { + + initPlugins(config); +} FDB::~FDB() { @@ -123,6 +131,8 @@ void FDB::archive(const Key& key, const void* data, size_t length) { timer.stop(); stats_.addArchive(length, timer); + + postArchiveCallback_(key, data, length); } bool FDB::sorted(const metkit::mars::MarsRequest &request) { @@ -283,6 +293,7 @@ void FDB::print(std::ostream& s) const { void FDB::flush() { if (dirty_) { + flushCallback_(); eckit::Timer timer; timer.start(); @@ -325,6 +336,30 @@ void FDB::registerCallback(ArchiveCallback callback) { internal_->registerCallback(callback); } +void FDB::registerCallback(FlushCallback callback) { + flushCallback_ = callback; +} + +void FDB::registerCallback(PostArchiveCallback callback) { + postArchiveCallback_ = callback; +} + +void FDB::initPlugins(const Config& config){ + // TODO: from config, with env var to force it OFF. + bool enableGribjump = eckit::Resource("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false); + bool disableGribjump = eckit::Resource("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false); // Emergency off-switch, takes precendence. + + /* Can we do this without dlsym and extern C shenanigans? Need to pass fdb to gribjump somehow... */ + if (enableGribjump && !disableGribjump) { + eckit::system::Plugin& plugin = eckit::system::LibraryManager::loadPlugin("gribjump"); + + using SetupFunction = void (*)(fdb5::FDB&); + SetupFunction setup = (SetupFunction) dlsym(plugin.handle(), "gribjump_plugin_setup"); + ASSERT(setup); + setup(*this); + } + +} //---------------------------------------------------------------------------------------------------------------------- } // namespace fdb5 diff --git a/src/fdb5/api/FDB.h b/src/fdb5/api/FDB.h index 69794859d..2e0d21630 100644 --- a/src/fdb5/api/FDB.h +++ b/src/fdb5/api/FDB.h @@ -120,6 +120,8 @@ class FDB { bool dirty() const; void registerCallback(ArchiveCallback callback); + void registerCallback(FlushCallback callback); + void registerCallback(PostArchiveCallback callback); // -------------- API management ---------------------------- @@ -146,6 +148,8 @@ class FDB { bool sorted(const metkit::mars::MarsRequest &request); + void initPlugins(const Config& config); + private: // members std::unique_ptr internal_; @@ -155,6 +159,8 @@ class FDB { FDBStats stats_; + FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP; + PostArchiveCallback postArchiveCallback_ = POST_ARCHIVE_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/helpers/ArchiveCallback.h b/src/fdb5/api/helpers/ArchiveCallback.h index a98dc894a..350118a27 100644 --- a/src/fdb5/api/helpers/ArchiveCallback.h +++ b/src/fdb5/api/helpers/ArchiveCallback.h @@ -21,7 +21,12 @@ namespace fdb5 { using ArchiveCallback = std::function; +using FlushCallback = std::function; +using PostArchiveCallback = std::function; static const ArchiveCallback CALLBACK_NOOP = [](const Key&, const FieldLocation&) {}; +static const FlushCallback CALLBACK_FLUSH_NOOP = []() {}; +static const PostArchiveCallback POST_ARCHIVE_NOOP = [](const Key&, const void*, size_t) {}; + } // namespace fdb5 From e6b9c3d03aea7263b3c7ec86eaa2fdde796e4c84 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Wed, 10 Jul 2024 15:40:25 +0100 Subject: [PATCH 2/5] Use new plugin.setup --- src/fdb5/api/FDB.cc | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc index 66c203a30..6dd9a98c9 100644 --- a/src/fdb5/api/FDB.cc +++ b/src/fdb5/api/FDB.cc @@ -345,18 +345,12 @@ void FDB::registerCallback(PostArchiveCallback callback) { } void FDB::initPlugins(const Config& config){ - // TODO: from config, with env var to force it OFF. - bool enableGribjump = eckit::Resource("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false); + bool enableGribjump = eckit::Resource("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false); // TODO: Make this a config option. bool disableGribjump = eckit::Resource("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false); // Emergency off-switch, takes precendence. - /* Can we do this without dlsym and extern C shenanigans? Need to pass fdb to gribjump somehow... */ if (enableGribjump && !disableGribjump) { eckit::system::Plugin& plugin = eckit::system::LibraryManager::loadPlugin("gribjump"); - - using SetupFunction = void (*)(fdb5::FDB&); - SetupFunction setup = (SetupFunction) dlsym(plugin.handle(), "gribjump_plugin_setup"); - ASSERT(setup); - setup(*this); + plugin.setup(this); } } From 74da13088033316ade98cce393345bc83541932f Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Thu, 11 Jul 2024 14:20:02 +0100 Subject: [PATCH 3/5] Accept data pointer in archive callback --- src/fdb5/api/FDB.cc | 5 ----- src/fdb5/api/FDB.h | 2 -- src/fdb5/api/helpers/ArchiveCallback.h | 7 ++----- src/fdb5/database/DB.cc | 2 +- tests/fdb/api/test_archive_callback.cc | 2 +- 5 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc index 6dd9a98c9..307fb8895 100644 --- a/src/fdb5/api/FDB.cc +++ b/src/fdb5/api/FDB.cc @@ -132,7 +132,6 @@ void FDB::archive(const Key& key, const void* data, size_t length) { timer.stop(); stats_.addArchive(length, timer); - postArchiveCallback_(key, data, length); } bool FDB::sorted(const metkit::mars::MarsRequest &request) { @@ -340,10 +339,6 @@ void FDB::registerCallback(FlushCallback callback) { flushCallback_ = callback; } -void FDB::registerCallback(PostArchiveCallback callback) { - postArchiveCallback_ = callback; -} - void FDB::initPlugins(const Config& config){ bool enableGribjump = eckit::Resource("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false); // TODO: Make this a config option. bool disableGribjump = eckit::Resource("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false); // Emergency off-switch, takes precendence. diff --git a/src/fdb5/api/FDB.h b/src/fdb5/api/FDB.h index 2e0d21630..be1e6c693 100644 --- a/src/fdb5/api/FDB.h +++ b/src/fdb5/api/FDB.h @@ -121,7 +121,6 @@ class FDB { void registerCallback(ArchiveCallback callback); void registerCallback(FlushCallback callback); - void registerCallback(PostArchiveCallback callback); // -------------- API management ---------------------------- @@ -160,7 +159,6 @@ class FDB { FDBStats stats_; FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP; - PostArchiveCallback postArchiveCallback_ = POST_ARCHIVE_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/helpers/ArchiveCallback.h b/src/fdb5/api/helpers/ArchiveCallback.h index 350118a27..54f3a47c7 100644 --- a/src/fdb5/api/helpers/ArchiveCallback.h +++ b/src/fdb5/api/helpers/ArchiveCallback.h @@ -20,13 +20,10 @@ namespace fdb5 { -using ArchiveCallback = std::function; +using ArchiveCallback = std::function; using FlushCallback = std::function; -using PostArchiveCallback = std::function; -static const ArchiveCallback CALLBACK_NOOP = [](const Key&, const FieldLocation&) {}; +static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, const FieldLocation&) {}; static const FlushCallback CALLBACK_FLUSH_NOOP = []() {}; -static const PostArchiveCallback POST_ARCHIVE_NOOP = [](const Key&, const void*, size_t) {}; - } // namespace fdb5 diff --git a/src/fdb5/database/DB.cc b/src/fdb5/database/DB.cc index 56d5c2a3c..8aae6a9e8 100644 --- a/src/fdb5/database/DB.cc +++ b/src/fdb5/database/DB.cc @@ -112,7 +112,7 @@ void DB::archive(const Key& key, const void* data, eckit::Length length, const K const Index& idx = cat->currentIndex(); std::unique_ptr location(store().archive(idx.key(), data, length)); - callback(field, *location); + callback(field, data, length, *location); cat->archive(key, std::move(location)); } diff --git a/tests/fdb/api/test_archive_callback.cc b/tests/fdb/api/test_archive_callback.cc index ced87fa5b..bb9130e92 100644 --- a/tests/fdb/api/test_archive_callback.cc +++ b/tests/fdb/api/test_archive_callback.cc @@ -25,7 +25,7 @@ CASE("Archive callback") { std::map map; std::vector keys; - fdb.registerCallback([&map] (const fdb5::Key& key, const fdb5::FieldLocation& location) { + fdb.registerCallback([&map] (const Key& key, const void* data, size_t length, const FieldLocation& location) { map[key] = location.fullUri(); }); From 8ff6f7bf4f187f7fcf2158f61bc631340bcccaaf Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Mon, 15 Jul 2024 11:41:26 +0100 Subject: [PATCH 4/5] Add flush test and minor cleanup --- src/fdb5/api/FDB.cc | 9 ++------- tests/fdb/api/CMakeLists.txt | 2 +- .../api/{test_archive_callback.cc => test_callback.cc} | 9 ++++++++- 3 files changed, 11 insertions(+), 9 deletions(-) rename tests/fdb/api/{test_archive_callback.cc => test_callback.cc} (91%) diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc index 307fb8895..858aadbd8 100644 --- a/src/fdb5/api/FDB.cc +++ b/src/fdb5/api/FDB.cc @@ -13,8 +13,6 @@ * (Project ID: 671951) www.nextgenio.eu */ -#include - #include "eckit/config/Resource.h" #include "eckit/io/DataHandle.h" #include "eckit/io/MemoryHandle.h" @@ -44,7 +42,6 @@ FDB::FDB(const Config &config) : internal_(FDBFactory::instance().build(config)), dirty_(false), reportStats_(config.getBool("statistics", false)) { - initPlugins(config); } @@ -131,7 +128,6 @@ void FDB::archive(const Key& key, const void* data, size_t length) { timer.stop(); stats_.addArchive(length, timer); - } bool FDB::sorted(const metkit::mars::MarsRequest &request) { @@ -340,14 +336,13 @@ void FDB::registerCallback(FlushCallback callback) { } void FDB::initPlugins(const Config& config){ - bool enableGribjump = eckit::Resource("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false); // TODO: Make this a config option. - bool disableGribjump = eckit::Resource("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false); // Emergency off-switch, takes precendence. + bool enableGribjump = eckit::Resource("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false); + bool disableGribjump = eckit::Resource("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false); // Emergency off-switch if (enableGribjump && !disableGribjump) { eckit::system::Plugin& plugin = eckit::system::LibraryManager::loadPlugin("gribjump"); plugin.setup(this); } - } //---------------------------------------------------------------------------------------------------------------------- diff --git a/tests/fdb/api/CMakeLists.txt b/tests/fdb/api/CMakeLists.txt index cf631cdbd..a61a69aa0 100644 --- a/tests/fdb/api/CMakeLists.txt +++ b/tests/fdb/api/CMakeLists.txt @@ -27,7 +27,7 @@ list( APPEND api_tests select dist fdb_c - archive_callback + callback ) foreach( _test ${api_tests} ) diff --git a/tests/fdb/api/test_archive_callback.cc b/tests/fdb/api/test_callback.cc similarity index 91% rename from tests/fdb/api/test_archive_callback.cc rename to tests/fdb/api/test_callback.cc index bb9130e92..9fd09f405 100644 --- a/tests/fdb/api/test_archive_callback.cc +++ b/tests/fdb/api/test_callback.cc @@ -4,7 +4,7 @@ namespace fdb5::test { //---------------------------------------------------------------------------------------------------------------------- -CASE("Archive callback") { +CASE("Archive and flush callback") { FDB fdb; std::string data_str = "Raining cats and dogs"; @@ -24,11 +24,16 @@ CASE("Archive callback") { std::map map; std::vector keys; + bool flushCalled = false; fdb.registerCallback([&map] (const Key& key, const void* data, size_t length, const FieldLocation& location) { map[key] = location.fullUri(); }); + fdb.registerCallback([&flushCalled] () { + flushCalled = true; + }); + key.set("step","1"); keys.push_back(key); fdb.archive(key, data, length); @@ -43,6 +48,8 @@ CASE("Archive callback") { fdb.flush(); + EXPECT(flushCalled); + EXPECT(map.size() == 3); // for (const auto& [key, uri] : map) { From 74b15913f20dd02f8388567ff54a608f43794f72 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 16 Jul 2024 17:31:17 +0100 Subject: [PATCH 5/5] Add constructor callback, autoload plugins, and pass a future to archiveCallback --- src/fdb5/CMakeLists.txt | 2 +- src/fdb5/LibFdb5.cc | 8 +++++++ src/fdb5/LibFdb5.h | 6 ++++++ src/fdb5/api/FDB.cc | 21 ++++++------------- src/fdb5/api/FDB.h | 8 +++---- src/fdb5/api/FDBFactory.h | 4 ++-- .../helpers/{ArchiveCallback.h => Callback.h} | 10 ++++++--- src/fdb5/daos/DaosCatalogueWriter.cc | 2 +- src/fdb5/daos/DaosCatalogueWriter.h | 2 +- src/fdb5/database/Catalogue.h | 2 +- src/fdb5/database/DB.cc | 10 ++++++--- src/fdb5/database/DB.h | 2 +- src/fdb5/database/Field.cc | 2 +- src/fdb5/database/Field.h | 2 +- src/fdb5/toc/TocCatalogueWriter.cc | 2 +- src/fdb5/toc/TocCatalogueWriter.h | 2 +- tests/fdb/api/test_callback.cc | 7 ++++--- 17 files changed, 52 insertions(+), 40 deletions(-) rename src/fdb5/api/helpers/{ArchiveCallback.h => Callback.h} (73%) diff --git a/src/fdb5/CMakeLists.txt b/src/fdb5/CMakeLists.txt index 6b494bb29..41f61c69b 100644 --- a/src/fdb5/CMakeLists.txt +++ b/src/fdb5/CMakeLists.txt @@ -48,7 +48,7 @@ list( APPEND fdb5_srcs api/helpers/PurgeIterator.h api/helpers/StatsIterator.cc api/helpers/StatsIterator.h - api/helpers/ArchiveCallback.h + api/helpers/Callback.h api/local/QueryVisitor.h api/local/QueueStringLogTarget.h diff --git a/src/fdb5/LibFdb5.cc b/src/fdb5/LibFdb5.cc index ec688c31e..161f3c407 100644 --- a/src/fdb5/LibFdb5.cc +++ b/src/fdb5/LibFdb5.cc @@ -48,6 +48,14 @@ const Config& LibFdb5::defaultConfig(const eckit::Configuration& userConfig) { return *config_; } +ConstructorCallback LibFdb5::constructorCallback() { + return constructorCallback_; +} + +void LibFdb5::registerConstructorCallback(ConstructorCallback cb) { + constructorCallback_ = cb; +} + bool LibFdb5::dontDeregisterFactories() const { #if eckit_VERSION_MAJOR > 1 || (eckit_VERSION_MAJOR == 1 && (eckit_VERSION_MINOR > 17 || (eckit_VERSION_MINOR == 17 && eckit_VERSION_PATCH >0))) return eckit::LibEcKit::instance().dontDeregisterFactories(); diff --git a/src/fdb5/LibFdb5.h b/src/fdb5/LibFdb5.h index 524c93b9c..2e1020a5d 100644 --- a/src/fdb5/LibFdb5.h +++ b/src/fdb5/LibFdb5.h @@ -21,6 +21,7 @@ #include "fdb5/database/DB.h" #include "fdb5/types/TypesRegistry.h" +#include "fdb5/api/helpers/Callback.h" namespace fdb5 { @@ -75,6 +76,10 @@ class LibFdb5 : public eckit::system::Library { bool dontDeregisterFactories() const; + void registerConstructorCallback(ConstructorCallback cb); + + ConstructorCallback constructorCallback(); + protected: virtual std::string version() const; @@ -82,6 +87,7 @@ class LibFdb5 : public eckit::system::Library { private: std::unique_ptr config_; + ConstructorCallback constructorCallback_ = CALLBACK_CONSTRUCTOR_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc index 858aadbd8..4ae6edc0f 100644 --- a/src/fdb5/api/FDB.cc +++ b/src/fdb5/api/FDB.cc @@ -42,7 +42,8 @@ FDB::FDB(const Config &config) : internal_(FDBFactory::instance().build(config)), dirty_(false), reportStats_(config.getBool("statistics", false)) { - initPlugins(config); + eckit::system::LibraryManager::autoLoadPlugins({}); + LibFdb5::instance().constructorCallback()(*this); } @@ -288,12 +289,11 @@ void FDB::print(std::ostream& s) const { void FDB::flush() { if (dirty_) { - flushCallback_(); - eckit::Timer timer; timer.start(); internal_->flush(); + flushCallback_(); dirty_ = false; timer.stop(); @@ -327,23 +327,14 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const { return internal_->enabled(controlIdentifier); } -void FDB::registerCallback(ArchiveCallback callback) { - internal_->registerCallback(callback); +void FDB::registerArchiveCallback(ArchiveCallback callback) { // todo rename + internal_->registerArchiveCallback(callback); } -void FDB::registerCallback(FlushCallback callback) { +void FDB::registerFlushCallback(FlushCallback callback) { // todo rename flushCallback_ = callback; } -void FDB::initPlugins(const Config& config){ - bool enableGribjump = eckit::Resource("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false); - bool disableGribjump = eckit::Resource("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false); // Emergency off-switch - - if (enableGribjump && !disableGribjump) { - eckit::system::Plugin& plugin = eckit::system::LibraryManager::loadPlugin("gribjump"); - plugin.setup(this); - } -} //---------------------------------------------------------------------------------------------------------------------- } // namespace fdb5 diff --git a/src/fdb5/api/FDB.h b/src/fdb5/api/FDB.h index be1e6c693..6be852565 100644 --- a/src/fdb5/api/FDB.h +++ b/src/fdb5/api/FDB.h @@ -34,7 +34,7 @@ #include "fdb5/api/helpers/WipeIterator.h" #include "fdb5/api/helpers/MoveIterator.h" #include "fdb5/config/Config.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { namespace message { @@ -119,8 +119,8 @@ class FDB { bool dirty() const; - void registerCallback(ArchiveCallback callback); - void registerCallback(FlushCallback callback); + void registerArchiveCallback(ArchiveCallback callback); + void registerFlushCallback(FlushCallback callback); // -------------- API management ---------------------------- @@ -147,8 +147,6 @@ class FDB { bool sorted(const metkit::mars::MarsRequest &request); - void initPlugins(const Config& config); - private: // members std::unique_ptr internal_; diff --git a/src/fdb5/api/FDBFactory.h b/src/fdb5/api/FDBFactory.h index ed526421b..6cd3c1c37 100644 --- a/src/fdb5/api/FDBFactory.h +++ b/src/fdb5/api/FDBFactory.h @@ -37,7 +37,7 @@ #include "fdb5/api/helpers/PurgeIterator.h" #include "fdb5/api/helpers/StatsIterator.h" #include "fdb5/api/helpers/StatusIterator.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { namespace message { @@ -91,7 +91,7 @@ class FDBBase : private eckit::NonCopyable { virtual AxesIterator axes(const FDBToolRequest& request, int axes) { NOTIMP; } - void registerCallback(ArchiveCallback callback) {callback_ = callback;} + void registerArchiveCallback(ArchiveCallback callback) {callback_ = callback;} // -------------- API management ---------------------------- diff --git a/src/fdb5/api/helpers/ArchiveCallback.h b/src/fdb5/api/helpers/Callback.h similarity index 73% rename from src/fdb5/api/helpers/ArchiveCallback.h rename to src/fdb5/api/helpers/Callback.h index 54f3a47c7..8b9f04409 100644 --- a/src/fdb5/api/helpers/ArchiveCallback.h +++ b/src/fdb5/api/helpers/Callback.h @@ -14,16 +14,20 @@ */ #pragma once - +#include #include "fdb5/database/Key.h" #include "fdb5/database/FieldLocation.h" namespace fdb5 { -using ArchiveCallback = std::function; +class FDB; + +using ArchiveCallback = std::function>)>; using FlushCallback = std::function; +using ConstructorCallback = std::function; -static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, const FieldLocation&) {}; +static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, std::future>) {}; static const FlushCallback CALLBACK_FLUSH_NOOP = []() {}; +static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](FDB&) {}; } // namespace fdb5 diff --git a/src/fdb5/daos/DaosCatalogueWriter.cc b/src/fdb5/daos/DaosCatalogueWriter.cc index 3a9f0b661..fc3927f9c 100644 --- a/src/fdb5/daos/DaosCatalogueWriter.cc +++ b/src/fdb5/daos/DaosCatalogueWriter.cc @@ -226,7 +226,7 @@ const Index& DaosCatalogueWriter::currentIndex() { /// @todo: other writers may be simultaneously updating the axes KeyValues in DAOS. Should these /// new updates be retrieved and put into in-memory axes from time to time, e.g. every /// time a value is put in an axis KeyValue? -void DaosCatalogueWriter::archive(const Key& key, std::unique_ptr fieldLocation) { +void DaosCatalogueWriter::archive(const Key& key, std::shared_ptr fieldLocation) { if (current_.null()) { ASSERT(!currentIndexKey_.empty()); diff --git a/src/fdb5/daos/DaosCatalogueWriter.h b/src/fdb5/daos/DaosCatalogueWriter.h index b738a6420..44fcd7384 100644 --- a/src/fdb5/daos/DaosCatalogueWriter.h +++ b/src/fdb5/daos/DaosCatalogueWriter.h @@ -56,7 +56,7 @@ class DaosCatalogueWriter : public DaosCatalogue, public CatalogueWriter { void clean() override; void close() override; - void archive(const Key& key, std::unique_ptr fieldLocation) override; + void archive(const Key& key, std::shared_ptr fieldLocation) override; virtual void print( std::ostream &out ) const override { NOTIMP; } diff --git a/src/fdb5/database/Catalogue.h b/src/fdb5/database/Catalogue.h index 1d7be79fc..c2b344701 100644 --- a/src/fdb5/database/Catalogue.h +++ b/src/fdb5/database/Catalogue.h @@ -121,7 +121,7 @@ class CatalogueReader { class CatalogueWriter { public: virtual const Index& currentIndex() = 0; - virtual void archive(const Key& key, std::unique_ptr fieldLocation) = 0; + virtual void archive(const Key& key, std::shared_ptr fieldLocation) = 0; virtual void overlayDB(const Catalogue& otherCatalogue, const std::set& variableKeys, bool unmount) = 0; virtual void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) = 0; virtual void reconsolidate() = 0; diff --git a/src/fdb5/database/DB.cc b/src/fdb5/database/DB.cc index 8aae6a9e8..b916612f1 100644 --- a/src/fdb5/database/DB.cc +++ b/src/fdb5/database/DB.cc @@ -14,7 +14,6 @@ #include "fdb5/database/DB.h" #include "fdb5/database/Field.h" #include "fdb5/toc/TocEngine.h" -#include "fdb5/api/helpers/ArchiveCallback.h" using eckit::Log; @@ -110,9 +109,14 @@ void DB::archive(const Key& key, const void* data, eckit::Length length, const K ASSERT(cat); const Index& idx = cat->currentIndex(); - std::unique_ptr location(store().archive(idx.key(), data, length)); - callback(field, data, length, *location); + std::shared_ptr location(store().archive(idx.key(), data, length)); + + // In anticipaton of store().archive() working asynchronously in later FDB versions. + std::promise> promise; + promise.set_value(location); + + callback(field, data, length, promise.get_future()); cat->archive(key, std::move(location)); } diff --git a/src/fdb5/database/DB.h b/src/fdb5/database/DB.h index fcb6735c1..ee80a826f 100644 --- a/src/fdb5/database/DB.h +++ b/src/fdb5/database/DB.h @@ -23,7 +23,7 @@ #include "fdb5/database/EntryVisitMechanism.h" #include "fdb5/database/Key.h" #include "fdb5/database/Store.h" -#include "fdb5/api/helpers/ArchiveCallback.h" +#include "fdb5/api/helpers/Callback.h" namespace eckit { class DataHandle; diff --git a/src/fdb5/database/Field.cc b/src/fdb5/database/Field.cc index 53d3a7be9..a29dd3b60 100644 --- a/src/fdb5/database/Field.cc +++ b/src/fdb5/database/Field.cc @@ -16,7 +16,7 @@ namespace fdb5 { Field::Field() {} -Field::Field(std::unique_ptr location, time_t timestamp, const FieldDetails& details): +Field::Field(std::shared_ptr location, time_t timestamp, const FieldDetails& details): location_(std::move(location)), timestamp_(timestamp), details_(details) { diff --git a/src/fdb5/database/Field.h b/src/fdb5/database/Field.h index b49ef3753..4ce77a3e3 100644 --- a/src/fdb5/database/Field.h +++ b/src/fdb5/database/Field.h @@ -43,7 +43,7 @@ class Field { Field(); - Field(std::unique_ptr location, time_t timestamp, const FieldDetails& details = FieldDetails()); + Field(std::shared_ptr location, time_t timestamp, const FieldDetails& details = FieldDetails()); Field(const FieldLocation&& location, time_t timestamp, const FieldDetails& details = FieldDetails()); eckit::DataHandle* dataHandle() const { return location_->dataHandle(); } diff --git a/src/fdb5/toc/TocCatalogueWriter.cc b/src/fdb5/toc/TocCatalogueWriter.cc index 8bf0f8078..feea5c9aa 100644 --- a/src/fdb5/toc/TocCatalogueWriter.cc +++ b/src/fdb5/toc/TocCatalogueWriter.cc @@ -294,7 +294,7 @@ bool TocCatalogueWriter::enabled(const ControlIdentifier& controlIdentifier) con return TocCatalogue::enabled(controlIdentifier); } -void TocCatalogueWriter::archive(const Key& key, std::unique_ptr fieldLocation) { +void TocCatalogueWriter::archive(const Key& key, std::shared_ptr fieldLocation) { dirty_ = true; if (current_.null()) { diff --git a/src/fdb5/toc/TocCatalogueWriter.h b/src/fdb5/toc/TocCatalogueWriter.h index 2d2caa88f..040799e43 100644 --- a/src/fdb5/toc/TocCatalogueWriter.h +++ b/src/fdb5/toc/TocCatalogueWriter.h @@ -70,7 +70,7 @@ class TocCatalogueWriter : public TocCatalogue, public CatalogueWriter { void clean() override; void close() override; - void archive(const Key& key, std::unique_ptr fieldLocation) override; + void archive(const Key& key, std::shared_ptr fieldLocation) override; void reconsolidateIndexesAndTocs(); virtual void print( std::ostream &out ) const override; diff --git a/tests/fdb/api/test_callback.cc b/tests/fdb/api/test_callback.cc index 9fd09f405..60449c883 100644 --- a/tests/fdb/api/test_callback.cc +++ b/tests/fdb/api/test_callback.cc @@ -26,11 +26,12 @@ CASE("Archive and flush callback") { std::vector keys; bool flushCalled = false; - fdb.registerCallback([&map] (const Key& key, const void* data, size_t length, const FieldLocation& location) { - map[key] = location.fullUri(); + fdb.registerArchiveCallback([&map] (const Key& key, const void* data, size_t length, std::future> future) { + std::shared_ptr location = future.get(); + map[key] = location->fullUri(); }); - fdb.registerCallback([&flushCalled] () { + fdb.registerFlushCallback([&flushCalled] () { flushCalled = true; });