Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/gribjump plugin #28

Merged
merged 5 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/fdb5/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ list( APPEND fdb5_srcs
api/helpers/PurgeIterator.h
api/helpers/StatsIterator.cc
api/helpers/StatsIterator.h
api/helpers/ArchiveCallback.h
api/helpers/Callback.h

api/local/QueryVisitor.h
api/local/QueueStringLogTarget.h
Expand Down
8 changes: 8 additions & 0 deletions src/fdb5/LibFdb5.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ const Config& LibFdb5::defaultConfig(const eckit::Configuration& userConfig) {
return *config_;
}

ConstructorCallback LibFdb5::constructorCallback() {
return constructorCallback_;
}

void LibFdb5::registerConstructorCallback(ConstructorCallback cb) {
constructorCallback_ = cb;
}

bool LibFdb5::dontDeregisterFactories() const {
#if eckit_VERSION_MAJOR > 1 || (eckit_VERSION_MAJOR == 1 && (eckit_VERSION_MINOR > 17 || (eckit_VERSION_MINOR == 17 && eckit_VERSION_PATCH >0)))
return eckit::LibEcKit::instance().dontDeregisterFactories();
Expand Down
6 changes: 6 additions & 0 deletions src/fdb5/LibFdb5.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "fdb5/database/DB.h"
#include "fdb5/types/TypesRegistry.h"
#include "fdb5/api/helpers/Callback.h"

namespace fdb5 {

Expand Down Expand Up @@ -75,13 +76,18 @@ class LibFdb5 : public eckit::system::Library {

bool dontDeregisterFactories() const;

void registerConstructorCallback(ConstructorCallback cb);

ConstructorCallback constructorCallback();

protected:
virtual std::string version() const;

virtual std::string gitsha1(unsigned int count) const;

private:
std::unique_ptr<Config> config_;
ConstructorCallback constructorCallback_ = CALLBACK_CONSTRUCTOR_NOOP;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
18 changes: 14 additions & 4 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include "eckit/message/Message.h"
#include "eckit/message/Reader.h"

#include "eckit/system/Plugin.h"
#include "eckit/system/LibraryManager.h"

#include "metkit/hypercube/HyperCubePayloaded.h"

#include "fdb5/LibFdb5.h"
Expand All @@ -38,7 +41,10 @@ namespace fdb5 {
FDB::FDB(const Config &config) :
internal_(FDBFactory::instance().build(config)),
dirty_(false),
reportStats_(config.getBool("statistics", false)) {}
reportStats_(config.getBool("statistics", false)) {
eckit::system::LibraryManager::autoLoadPlugins({});
LibFdb5::instance().constructorCallback()(*this);
}


FDB::~FDB() {
Expand Down Expand Up @@ -283,11 +289,11 @@ void FDB::print(std::ostream& s) const {

void FDB::flush() {
if (dirty_) {

eckit::Timer timer;
timer.start();

internal_->flush();
flushCallback_();
dirty_ = false;

timer.stop();
Expand Down Expand Up @@ -321,8 +327,12 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const {
return internal_->enabled(controlIdentifier);
}

void FDB::registerCallback(ArchiveCallback callback) {
internal_->registerCallback(callback);
void FDB::registerArchiveCallback(ArchiveCallback callback) { // todo rename
internal_->registerArchiveCallback(callback);
}

void FDB::registerFlushCallback(FlushCallback callback) { // todo rename
flushCallback_ = callback;
}

//----------------------------------------------------------------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions src/fdb5/api/FDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "fdb5/api/helpers/WipeIterator.h"
#include "fdb5/api/helpers/MoveIterator.h"
#include "fdb5/config/Config.h"
#include "fdb5/api/helpers/ArchiveCallback.h"
#include "fdb5/api/helpers/Callback.h"

namespace eckit {
namespace message {
Expand Down Expand Up @@ -119,7 +119,8 @@ class FDB {

bool dirty() const;

void registerCallback(ArchiveCallback callback);
void registerArchiveCallback(ArchiveCallback callback);
void registerFlushCallback(FlushCallback callback);

// -------------- API management ----------------------------

Expand Down Expand Up @@ -155,6 +156,7 @@ class FDB {

FDBStats stats_;

FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In constructor:

flushCallback_ = LibFDB::instance().defaultFlushCallback();

};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/api/FDBFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include "fdb5/api/helpers/PurgeIterator.h"
#include "fdb5/api/helpers/StatsIterator.h"
#include "fdb5/api/helpers/StatusIterator.h"
#include "fdb5/api/helpers/ArchiveCallback.h"
#include "fdb5/api/helpers/Callback.h"

namespace eckit {
namespace message {
Expand Down Expand Up @@ -91,7 +91,7 @@ class FDBBase : private eckit::NonCopyable {

virtual AxesIterator axes(const FDBToolRequest& request, int axes) { NOTIMP; }

void registerCallback(ArchiveCallback callback) {callback_ = callback;}
void registerArchiveCallback(ArchiveCallback callback) {callback_ = callback;}

// -------------- API management ----------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@
*/

#pragma once

#include <future>
#include "fdb5/database/Key.h"
#include "fdb5/database/FieldLocation.h"

namespace fdb5 {

using ArchiveCallback = std::function<void(const Key&, const FieldLocation&)>;
class FDB;

using ArchiveCallback = std::function<void(const Key& key, const void* data, size_t length, std::future<std::shared_ptr<FieldLocation>>)>;
using FlushCallback = std::function<void()>;
using ConstructorCallback = std::function<void(FDB&)>;

static const ArchiveCallback CALLBACK_NOOP = [](const Key&, const FieldLocation&) {};
static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, std::future<std::shared_ptr<FieldLocation>>) {};
static const FlushCallback CALLBACK_FLUSH_NOOP = []() {};
static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](FDB&) {};

} // namespace fdb5
2 changes: 1 addition & 1 deletion src/fdb5/daos/DaosCatalogueWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ const Index& DaosCatalogueWriter::currentIndex() {
/// @todo: other writers may be simultaneously updating the axes KeyValues in DAOS. Should these
/// new updates be retrieved and put into in-memory axes from time to time, e.g. every
/// time a value is put in an axis KeyValue?
void DaosCatalogueWriter::archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) {
void DaosCatalogueWriter::archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) {

if (current_.null()) {
ASSERT(!currentIndexKey_.empty());
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/daos/DaosCatalogueWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DaosCatalogueWriter : public DaosCatalogue, public CatalogueWriter {
void clean() override;
void close() override;

void archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) override;
void archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) override;

virtual void print( std::ostream &out ) const override { NOTIMP; }

Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Catalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class CatalogueReader {
class CatalogueWriter {
public:
virtual const Index& currentIndex() = 0;
virtual void archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) = 0;
virtual void archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) = 0;
virtual void overlayDB(const Catalogue& otherCatalogue, const std::set<std::string>& variableKeys, bool unmount) = 0;
virtual void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) = 0;
virtual void reconsolidate() = 0;
Expand Down
10 changes: 7 additions & 3 deletions src/fdb5/database/DB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "fdb5/database/DB.h"
#include "fdb5/database/Field.h"
#include "fdb5/toc/TocEngine.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

using eckit::Log;

Expand Down Expand Up @@ -110,9 +109,14 @@ void DB::archive(const Key& key, const void* data, eckit::Length length, const K
ASSERT(cat);

const Index& idx = cat->currentIndex();
std::unique_ptr<FieldLocation> location(store().archive(idx.key(), data, length));

callback(field, *location);
std::shared_ptr<FieldLocation> location(store().archive(idx.key(), data, length));

// In anticipaton of store().archive() working asynchronously in later FDB versions.
std::promise<std::shared_ptr<FieldLocation>> promise;
promise.set_value(location);

callback(field, data, length, promise.get_future());

cat->archive(key, std::move(location));
}
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/DB.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "fdb5/database/EntryVisitMechanism.h"
#include "fdb5/database/Key.h"
#include "fdb5/database/Store.h"
#include "fdb5/api/helpers/ArchiveCallback.h"
#include "fdb5/api/helpers/Callback.h"

namespace eckit {
class DataHandle;
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Field.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace fdb5 {

Field::Field() {}

Field::Field(std::unique_ptr<FieldLocation> location, time_t timestamp, const FieldDetails& details):
Field::Field(std::shared_ptr<FieldLocation> location, time_t timestamp, const FieldDetails& details):
location_(std::move(location)),
timestamp_(timestamp),
details_(details) {
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Field.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Field {

Field();

Field(std::unique_ptr<FieldLocation> location, time_t timestamp, const FieldDetails& details = FieldDetails());
Field(std::shared_ptr<FieldLocation> location, time_t timestamp, const FieldDetails& details = FieldDetails());
Field(const FieldLocation&& location, time_t timestamp, const FieldDetails& details = FieldDetails());

eckit::DataHandle* dataHandle() const { return location_->dataHandle(); }
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/toc/TocCatalogueWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ bool TocCatalogueWriter::enabled(const ControlIdentifier& controlIdentifier) con
return TocCatalogue::enabled(controlIdentifier);
}

void TocCatalogueWriter::archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) {
void TocCatalogueWriter::archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) {
dirty_ = true;

if (current_.null()) {
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/toc/TocCatalogueWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class TocCatalogueWriter : public TocCatalogue, public CatalogueWriter {
void clean() override;
void close() override;

void archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) override;
void archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) override;
void reconsolidateIndexesAndTocs();

virtual void print( std::ostream &out ) const override;
Expand Down
2 changes: 1 addition & 1 deletion tests/fdb/api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ list( APPEND api_tests
select
dist
fdb_c
archive_callback
callback
)

foreach( _test ${api_tests} )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace fdb5::test {

//----------------------------------------------------------------------------------------------------------------------
CASE("Archive callback") {
CASE("Archive and flush callback") {
FDB fdb;

std::string data_str = "Raining cats and dogs";
Expand All @@ -24,9 +24,15 @@ CASE("Archive callback") {

std::map<fdb5::Key, eckit::URI> map;
std::vector<Key> keys;
bool flushCalled = false;

fdb.registerCallback([&map] (const fdb5::Key& key, const fdb5::FieldLocation& location) {
map[key] = location.fullUri();
fdb.registerArchiveCallback([&map] (const Key& key, const void* data, size_t length, std::future<std::shared_ptr<FieldLocation>> future) {
std::shared_ptr<FieldLocation> location = future.get();
map[key] = location->fullUri();
});

fdb.registerFlushCallback([&flushCalled] () {
flushCalled = true;
});

key.set("step","1");
Expand All @@ -43,6 +49,8 @@ CASE("Archive callback") {

fdb.flush();

EXPECT(flushCalled);

EXPECT(map.size() == 3);

// for (const auto& [key, uri] : map) {
Expand Down
Loading